update exports for baseparse API changes
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancelable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #else
76 #define _GNU_SOURCE 1
77 #ifdef HAVE_SYS_POLL_H
78 #include <sys/poll.h>
79 #endif
80 #ifdef HAVE_POLL_H
81 #include <poll.h>
82 #endif
83 #include <sys/time.h>
84 #include <sys/socket.h>
85 #endif
86
87 /* OS/X needs this because of bad headers */
88 #include <string.h>
89
90 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
91  * so we prefer our own poll emulation.
92  */
93 #if defined(BROKEN_POLL)
94 #undef HAVE_POLL
95 #endif
96
97 #include "gstpoll.h"
98
99 #define GST_CAT_DEFAULT GST_CAT_POLL
100
101 #ifdef G_OS_WIN32
102 typedef struct _WinsockFd WinsockFd;
103
104 struct _WinsockFd
105 {
106   gint fd;
107   glong event_mask;
108   WSANETWORKEVENTS events;
109   glong ignored_event_mask;
110 };
111 #endif
112
113 typedef enum
114 {
115   GST_POLL_MODE_AUTO,
116   GST_POLL_MODE_SELECT,
117   GST_POLL_MODE_PSELECT,
118   GST_POLL_MODE_POLL,
119   GST_POLL_MODE_PPOLL,
120   GST_POLL_MODE_WINDOWS
121 } GstPollMode;
122
123 struct _GstPoll
124 {
125   GstPollMode mode;
126
127   GMutex lock;
128   /* array of fds, always written to and read from with lock */
129   GArray *fds;
130   /* array of active fds, only written to from the waiting thread with the
131    * lock and read from with the lock or without the lock from the waiting
132    * thread */
133   GArray *active_fds;
134
135 #ifndef G_OS_WIN32
136   gchar buf[1];
137   GstPollFD control_read_fd;
138   GstPollFD control_write_fd;
139 #else
140   GArray *active_fds_ignored;
141   GArray *events;
142   GArray *active_events;
143
144   HANDLE wakeup_event;
145 #endif
146
147   gboolean controllable;
148   volatile gint waiting;
149   volatile gint control_pending;
150   volatile gint flushing;
151   gboolean timer;
152   volatile gint rebuild;
153 };
154
155 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
156     gboolean active);
157 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
158
159 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
160 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
161
162 #define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
163 #define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
164 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
165
166 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
167 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
168
169 #ifndef G_OS_WIN32
170 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
171 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
172 #else
173 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
174 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
175 #endif
176
177 /* the poll/select call is also performed on a control socket, that way
178  * we can send special commands to control it */
179 static inline gboolean
180 raise_wakeup (GstPoll * set)
181 {
182   gboolean result = TRUE;
183
184   if (g_atomic_int_add (&set->control_pending, 1) == 0) {
185     /* raise when nothing pending */
186     GST_LOG ("%p: raise", set);
187     result = WAKE_EVENT (set);
188   }
189   return result;
190 }
191
192 /* note how bad things can happen when the 2 threads both raise and release the
193  * wakeup. This is however not a problem because you must always pair a raise
194  * with a release */
195 static inline gboolean
196 release_wakeup (GstPoll * set)
197 {
198   gboolean result = TRUE;
199
200   if (g_atomic_int_dec_and_test (&set->control_pending)) {
201     GST_LOG ("%p: release", set);
202     result = RELEASE_EVENT (set);
203   }
204   return result;
205 }
206
207 static inline gint
208 release_all_wakeup (GstPoll * set)
209 {
210   gint old;
211
212   while (TRUE) {
213     if (!(old = g_atomic_int_get (&set->control_pending)))
214       /* nothing pending, just exit */
215       break;
216
217     /* try to remove all pending control messages */
218     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
219       /* we managed to remove all messages, read the control socket */
220       if (RELEASE_EVENT (set))
221         break;
222       else
223         /* retry again until we read it successfully */
224         g_atomic_int_add (&set->control_pending, 1);
225     }
226   }
227   return old;
228 }
229
230 static gint
231 find_index (GArray * array, GstPollFD * fd)
232 {
233 #ifndef G_OS_WIN32
234   struct pollfd *ifd;
235 #else
236   WinsockFd *ifd;
237 #endif
238   guint i;
239
240   /* start by assuming the index found in the fd is still valid */
241   if (fd->idx >= 0 && fd->idx < array->len) {
242 #ifndef G_OS_WIN32
243     ifd = &g_array_index (array, struct pollfd, fd->idx);
244 #else
245     ifd = &g_array_index (array, WinsockFd, fd->idx);
246 #endif
247
248     if (ifd->fd == fd->fd) {
249       return fd->idx;
250     }
251   }
252
253   /* the pollfd array has changed and we need to lookup the fd again */
254   for (i = 0; i < array->len; i++) {
255 #ifndef G_OS_WIN32
256     ifd = &g_array_index (array, struct pollfd, i);
257 #else
258     ifd = &g_array_index (array, WinsockFd, i);
259 #endif
260
261     if (ifd->fd == fd->fd) {
262       fd->idx = (gint) i;
263       return fd->idx;
264     }
265   }
266
267   fd->idx = -1;
268   return fd->idx;
269 }
270
271 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
272 /* check if all file descriptors will fit in an fd_set */
273 static gboolean
274 selectable_fds (GstPoll * set)
275 {
276   guint i;
277
278   g_mutex_lock (&set->lock);
279   for (i = 0; i < set->fds->len; i++) {
280     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
281
282     if (pfd->fd >= FD_SETSIZE)
283       goto too_many;
284   }
285   g_mutex_unlock (&set->lock);
286
287   return TRUE;
288
289 too_many:
290   {
291     g_mutex_unlock (&set->lock);
292     return FALSE;
293   }
294 }
295
296 /* check if the timeout will convert to a timeout value used for poll()
297  * without a loss of precision
298  */
299 static gboolean
300 pollable_timeout (GstClockTime timeout)
301 {
302   if (timeout == GST_CLOCK_TIME_NONE)
303     return TRUE;
304
305   /* not a nice multiple of milliseconds */
306   if (timeout % 1000000)
307     return FALSE;
308
309   return TRUE;
310 }
311 #endif
312
313 static GstPollMode
314 choose_mode (GstPoll * set, GstClockTime timeout)
315 {
316   GstPollMode mode;
317
318   if (set->mode == GST_POLL_MODE_AUTO) {
319 #ifdef HAVE_PPOLL
320     mode = GST_POLL_MODE_PPOLL;
321 #elif defined(HAVE_POLL)
322     if (!selectable_fds (set) || pollable_timeout (timeout)) {
323       mode = GST_POLL_MODE_POLL;
324     } else {
325 #ifdef HAVE_PSELECT
326       mode = GST_POLL_MODE_PSELECT;
327 #else
328       mode = GST_POLL_MODE_SELECT;
329 #endif
330     }
331 #elif defined(HAVE_PSELECT)
332     mode = GST_POLL_MODE_PSELECT;
333 #else
334     mode = GST_POLL_MODE_SELECT;
335 #endif
336   } else {
337     mode = set->mode;
338   }
339   return mode;
340 }
341
342 #ifndef G_OS_WIN32
343 static gint
344 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
345     fd_set * errorfds)
346 {
347   gint max_fd = -1;
348   guint i;
349
350   FD_ZERO (readfds);
351   FD_ZERO (writefds);
352   FD_ZERO (errorfds);
353
354   g_mutex_lock (&set->lock);
355
356   for (i = 0; i < set->active_fds->len; i++) {
357     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
358
359     if (pfd->fd < FD_SETSIZE) {
360       if (pfd->events & POLLIN)
361         FD_SET (pfd->fd, readfds);
362       if (pfd->events & POLLOUT)
363         FD_SET (pfd->fd, writefds);
364       if (pfd->events)
365         FD_SET (pfd->fd, errorfds);
366       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
367         max_fd = pfd->fd;
368     }
369   }
370
371   g_mutex_unlock (&set->lock);
372
373   return max_fd;
374 }
375
376 static void
377 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
378     fd_set * errorfds)
379 {
380   guint i;
381
382   g_mutex_lock (&set->lock);
383
384   for (i = 0; i < set->active_fds->len; i++) {
385     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
386
387     if (pfd->fd < FD_SETSIZE) {
388       pfd->revents = 0;
389       if (FD_ISSET (pfd->fd, readfds))
390         pfd->revents |= POLLIN;
391       if (FD_ISSET (pfd->fd, writefds))
392         pfd->revents |= POLLOUT;
393       if (FD_ISSET (pfd->fd, errorfds))
394         pfd->revents |= POLLERR;
395     }
396   }
397
398   g_mutex_unlock (&set->lock);
399 }
400 #else /* G_OS_WIN32 */
401 /*
402  * Translate errors thrown by the Winsock API used by GstPoll:
403  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
404  */
405 static gint
406 gst_poll_winsock_error_to_errno (DWORD last_error)
407 {
408   switch (last_error) {
409     case WSA_INVALID_HANDLE:
410     case WSAEINVAL:
411     case WSAENOTSOCK:
412       return EBADF;
413
414     case WSA_NOT_ENOUGH_MEMORY:
415       return ENOMEM;
416
417       /*
418        * Anything else, including:
419        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
420        *   WSANOTINITIALISED
421        */
422     default:
423       return EINVAL;
424   }
425 }
426
427 static void
428 gst_poll_free_winsock_event (GstPoll * set, gint idx)
429 {
430   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
431   HANDLE event = g_array_index (set->events, HANDLE, idx);
432
433   WSAEventSelect (wfd->fd, event, 0);
434   CloseHandle (event);
435 }
436
437 static void
438 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
439     gboolean active)
440 {
441   WinsockFd *wfd;
442
443   wfd = &g_array_index (set->fds, WinsockFd, idx);
444
445   if (active)
446     wfd->event_mask |= flags;
447   else
448     wfd->event_mask &= ~flags;
449
450   /* reset ignored state if the new mask doesn't overlap at all */
451   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
452     wfd->ignored_event_mask = 0;
453 }
454
455 static gboolean
456 gst_poll_prepare_winsock_active_sets (GstPoll * set)
457 {
458   guint i;
459
460   g_array_set_size (set->active_fds, 0);
461   g_array_set_size (set->active_fds_ignored, 0);
462   g_array_set_size (set->active_events, 0);
463   g_array_append_val (set->active_events, set->wakeup_event);
464
465   for (i = 0; i < set->fds->len; i++) {
466     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
467     HANDLE event = g_array_index (set->events, HANDLE, i);
468
469     if (wfd->ignored_event_mask == 0) {
470       gint ret;
471
472       g_array_append_val (set->active_fds, *wfd);
473       g_array_append_val (set->active_events, event);
474
475       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
476       if (G_UNLIKELY (ret != 0)) {
477         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
478         return FALSE;
479       }
480     } else {
481       g_array_append_val (set->active_fds_ignored, wfd);
482     }
483   }
484
485   return TRUE;
486 }
487
488 static gint
489 gst_poll_collect_winsock_events (GstPoll * set)
490 {
491   gint res, i;
492
493   /*
494    * We need to check which events are signaled, and call
495    * WSAEnumNetworkEvents for those that are, which resets
496    * the event and clears the internal network event records.
497    */
498   res = 0;
499   for (i = 0; i < set->active_fds->len; i++) {
500     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
501     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
502     DWORD wait_ret;
503
504     wait_ret = WaitForSingleObject (event, 0);
505     if (wait_ret == WAIT_OBJECT_0) {
506       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
507
508       if (G_UNLIKELY (enum_ret != 0)) {
509         res = -1;
510         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
511         break;
512       }
513
514       res++;
515     } else {
516       /* clear any previously stored result */
517       memset (&wfd->events, 0, sizeof (wfd->events));
518     }
519   }
520
521   /* If all went well we also need to reset the ignored fds. */
522   if (res >= 0) {
523     res += set->active_fds_ignored->len;
524
525     for (i = 0; i < set->active_fds_ignored->len; i++) {
526       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
527
528       wfd->ignored_event_mask = 0;
529     }
530
531     g_array_set_size (set->active_fds_ignored, 0);
532   }
533
534   return res;
535 }
536 #endif
537
538 /**
539  * gst_poll_new: (skip)
540  * @controllable: whether it should be possible to control a wait.
541  *
542  * Create a new file descriptor set. If @controllable, it
543  * is possible to restart or flush a call to gst_poll_wait() with
544  * gst_poll_restart() and gst_poll_set_flushing() respectively.
545  *
546  * Free-function: gst_poll_free
547  *
548  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
549  *     Free with gst_poll_free().
550  */
551 GstPoll *
552 gst_poll_new (gboolean controllable)
553 {
554   GstPoll *nset;
555
556   GST_DEBUG ("controllable : %d", controllable);
557
558   nset = g_slice_new0 (GstPoll);
559   g_mutex_init (&nset->lock);
560 #ifndef G_OS_WIN32
561   nset->mode = GST_POLL_MODE_AUTO;
562   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
563   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
564   nset->control_read_fd.fd = -1;
565   nset->control_write_fd.fd = -1;
566   {
567     gint control_sock[2];
568
569     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
570       goto no_socket_pair;
571
572     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
573     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
574
575     nset->control_read_fd.fd = control_sock[0];
576     nset->control_write_fd.fd = control_sock[1];
577
578     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
579     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
580   }
581 #else
582   nset->mode = GST_POLL_MODE_WINDOWS;
583   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
584   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
585   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
586   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
587   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
588
589   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
590 #endif
591
592   /* ensure (re)build, though already sneakily set in non-windows case */
593   MARK_REBUILD (nset);
594
595   nset->controllable = controllable;
596
597   return nset;
598
599   /* ERRORS */
600 #ifndef G_OS_WIN32
601 no_socket_pair:
602   {
603     GST_WARNING ("%p: can't create socket pair !", nset);
604     gst_poll_free (nset);
605     return NULL;
606   }
607 #endif
608 }
609
610 /**
611  * gst_poll_new_timer: (skip)
612  *
613  * Create a new poll object that can be used for scheduling cancellable
614  * timeouts.
615  *
616  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
617  * performed from different threads. 
618  *
619  * Free-function: gst_poll_free
620  *
621  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
622  *     Free with gst_poll_free().
623  */
624 GstPoll *
625 gst_poll_new_timer (void)
626 {
627   GstPoll *poll;
628
629   /* make a new controllable poll set */
630   if (!(poll = gst_poll_new (TRUE)))
631     goto done;
632
633   /* we are a timer */
634   poll->timer = TRUE;
635
636 done:
637   return poll;
638 }
639
640 /**
641  * gst_poll_free:
642  * @set: (transfer full): a file descriptor set.
643  *
644  * Free a file descriptor set.
645  */
646 void
647 gst_poll_free (GstPoll * set)
648 {
649   g_return_if_fail (set != NULL);
650
651   GST_DEBUG ("%p: freeing", set);
652
653 #ifndef G_OS_WIN32
654   if (set->control_write_fd.fd >= 0)
655     close (set->control_write_fd.fd);
656   if (set->control_read_fd.fd >= 0)
657     close (set->control_read_fd.fd);
658 #else
659   CloseHandle (set->wakeup_event);
660
661   {
662     guint i;
663
664     for (i = 0; i < set->events->len; i++)
665       gst_poll_free_winsock_event (set, i);
666   }
667
668   g_array_free (set->active_events, TRUE);
669   g_array_free (set->events, TRUE);
670   g_array_free (set->active_fds_ignored, TRUE);
671 #endif
672
673   g_array_free (set->active_fds, TRUE);
674   g_array_free (set->fds, TRUE);
675   g_mutex_clear (&set->lock);
676   g_slice_free (GstPoll, set);
677 }
678
679 /**
680  * gst_poll_get_read_gpollfd:
681  * @set: a #GstPoll
682  * @fd: a #GPollFD
683  *
684  * Get a GPollFD for the reading part of the control socket. This is useful when
685  * integrating with a GSource and GMainLoop.
686  */
687 void
688 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
689 {
690   g_return_if_fail (set != NULL);
691   g_return_if_fail (fd != NULL);
692
693 #ifndef G_OS_WIN32
694   fd->fd = set->control_read_fd.fd;
695 #else
696 #if GLIB_SIZEOF_VOID_P == 8
697   fd->fd = (gint64) set->wakeup_event;
698 #else
699   fd->fd = (gint) set->wakeup_event;
700 #endif
701 #endif
702   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
703   fd->revents = 0;
704 }
705
706 /**
707  * gst_poll_fd_init:
708  * @fd: a #GstPollFD
709  *
710  * Initializes @fd. Alternatively you can initialize it with
711  * #GST_POLL_FD_INIT.
712  */
713 void
714 gst_poll_fd_init (GstPollFD * fd)
715 {
716   g_return_if_fail (fd != NULL);
717
718   fd->fd = -1;
719   fd->idx = -1;
720 }
721
722 static gboolean
723 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
724 {
725   gint idx;
726
727   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
728
729   idx = find_index (set->fds, fd);
730   if (idx < 0) {
731 #ifndef G_OS_WIN32
732     struct pollfd nfd;
733
734     nfd.fd = fd->fd;
735     nfd.events = POLLERR | POLLNVAL | POLLHUP;
736     nfd.revents = 0;
737
738     g_array_append_val (set->fds, nfd);
739
740     fd->idx = set->fds->len - 1;
741 #else
742     WinsockFd wfd;
743     HANDLE event;
744
745     wfd.fd = fd->fd;
746     wfd.event_mask = FD_CLOSE;
747     memset (&wfd.events, 0, sizeof (wfd.events));
748     wfd.ignored_event_mask = 0;
749     event = WSACreateEvent ();
750
751     g_array_append_val (set->fds, wfd);
752     g_array_append_val (set->events, event);
753
754     fd->idx = set->fds->len - 1;
755 #endif
756     MARK_REBUILD (set);
757   } else {
758     GST_WARNING ("%p: fd already added !", set);
759   }
760
761   return TRUE;
762 }
763
764 /**
765  * gst_poll_add_fd:
766  * @set: a file descriptor set.
767  * @fd: a file descriptor.
768  *
769  * Add a file descriptor to the file descriptor set.
770  *
771  * Returns: %TRUE if the file descriptor was successfully added to the set.
772  */
773 gboolean
774 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
775 {
776   gboolean ret;
777
778   g_return_val_if_fail (set != NULL, FALSE);
779   g_return_val_if_fail (fd != NULL, FALSE);
780   g_return_val_if_fail (fd->fd >= 0, FALSE);
781
782   g_mutex_lock (&set->lock);
783
784   ret = gst_poll_add_fd_unlocked (set, fd);
785
786   g_mutex_unlock (&set->lock);
787
788   return ret;
789 }
790
791 /**
792  * gst_poll_remove_fd:
793  * @set: a file descriptor set.
794  * @fd: a file descriptor.
795  *
796  * Remove a file descriptor from the file descriptor set.
797  *
798  * Returns: %TRUE if the file descriptor was successfully removed from the set.
799  */
800 gboolean
801 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
802 {
803   gint idx;
804
805   g_return_val_if_fail (set != NULL, FALSE);
806   g_return_val_if_fail (fd != NULL, FALSE);
807   g_return_val_if_fail (fd->fd >= 0, FALSE);
808
809
810   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
811
812   g_mutex_lock (&set->lock);
813
814   /* get the index, -1 is an fd that is not added */
815   idx = find_index (set->fds, fd);
816   if (idx >= 0) {
817 #ifdef G_OS_WIN32
818     gst_poll_free_winsock_event (set, idx);
819     g_array_remove_index_fast (set->events, idx);
820 #endif
821
822     /* remove the fd at index, we use _remove_index_fast, which copies the last
823      * element of the array to the freed index */
824     g_array_remove_index_fast (set->fds, idx);
825
826     /* mark fd as removed by setting the index to -1 */
827     fd->idx = -1;
828     MARK_REBUILD (set);
829   } else {
830     GST_WARNING ("%p: couldn't find fd !", set);
831   }
832
833   g_mutex_unlock (&set->lock);
834
835   return idx >= 0;
836 }
837
838 /**
839  * gst_poll_fd_ctl_write:
840  * @set: a file descriptor set.
841  * @fd: a file descriptor.
842  * @active: a new status.
843  *
844  * Control whether the descriptor @fd in @set will be monitored for
845  * writability.
846  *
847  * Returns: %TRUE if the descriptor was successfully updated.
848  */
849 gboolean
850 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
851 {
852   gint idx;
853
854   g_return_val_if_fail (set != NULL, FALSE);
855   g_return_val_if_fail (fd != NULL, FALSE);
856   g_return_val_if_fail (fd->fd >= 0, FALSE);
857
858   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
859       fd->fd, fd->idx, active);
860
861   g_mutex_lock (&set->lock);
862
863   idx = find_index (set->fds, fd);
864   if (idx >= 0) {
865 #ifndef G_OS_WIN32
866     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
867
868     if (active)
869       pfd->events |= POLLOUT;
870     else
871       pfd->events &= ~POLLOUT;
872
873     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
874 #else
875     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
876         active);
877 #endif
878     MARK_REBUILD (set);
879   } else {
880     GST_WARNING ("%p: couldn't find fd !", set);
881   }
882
883   g_mutex_unlock (&set->lock);
884
885   return idx >= 0;
886 }
887
888 static gboolean
889 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
890 {
891   gint idx;
892
893   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
894       fd->fd, fd->idx, active);
895
896   idx = find_index (set->fds, fd);
897
898   if (idx >= 0) {
899 #ifndef G_OS_WIN32
900     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
901
902     if (active)
903       pfd->events |= (POLLIN | POLLPRI);
904     else
905       pfd->events &= ~(POLLIN | POLLPRI);
906 #else
907     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
908 #endif
909     MARK_REBUILD (set);
910   } else {
911     GST_WARNING ("%p: couldn't find fd !", set);
912   }
913
914   return idx >= 0;
915 }
916
917 /**
918  * gst_poll_fd_ctl_read:
919  * @set: a file descriptor set.
920  * @fd: a file descriptor.
921  * @active: a new status.
922  *
923  * Control whether the descriptor @fd in @set will be monitored for
924  * readability.
925  *
926  * Returns: %TRUE if the descriptor was successfully updated.
927  */
928 gboolean
929 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
930 {
931   gboolean ret;
932
933   g_return_val_if_fail (set != NULL, FALSE);
934   g_return_val_if_fail (fd != NULL, FALSE);
935   g_return_val_if_fail (fd->fd >= 0, FALSE);
936
937   g_mutex_lock (&set->lock);
938
939   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
940
941   g_mutex_unlock (&set->lock);
942
943   return ret;
944 }
945
946 /**
947  * gst_poll_fd_ignored:
948  * @set: a file descriptor set.
949  * @fd: a file descriptor.
950  *
951  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
952  * the same result for @fd as last time. This function must be called if no
953  * operation (read/write/recv/send/etc.) will be performed on @fd before
954  * the next call to gst_poll_wait().
955  *
956  * The reason why this is needed is because the underlying implementation
957  * might not allow querying the fd more than once between calls to one of
958  * the re-enabling operations.
959  */
960 void
961 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
962 {
963 #ifdef G_OS_WIN32
964   gint idx;
965
966   g_return_if_fail (set != NULL);
967   g_return_if_fail (fd != NULL);
968   g_return_if_fail (fd->fd >= 0);
969
970   g_mutex_lock (&set->lock);
971
972   idx = find_index (set->fds, fd);
973   if (idx >= 0) {
974     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
975
976     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
977     MARK_REBUILD (set);
978   }
979
980   g_mutex_unlock (&set->lock);
981 #endif
982 }
983
984 /**
985  * gst_poll_fd_has_closed:
986  * @set: a file descriptor set.
987  * @fd: a file descriptor.
988  *
989  * Check if @fd in @set has closed the connection.
990  *
991  * Returns: %TRUE if the connection was closed.
992  */
993 gboolean
994 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
995 {
996   gboolean res = FALSE;
997   gint idx;
998
999   g_return_val_if_fail (set != NULL, FALSE);
1000   g_return_val_if_fail (fd != NULL, FALSE);
1001   g_return_val_if_fail (fd->fd >= 0, FALSE);
1002
1003   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1004
1005   g_mutex_lock (&((GstPoll *) set)->lock);
1006
1007   idx = find_index (set->active_fds, fd);
1008   if (idx >= 0) {
1009 #ifndef G_OS_WIN32
1010     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1011
1012     res = (pfd->revents & POLLHUP) != 0;
1013 #else
1014     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1015
1016     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1017 #endif
1018   } else {
1019     GST_WARNING ("%p: couldn't find fd !", set);
1020   }
1021
1022   g_mutex_unlock (&((GstPoll *) set)->lock);
1023
1024   return res;
1025 }
1026
1027 /**
1028  * gst_poll_fd_has_error:
1029  * @set: a file descriptor set.
1030  * @fd: a file descriptor.
1031  *
1032  * Check if @fd in @set has an error.
1033  *
1034  * Returns: %TRUE if the descriptor has an error.
1035  */
1036 gboolean
1037 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1038 {
1039   gboolean res = FALSE;
1040   gint idx;
1041
1042   g_return_val_if_fail (set != NULL, FALSE);
1043   g_return_val_if_fail (fd != NULL, FALSE);
1044   g_return_val_if_fail (fd->fd >= 0, FALSE);
1045
1046   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1047
1048   g_mutex_lock (&((GstPoll *) set)->lock);
1049
1050   idx = find_index (set->active_fds, fd);
1051   if (idx >= 0) {
1052 #ifndef G_OS_WIN32
1053     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1054
1055     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1056 #else
1057     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1058
1059     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1060         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1061         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1062         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1063         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1064 #endif
1065   } else {
1066     GST_WARNING ("%p: couldn't find fd !", set);
1067   }
1068
1069   g_mutex_unlock (&((GstPoll *) set)->lock);
1070
1071   return res;
1072 }
1073
1074 static gboolean
1075 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1076 {
1077   gboolean res = FALSE;
1078   gint idx;
1079
1080   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1081
1082   idx = find_index (set->active_fds, fd);
1083   if (idx >= 0) {
1084 #ifndef G_OS_WIN32
1085     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1086
1087     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1088 #else
1089     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1090
1091     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1092 #endif
1093   } else {
1094     GST_WARNING ("%p: couldn't find fd !", set);
1095   }
1096
1097   return res;
1098 }
1099
1100 /**
1101  * gst_poll_fd_can_read:
1102  * @set: a file descriptor set.
1103  * @fd: a file descriptor.
1104  *
1105  * Check if @fd in @set has data to be read.
1106  *
1107  * Returns: %TRUE if the descriptor has data to be read.
1108  */
1109 gboolean
1110 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1111 {
1112   gboolean res = FALSE;
1113
1114   g_return_val_if_fail (set != NULL, FALSE);
1115   g_return_val_if_fail (fd != NULL, FALSE);
1116   g_return_val_if_fail (fd->fd >= 0, FALSE);
1117
1118   g_mutex_lock (&((GstPoll *) set)->lock);
1119
1120   res = gst_poll_fd_can_read_unlocked (set, fd);
1121
1122   g_mutex_unlock (&((GstPoll *) set)->lock);
1123
1124   return res;
1125 }
1126
1127 /**
1128  * gst_poll_fd_can_write:
1129  * @set: a file descriptor set.
1130  * @fd: a file descriptor.
1131  *
1132  * Check if @fd in @set can be used for writing.
1133  *
1134  * Returns: %TRUE if the descriptor can be used for writing.
1135  */
1136 gboolean
1137 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1138 {
1139   gboolean res = FALSE;
1140   gint idx;
1141
1142   g_return_val_if_fail (set != NULL, FALSE);
1143   g_return_val_if_fail (fd != NULL, FALSE);
1144   g_return_val_if_fail (fd->fd >= 0, FALSE);
1145
1146   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1147
1148   g_mutex_lock (&((GstPoll *) set)->lock);
1149
1150   idx = find_index (set->active_fds, fd);
1151   if (idx >= 0) {
1152 #ifndef G_OS_WIN32
1153     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1154
1155     res = (pfd->revents & POLLOUT) != 0;
1156 #else
1157     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1158
1159     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1160 #endif
1161   } else {
1162     GST_WARNING ("%p: couldn't find fd !", set);
1163   }
1164
1165   g_mutex_unlock (&((GstPoll *) set)->lock);
1166
1167   return res;
1168 }
1169
1170 /**
1171  * gst_poll_wait:
1172  * @set: a #GstPoll.
1173  * @timeout: a timeout in nanoseconds.
1174  *
1175  * Wait for activity on the file descriptors in @set. This function waits up to
1176  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1177  *
1178  * For #GstPoll objects created with gst_poll_new(), this function can only be
1179  * called from a single thread at a time.  If called from multiple threads,
1180  * -1 will be returned with errno set to EPERM.
1181  *
1182  * This is not true for timer #GstPoll objects created with
1183  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1184  * simultaneously.
1185  *
1186  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1187  * activity was detected after @timeout. If an error occurs, -1 is returned
1188  * and errno is set.
1189  */
1190 gint
1191 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1192 {
1193   gboolean restarting;
1194   gboolean is_timer;
1195   int res;
1196   gint old_waiting;
1197
1198   g_return_val_if_fail (set != NULL, -1);
1199
1200   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1201
1202   is_timer = set->timer;
1203
1204   /* add one more waiter */
1205   old_waiting = INC_WAITING (set);
1206
1207   /* we cannot wait from multiple threads unless we are a timer */
1208   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1209     goto already_waiting;
1210
1211   /* flushing, exit immediately */
1212   if (G_UNLIKELY (IS_FLUSHING (set)))
1213     goto flushing;
1214
1215   do {
1216     GstPollMode mode;
1217
1218     res = -1;
1219     restarting = FALSE;
1220
1221     mode = choose_mode (set, timeout);
1222
1223     if (TEST_REBUILD (set)) {
1224       g_mutex_lock (&set->lock);
1225 #ifndef G_OS_WIN32
1226       g_array_set_size (set->active_fds, set->fds->len);
1227       memcpy (set->active_fds->data, set->fds->data,
1228           set->fds->len * sizeof (struct pollfd));
1229 #else
1230       if (!gst_poll_prepare_winsock_active_sets (set))
1231         goto winsock_error;
1232 #endif
1233       g_mutex_unlock (&set->lock);
1234     }
1235
1236     switch (mode) {
1237       case GST_POLL_MODE_AUTO:
1238         g_assert_not_reached ();
1239         break;
1240       case GST_POLL_MODE_PPOLL:
1241       {
1242 #ifdef HAVE_PPOLL
1243         struct timespec ts;
1244         struct timespec *tsptr;
1245
1246         if (timeout != GST_CLOCK_TIME_NONE) {
1247           GST_TIME_TO_TIMESPEC (timeout, ts);
1248           tsptr = &ts;
1249         } else {
1250           tsptr = NULL;
1251         }
1252
1253         res =
1254             ppoll ((struct pollfd *) set->active_fds->data,
1255             set->active_fds->len, tsptr, NULL);
1256 #else
1257         g_assert_not_reached ();
1258         errno = ENOSYS;
1259 #endif
1260         break;
1261       }
1262       case GST_POLL_MODE_POLL:
1263       {
1264 #ifdef HAVE_POLL
1265         gint t;
1266
1267         if (timeout != GST_CLOCK_TIME_NONE) {
1268           t = GST_TIME_AS_MSECONDS (timeout);
1269         } else {
1270           t = -1;
1271         }
1272
1273         res =
1274             poll ((struct pollfd *) set->active_fds->data,
1275             set->active_fds->len, t);
1276 #else
1277         g_assert_not_reached ();
1278         errno = ENOSYS;
1279 #endif
1280         break;
1281       }
1282       case GST_POLL_MODE_PSELECT:
1283 #ifndef HAVE_PSELECT
1284       {
1285         g_assert_not_reached ();
1286         errno = ENOSYS;
1287         break;
1288       }
1289 #endif
1290       case GST_POLL_MODE_SELECT:
1291       {
1292 #ifndef G_OS_WIN32
1293         fd_set readfds;
1294         fd_set writefds;
1295         fd_set errorfds;
1296         gint max_fd;
1297
1298         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1299
1300         if (mode == GST_POLL_MODE_SELECT) {
1301           struct timeval tv;
1302           struct timeval *tvptr;
1303
1304           if (timeout != GST_CLOCK_TIME_NONE) {
1305             GST_TIME_TO_TIMEVAL (timeout, tv);
1306             tvptr = &tv;
1307           } else {
1308             tvptr = NULL;
1309           }
1310
1311           GST_DEBUG ("Calling select");
1312           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1313           GST_DEBUG ("After select, res:%d", res);
1314         } else {
1315 #ifdef HAVE_PSELECT
1316           struct timespec ts;
1317           struct timespec *tsptr;
1318
1319           if (timeout != GST_CLOCK_TIME_NONE) {
1320             GST_TIME_TO_TIMESPEC (timeout, ts);
1321             tsptr = &ts;
1322           } else {
1323             tsptr = NULL;
1324           }
1325
1326           GST_DEBUG ("Calling pselect");
1327           res =
1328               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1329           GST_DEBUG ("After pselect, res:%d", res);
1330 #endif
1331         }
1332
1333         if (res >= 0) {
1334           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1335         }
1336 #else /* G_OS_WIN32 */
1337         g_assert_not_reached ();
1338         errno = ENOSYS;
1339 #endif
1340         break;
1341       }
1342       case GST_POLL_MODE_WINDOWS:
1343       {
1344 #ifdef G_OS_WIN32
1345         gint ignore_count = set->active_fds_ignored->len;
1346         DWORD t, wait_ret;
1347
1348         if (G_LIKELY (ignore_count == 0)) {
1349           if (timeout != GST_CLOCK_TIME_NONE)
1350             t = GST_TIME_AS_MSECONDS (timeout);
1351           else
1352             t = INFINITE;
1353         } else {
1354           /* already one or more ignored fds, so we quickly sweep the others */
1355           t = 0;
1356         }
1357
1358         if (set->active_events->len != 0) {
1359           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1360               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1361         } else {
1362           wait_ret = WSA_WAIT_FAILED;
1363           WSASetLastError (WSA_INVALID_PARAMETER);
1364         }
1365
1366         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1367           res = 0;
1368         } else if (wait_ret == WSA_WAIT_FAILED) {
1369           res = -1;
1370           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1371         } else {
1372           /* the first entry is the wakeup event */
1373           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1374             res = gst_poll_collect_winsock_events (set);
1375           } else {
1376             res = 1;            /* wakeup event */
1377           }
1378         }
1379 #else
1380         g_assert_not_reached ();
1381         errno = ENOSYS;
1382 #endif
1383         break;
1384       }
1385     }
1386
1387     if (!is_timer) {
1388       /* Applications needs to clear the control socket themselves for timer
1389        * polls.
1390        * For other polls, we need to clear the control socket. If there was only
1391        * one socket with activity and it was the control socket, we need to
1392        * restart */
1393       if (release_all_wakeup (set) > 0 && res == 1)
1394         restarting = TRUE;
1395     }
1396
1397     /* we got woken up and we are flushing, we need to stop */
1398     if (G_UNLIKELY (IS_FLUSHING (set)))
1399       goto flushing;
1400
1401   } while (G_UNLIKELY (restarting));
1402
1403   DEC_WAITING (set);
1404
1405   return res;
1406
1407   /* ERRORS */
1408 already_waiting:
1409   {
1410     GST_LOG ("%p: we are already waiting", set);
1411     DEC_WAITING (set);
1412     errno = EPERM;
1413     return -1;
1414   }
1415 flushing:
1416   {
1417     GST_LOG ("%p: we are flushing", set);
1418     DEC_WAITING (set);
1419     errno = EBUSY;
1420     return -1;
1421   }
1422 #ifdef G_OS_WIN32
1423 winsock_error:
1424   {
1425     GST_LOG ("%p: winsock error", set);
1426     g_mutex_unlock (&set->lock);
1427     DEC_WAITING (set);
1428     return -1;
1429   }
1430 #endif
1431 }
1432
1433 /**
1434  * gst_poll_set_controllable:
1435  * @set: a #GstPoll.
1436  * @controllable: new controllable state.
1437  *
1438  * When @controllable is %TRUE, this function ensures that future calls to
1439  * gst_poll_wait() will be affected by gst_poll_restart() and
1440  * gst_poll_set_flushing().
1441  *
1442  * Returns: %TRUE if the controllability of @set could be updated.
1443  */
1444 gboolean
1445 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1446 {
1447   g_return_val_if_fail (set != NULL, FALSE);
1448
1449   GST_LOG ("%p: controllable : %d", set, controllable);
1450
1451   set->controllable = controllable;
1452
1453   return TRUE;
1454 }
1455
1456 /**
1457  * gst_poll_restart:
1458  * @set: a #GstPoll.
1459  *
1460  * Restart any gst_poll_wait() that is in progress. This function is typically
1461  * used after adding or removing descriptors to @set.
1462  *
1463  * If @set is not controllable, then this call will have no effect.
1464  */
1465 void
1466 gst_poll_restart (GstPoll * set)
1467 {
1468   g_return_if_fail (set != NULL);
1469
1470   if (set->controllable && GET_WAITING (set) > 0) {
1471     /* we are controllable and waiting, wake up the waiter. The socket will be
1472      * cleared by the _wait() thread and the poll will be restarted */
1473     raise_wakeup (set);
1474   }
1475 }
1476
1477 /**
1478  * gst_poll_set_flushing:
1479  * @set: a #GstPoll.
1480  * @flushing: new flushing state.
1481  *
1482  * When @flushing is %TRUE, this function ensures that current and future calls
1483  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1484  *
1485  * Unsetting the flushing state will restore normal operation of @set.
1486  */
1487 void
1488 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1489 {
1490   g_return_if_fail (set != NULL);
1491
1492   GST_LOG ("%p: flushing: %d", set, flushing);
1493
1494   /* update the new state first */
1495   SET_FLUSHING (set, flushing);
1496
1497   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1498     /* we are flushing, controllable and waiting, wake up the waiter. When we
1499      * stop the flushing operation we don't clear the wakeup fd here, this will
1500      * happen in the _wait() thread. */
1501     raise_wakeup (set);
1502   }
1503 }
1504
1505 /**
1506  * gst_poll_write_control:
1507  * @set: a #GstPoll.
1508  *
1509  * Write a byte to the control socket of the controllable @set.
1510  * This function is mostly useful for timer #GstPoll objects created with
1511  * gst_poll_new_timer(). 
1512  *
1513  * It will make any current and future gst_poll_wait() function return with
1514  * 1, meaning the control socket is set. After an equal amount of calls to
1515  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1516  * block again until their timeout expired.
1517  *
1518  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1519  * byte could not be written.
1520  */
1521 gboolean
1522 gst_poll_write_control (GstPoll * set)
1523 {
1524   gboolean res;
1525
1526   g_return_val_if_fail (set != NULL, FALSE);
1527   g_return_val_if_fail (set->timer, FALSE);
1528
1529   res = raise_wakeup (set);
1530
1531   return res;
1532 }
1533
1534 /**
1535  * gst_poll_read_control:
1536  * @set: a #GstPoll.
1537  *
1538  * Read a byte from the control socket of the controllable @set.
1539  * This function is mostly useful for timer #GstPoll objects created with
1540  * gst_poll_new_timer(). 
1541  *
1542  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1543  * was no byte to read.
1544  */
1545 gboolean
1546 gst_poll_read_control (GstPoll * set)
1547 {
1548   gboolean res;
1549
1550   g_return_val_if_fail (set != NULL, FALSE);
1551   g_return_val_if_fail (set->timer, FALSE);
1552
1553   res = release_wakeup (set);
1554
1555   return res;
1556 }