2.0 beta init
[framework/multimedia/gstreamer0.10.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancelable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writeable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writeable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writeable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
61
62 #include <sys/types.h>
63
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
67
68 #include <errno.h>
69 #include <fcntl.h>
70
71 #include <glib.h>
72
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #define EINPROGRESS WSAEINPROGRESS
76 #else
77 #define _GNU_SOURCE 1
78 #ifdef HAVE_SYS_POLL_H
79 #include <sys/poll.h>
80 #endif
81 #ifdef HAVE_POLL_H
82 #include <poll.h>
83 #endif
84 #include <sys/time.h>
85 #include <sys/socket.h>
86 #endif
87
88 /* OS/X needs this because of bad headers */
89 #include <string.h>
90
91 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
92  * so we prefer our own poll emulation.
93  */
94 #if defined(BROKEN_POLL)
95 #undef HAVE_POLL
96 #endif
97
98 #include "gstpoll.h"
99
100 #define GST_CAT_DEFAULT GST_CAT_POLL
101
102 #ifdef G_OS_WIN32
103 typedef struct _WinsockFd WinsockFd;
104
105 struct _WinsockFd
106 {
107   gint fd;
108   glong event_mask;
109   WSANETWORKEVENTS events;
110   glong ignored_event_mask;
111 };
112 #endif
113
114 typedef enum
115 {
116   GST_POLL_MODE_AUTO,
117   GST_POLL_MODE_SELECT,
118   GST_POLL_MODE_PSELECT,
119   GST_POLL_MODE_POLL,
120   GST_POLL_MODE_PPOLL,
121   GST_POLL_MODE_WINDOWS
122 } GstPollMode;
123
124 struct _GstPoll
125 {
126   GstPollMode mode;
127
128   GMutex *lock;
129   /* array of fds, always written to and read from with lock */
130   GArray *fds;
131   /* array of active fds, only written to from the waiting thread with the
132    * lock and read from with the lock or without the lock from the waiting
133    * thread */
134   GArray *active_fds;
135
136 #ifndef G_OS_WIN32
137   gchar buf[1];
138   GstPollFD control_read_fd;
139   GstPollFD control_write_fd;
140 #else
141   GArray *active_fds_ignored;
142   GArray *events;
143   GArray *active_events;
144
145   HANDLE wakeup_event;
146 #endif
147
148   gboolean controllable;
149   volatile gint waiting;
150   volatile gint control_pending;
151   volatile gint flushing;
152   gboolean timer;
153   volatile gint rebuild;
154 };
155
156 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
157     gboolean active);
158 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
159
160 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
161 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
162
163 #define INC_WAITING(s)      (G_ATOMIC_INT_ADD(&(s)->waiting, 1))
164 #define DEC_WAITING(s)      (G_ATOMIC_INT_ADD(&(s)->waiting, -1))
165 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
166
167 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
168 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
169
170 #ifndef G_OS_WIN32
171 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
172 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
173 #else
174 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
175 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
176 #endif
177
178 /* the poll/select call is also performed on a control socket, that way
179  * we can send special commands to control it */
180 static inline gboolean
181 raise_wakeup (GstPoll * set)
182 {
183   gboolean result = TRUE;
184
185   if (G_ATOMIC_INT_ADD (&set->control_pending, 1) == 0) {
186     /* raise when nothing pending */
187     result = WAKE_EVENT (set);
188   }
189   return result;
190 }
191
192 /* note how bad things can happen when the 2 threads both raise and release the
193  * wakeup. This is however not a problem because you must always pair a raise
194  * with a release */
195 static inline gboolean
196 release_wakeup (GstPoll * set)
197 {
198   gboolean result = TRUE;
199
200   if (g_atomic_int_dec_and_test (&set->control_pending)) {
201     result = RELEASE_EVENT (set);
202   }
203   return result;
204 }
205
206 static inline gint
207 release_all_wakeup (GstPoll * set)
208 {
209   gint old;
210
211   while (TRUE) {
212     if (!(old = g_atomic_int_get (&set->control_pending)))
213       /* nothing pending, just exit */
214       break;
215
216     /* try to remove all pending control messages */
217     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
218       /* we managed to remove all messages, read the control socket */
219       if (RELEASE_EVENT (set))
220         break;
221       else
222         /* retry again until we read it successfully */
223         G_ATOMIC_INT_ADD (&set->control_pending, 1);
224     }
225   }
226   return old;
227 }
228
229 static gint
230 find_index (GArray * array, GstPollFD * fd)
231 {
232 #ifndef G_OS_WIN32
233   struct pollfd *ifd;
234 #else
235   WinsockFd *ifd;
236 #endif
237   guint i;
238
239   /* start by assuming the index found in the fd is still valid */
240   if (fd->idx >= 0 && fd->idx < array->len) {
241 #ifndef G_OS_WIN32
242     ifd = &g_array_index (array, struct pollfd, fd->idx);
243 #else
244     ifd = &g_array_index (array, WinsockFd, fd->idx);
245 #endif
246
247     if (ifd->fd == fd->fd) {
248       return fd->idx;
249     }
250   }
251
252   /* the pollfd array has changed and we need to lookup the fd again */
253   for (i = 0; i < array->len; i++) {
254 #ifndef G_OS_WIN32
255     ifd = &g_array_index (array, struct pollfd, i);
256 #else
257     ifd = &g_array_index (array, WinsockFd, i);
258 #endif
259
260     if (ifd->fd == fd->fd) {
261       fd->idx = (gint) i;
262       return fd->idx;
263     }
264   }
265
266   fd->idx = -1;
267   return fd->idx;
268 }
269
270 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
271 /* check if all file descriptors will fit in an fd_set */
272 static gboolean
273 selectable_fds (const GstPoll * set)
274 {
275   guint i;
276
277   g_mutex_lock (set->lock);
278   for (i = 0; i < set->fds->len; i++) {
279     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
280
281     if (pfd->fd >= FD_SETSIZE)
282       goto too_many;
283   }
284   g_mutex_unlock (set->lock);
285
286   return TRUE;
287
288 too_many:
289   {
290     g_mutex_unlock (set->lock);
291     return FALSE;
292   }
293 }
294
295 /* check if the timeout will convert to a timeout value used for poll()
296  * without a loss of precision
297  */
298 static gboolean
299 pollable_timeout (GstClockTime timeout)
300 {
301   if (timeout == GST_CLOCK_TIME_NONE)
302     return TRUE;
303
304   /* not a nice multiple of milliseconds */
305   if (timeout % 1000000)
306     return FALSE;
307
308   return TRUE;
309 }
310 #endif
311
312 static GstPollMode
313 choose_mode (const GstPoll * set, GstClockTime timeout)
314 {
315   GstPollMode mode;
316
317   if (set->mode == GST_POLL_MODE_AUTO) {
318 #ifdef HAVE_PPOLL
319     mode = GST_POLL_MODE_PPOLL;
320 #elif defined(HAVE_POLL)
321     if (!selectable_fds (set) || pollable_timeout (timeout)) {
322       mode = GST_POLL_MODE_POLL;
323     } else {
324 #ifdef HAVE_PSELECT
325       mode = GST_POLL_MODE_PSELECT;
326 #else
327       mode = GST_POLL_MODE_SELECT;
328 #endif
329     }
330 #elif defined(HAVE_PSELECT)
331     mode = GST_POLL_MODE_PSELECT;
332 #else
333     mode = GST_POLL_MODE_SELECT;
334 #endif
335   } else {
336     mode = set->mode;
337   }
338   return mode;
339 }
340
341 #ifndef G_OS_WIN32
342 static gint
343 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
344     fd_set * errorfds)
345 {
346   gint max_fd = -1;
347   guint i;
348
349   FD_ZERO (readfds);
350   FD_ZERO (writefds);
351   FD_ZERO (errorfds);
352
353   g_mutex_lock (set->lock);
354
355   for (i = 0; i < set->active_fds->len; i++) {
356     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
357
358     if (pfd->fd < FD_SETSIZE) {
359       if (pfd->events & POLLIN)
360         FD_SET (pfd->fd, readfds);
361       if (pfd->events & POLLOUT)
362         FD_SET (pfd->fd, writefds);
363       if (pfd->events)
364         FD_SET (pfd->fd, errorfds);
365       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
366         max_fd = pfd->fd;
367     }
368   }
369
370   g_mutex_unlock (set->lock);
371
372   return max_fd;
373 }
374
375 static void
376 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
377     fd_set * errorfds)
378 {
379   guint i;
380
381   g_mutex_lock (set->lock);
382
383   for (i = 0; i < set->active_fds->len; i++) {
384     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
385
386     if (pfd->fd < FD_SETSIZE) {
387       pfd->revents = 0;
388       if (FD_ISSET (pfd->fd, readfds))
389         pfd->revents |= POLLIN;
390       if (FD_ISSET (pfd->fd, writefds))
391         pfd->revents |= POLLOUT;
392       if (FD_ISSET (pfd->fd, errorfds))
393         pfd->revents |= POLLERR;
394     }
395   }
396
397   g_mutex_unlock (set->lock);
398 }
399 #else /* G_OS_WIN32 */
400 /*
401  * Translate errors thrown by the Winsock API used by GstPoll:
402  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
403  */
404 static gint
405 gst_poll_winsock_error_to_errno (DWORD last_error)
406 {
407   switch (last_error) {
408     case WSA_INVALID_HANDLE:
409     case WSAEINVAL:
410     case WSAENOTSOCK:
411       return EBADF;
412
413     case WSA_NOT_ENOUGH_MEMORY:
414       return ENOMEM;
415
416       /*
417        * Anything else, including:
418        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
419        *   WSANOTINITIALISED
420        */
421     default:
422       return EINVAL;
423   }
424 }
425
426 static void
427 gst_poll_free_winsock_event (GstPoll * set, gint idx)
428 {
429   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
430   HANDLE event = g_array_index (set->events, HANDLE, idx);
431
432   WSAEventSelect (wfd->fd, event, 0);
433   CloseHandle (event);
434 }
435
436 static void
437 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
438     gboolean active)
439 {
440   WinsockFd *wfd;
441
442   wfd = &g_array_index (set->fds, WinsockFd, idx);
443
444   if (active)
445     wfd->event_mask |= flags;
446   else
447     wfd->event_mask &= ~flags;
448
449   /* reset ignored state if the new mask doesn't overlap at all */
450   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
451     wfd->ignored_event_mask = 0;
452 }
453
454 static gboolean
455 gst_poll_prepare_winsock_active_sets (GstPoll * set)
456 {
457   guint i;
458
459   g_array_set_size (set->active_fds, 0);
460   g_array_set_size (set->active_fds_ignored, 0);
461   g_array_set_size (set->active_events, 0);
462   g_array_append_val (set->active_events, set->wakeup_event);
463
464   for (i = 0; i < set->fds->len; i++) {
465     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
466     HANDLE event = g_array_index (set->events, HANDLE, i);
467
468     if (wfd->ignored_event_mask == 0) {
469       gint ret;
470
471       g_array_append_val (set->active_fds, *wfd);
472       g_array_append_val (set->active_events, event);
473
474       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
475       if (G_UNLIKELY (ret != 0)) {
476         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
477         return FALSE;
478       }
479     } else {
480       g_array_append_val (set->active_fds_ignored, wfd);
481     }
482   }
483
484   return TRUE;
485 }
486
487 static gint
488 gst_poll_collect_winsock_events (GstPoll * set)
489 {
490   gint res, i;
491
492   /*
493    * We need to check which events are signaled, and call
494    * WSAEnumNetworkEvents for those that are, which resets
495    * the event and clears the internal network event records.
496    */
497   res = 0;
498   for (i = 0; i < set->active_fds->len; i++) {
499     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
500     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
501     DWORD wait_ret;
502
503     wait_ret = WaitForSingleObject (event, 0);
504     if (wait_ret == WAIT_OBJECT_0) {
505       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
506
507       if (G_UNLIKELY (enum_ret != 0)) {
508         res = -1;
509         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
510         break;
511       }
512
513       res++;
514     } else {
515       /* clear any previously stored result */
516       memset (&wfd->events, 0, sizeof (wfd->events));
517     }
518   }
519
520   /* If all went well we also need to reset the ignored fds. */
521   if (res >= 0) {
522     res += set->active_fds_ignored->len;
523
524     for (i = 0; i < set->active_fds_ignored->len; i++) {
525       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
526
527       wfd->ignored_event_mask = 0;
528     }
529
530     g_array_set_size (set->active_fds_ignored, 0);
531   }
532
533   return res;
534 }
535 #endif
536
537 /**
538  * gst_poll_new:
539  * @controllable: whether it should be possible to control a wait.
540  *
541  * Create a new file descriptor set. If @controllable, it
542  * is possible to restart or flush a call to gst_poll_wait() with
543  * gst_poll_restart() and gst_poll_set_flushing() respectively.
544  *
545  * Free-function: gst_poll_free
546  *
547  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
548  *     Free with gst_poll_free().
549  *
550  * Since: 0.10.18
551  */
552 GstPoll *
553 gst_poll_new (gboolean controllable)
554 {
555   GstPoll *nset;
556
557   GST_DEBUG ("controllable : %d", controllable);
558
559   nset = g_slice_new0 (GstPoll);
560   nset->lock = g_mutex_new ();
561 #ifndef G_OS_WIN32
562   nset->mode = GST_POLL_MODE_AUTO;
563   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
564   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
565   nset->control_read_fd.fd = -1;
566   nset->control_write_fd.fd = -1;
567   {
568     gint control_sock[2];
569
570     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
571       goto no_socket_pair;
572
573     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
574     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
575
576     nset->control_read_fd.fd = control_sock[0];
577     nset->control_write_fd.fd = control_sock[1];
578
579     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
580     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
581   }
582 #else
583   nset->mode = GST_POLL_MODE_WINDOWS;
584   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
585   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
586   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
587   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
588   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
589
590   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
591 #endif
592
593   /* ensure (re)build, though already sneakily set in non-windows case */
594   MARK_REBUILD (nset);
595
596   nset->controllable = controllable;
597
598   return nset;
599
600   /* ERRORS */
601 #ifndef G_OS_WIN32
602 no_socket_pair:
603   {
604     GST_WARNING ("%p: can't create socket pair !", nset);
605     gst_poll_free (nset);
606     return NULL;
607   }
608 #endif
609 }
610
611 /**
612  * gst_poll_new_timer:
613  *
614  * Create a new poll object that can be used for scheduling cancellable
615  * timeouts.
616  *
617  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
618  * performed from different threads. 
619  *
620  * Free-function: gst_poll_free
621  *
622  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
623  *     Free with gst_poll_free().
624  *
625  * Since: 0.10.23
626  */
627 GstPoll *
628 gst_poll_new_timer (void)
629 {
630   GstPoll *poll;
631
632   /* make a new controllable poll set */
633   if (!(poll = gst_poll_new (TRUE)))
634     goto done;
635
636   /* we are a timer */
637   poll->timer = TRUE;
638
639 done:
640   return poll;
641 }
642
643 /**
644  * gst_poll_free:
645  * @set: (transfer full): a file descriptor set.
646  *
647  * Free a file descriptor set.
648  *
649  * Since: 0.10.18
650  */
651 void
652 gst_poll_free (GstPoll * set)
653 {
654   g_return_if_fail (set != NULL);
655
656   GST_DEBUG ("%p: freeing", set);
657
658 #ifndef G_OS_WIN32
659   if (set->control_write_fd.fd >= 0)
660     close (set->control_write_fd.fd);
661   if (set->control_read_fd.fd >= 0)
662     close (set->control_read_fd.fd);
663 #else
664   CloseHandle (set->wakeup_event);
665
666   {
667     guint i;
668
669     for (i = 0; i < set->events->len; i++)
670       gst_poll_free_winsock_event (set, i);
671   }
672
673   g_array_free (set->active_events, TRUE);
674   g_array_free (set->events, TRUE);
675   g_array_free (set->active_fds_ignored, TRUE);
676 #endif
677
678   g_array_free (set->active_fds, TRUE);
679   g_array_free (set->fds, TRUE);
680   g_mutex_free (set->lock);
681   g_slice_free (GstPoll, set);
682 }
683
684 /**
685  * gst_poll_get_read_gpollfd:
686  * @set: a #GstPoll
687  * @fd: a #GPollFD
688  *
689  * Get a GPollFD for the reading part of the control socket. This is useful when
690  * integrating with a GSource and GMainLoop.
691  *
692  * Since: 0.10.32
693  */
694 void
695 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
696 {
697   g_return_if_fail (set != NULL);
698   g_return_if_fail (fd != NULL);
699
700 #ifndef G_OS_WIN32
701   fd->fd = set->control_read_fd.fd;
702 #else
703 #if GLIB_SIZEOF_VOID_P == 8
704   fd->fd = (gint64) set->wakeup_event;
705 #else
706   fd->fd = (gint) set->wakeup_event;
707 #endif
708 #endif
709   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
710   fd->revents = 0;
711 }
712
713 /**
714  * gst_poll_fd_init:
715  * @fd: a #GstPollFD
716  *
717  * Initializes @fd. Alternatively you can initialize it with
718  * #GST_POLL_FD_INIT.
719  *
720  * Since: 0.10.18
721  */
722 void
723 gst_poll_fd_init (GstPollFD * fd)
724 {
725   g_return_if_fail (fd != NULL);
726
727   fd->fd = -1;
728   fd->idx = -1;
729 }
730
731 static gboolean
732 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
733 {
734   gint idx;
735
736   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
737
738   idx = find_index (set->fds, fd);
739   if (idx < 0) {
740 #ifndef G_OS_WIN32
741     struct pollfd nfd;
742
743     nfd.fd = fd->fd;
744     nfd.events = POLLERR | POLLNVAL | POLLHUP;
745     nfd.revents = 0;
746
747     g_array_append_val (set->fds, nfd);
748
749     fd->idx = set->fds->len - 1;
750 #else
751     WinsockFd wfd;
752     HANDLE event;
753
754     wfd.fd = fd->fd;
755     wfd.event_mask = FD_CLOSE;
756     memset (&wfd.events, 0, sizeof (wfd.events));
757     wfd.ignored_event_mask = 0;
758     event = WSACreateEvent ();
759
760     g_array_append_val (set->fds, wfd);
761     g_array_append_val (set->events, event);
762
763     fd->idx = set->fds->len - 1;
764 #endif
765     MARK_REBUILD (set);
766   } else {
767     GST_WARNING ("%p: couldn't find fd !", set);
768   }
769
770   return TRUE;
771 }
772
773 /**
774  * gst_poll_add_fd:
775  * @set: a file descriptor set.
776  * @fd: a file descriptor.
777  *
778  * Add a file descriptor to the file descriptor set.
779  *
780  * Returns: %TRUE if the file descriptor was successfully added to the set.
781  *
782  * Since: 0.10.18
783  */
784 gboolean
785 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
786 {
787   gboolean ret;
788
789   g_return_val_if_fail (set != NULL, FALSE);
790   g_return_val_if_fail (fd != NULL, FALSE);
791   g_return_val_if_fail (fd->fd >= 0, FALSE);
792
793   g_mutex_lock (set->lock);
794
795   ret = gst_poll_add_fd_unlocked (set, fd);
796
797   g_mutex_unlock (set->lock);
798
799   return ret;
800 }
801
802 /**
803  * gst_poll_remove_fd:
804  * @set: a file descriptor set.
805  * @fd: a file descriptor.
806  *
807  * Remove a file descriptor from the file descriptor set.
808  *
809  * Returns: %TRUE if the file descriptor was successfully removed from the set.
810  *
811  * Since: 0.10.18
812  */
813 gboolean
814 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
815 {
816   gint idx;
817
818   g_return_val_if_fail (set != NULL, FALSE);
819   g_return_val_if_fail (fd != NULL, FALSE);
820   g_return_val_if_fail (fd->fd >= 0, FALSE);
821
822
823   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
824
825   g_mutex_lock (set->lock);
826
827   /* get the index, -1 is an fd that is not added */
828   idx = find_index (set->fds, fd);
829   if (idx >= 0) {
830 #ifdef G_OS_WIN32
831     gst_poll_free_winsock_event (set, idx);
832     g_array_remove_index_fast (set->events, idx);
833 #endif
834
835     /* remove the fd at index, we use _remove_index_fast, which copies the last
836      * element of the array to the freed index */
837     g_array_remove_index_fast (set->fds, idx);
838
839     /* mark fd as removed by setting the index to -1 */
840     fd->idx = -1;
841     MARK_REBUILD (set);
842   } else {
843     GST_WARNING ("%p: couldn't find fd !", set);
844   }
845
846   g_mutex_unlock (set->lock);
847
848   return idx >= 0;
849 }
850
851 /**
852  * gst_poll_fd_ctl_write:
853  * @set: a file descriptor set.
854  * @fd: a file descriptor.
855  * @active: a new status.
856  *
857  * Control whether the descriptor @fd in @set will be monitored for
858  * writability.
859  *
860  * Returns: %TRUE if the descriptor was successfully updated.
861  *
862  * Since: 0.10.18
863  */
864 gboolean
865 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
866 {
867   gint idx;
868
869   g_return_val_if_fail (set != NULL, FALSE);
870   g_return_val_if_fail (fd != NULL, FALSE);
871   g_return_val_if_fail (fd->fd >= 0, FALSE);
872
873   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
874       fd->fd, fd->idx, active);
875
876   g_mutex_lock (set->lock);
877
878   idx = find_index (set->fds, fd);
879   if (idx >= 0) {
880 #ifndef G_OS_WIN32
881     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
882
883     if (active)
884       pfd->events |= POLLOUT;
885     else
886       pfd->events &= ~POLLOUT;
887
888     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
889 #else
890     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
891         active);
892 #endif
893     MARK_REBUILD (set);
894   } else {
895     GST_WARNING ("%p: couldn't find fd !", set);
896   }
897
898   g_mutex_unlock (set->lock);
899
900   return idx >= 0;
901 }
902
903 static gboolean
904 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
905 {
906   gint idx;
907
908   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
909       fd->fd, fd->idx, active);
910
911   idx = find_index (set->fds, fd);
912
913   if (idx >= 0) {
914 #ifndef G_OS_WIN32
915     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
916
917     if (active)
918       pfd->events |= (POLLIN | POLLPRI);
919     else
920       pfd->events &= ~(POLLIN | POLLPRI);
921 #else
922     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
923 #endif
924     MARK_REBUILD (set);
925   } else {
926     GST_WARNING ("%p: couldn't find fd !", set);
927   }
928
929   return idx >= 0;
930 }
931
932 /**
933  * gst_poll_fd_ctl_read:
934  * @set: a file descriptor set.
935  * @fd: a file descriptor.
936  * @active: a new status.
937  *
938  * Control whether the descriptor @fd in @set will be monitored for
939  * readability.
940  *
941  * Returns: %TRUE if the descriptor was successfully updated.
942  *
943  * Since: 0.10.18
944  */
945 gboolean
946 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
947 {
948   gboolean ret;
949
950   g_return_val_if_fail (set != NULL, FALSE);
951   g_return_val_if_fail (fd != NULL, FALSE);
952   g_return_val_if_fail (fd->fd >= 0, FALSE);
953
954   g_mutex_lock (set->lock);
955
956   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
957
958   g_mutex_unlock (set->lock);
959
960   return ret;
961 }
962
963 /**
964  * gst_poll_fd_ignored:
965  * @set: a file descriptor set.
966  * @fd: a file descriptor.
967  *
968  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
969  * the same result for @fd as last time. This function must be called if no
970  * operation (read/write/recv/send/etc.) will be performed on @fd before
971  * the next call to gst_poll_wait().
972  *
973  * The reason why this is needed is because the underlying implementation
974  * might not allow querying the fd more than once between calls to one of
975  * the re-enabling operations.
976  *
977  * Since: 0.10.18
978  */
979 void
980 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
981 {
982 #ifdef G_OS_WIN32
983   gint idx;
984
985   g_return_if_fail (set != NULL);
986   g_return_if_fail (fd != NULL);
987   g_return_if_fail (fd->fd >= 0);
988
989   g_mutex_lock (set->lock);
990
991   idx = find_index (set->fds, fd);
992   if (idx >= 0) {
993     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
994
995     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
996     MARK_REBUILD (set);
997   }
998
999   g_mutex_unlock (set->lock);
1000 #endif
1001 }
1002
1003 /**
1004  * gst_poll_fd_has_closed:
1005  * @set: a file descriptor set.
1006  * @fd: a file descriptor.
1007  *
1008  * Check if @fd in @set has closed the connection.
1009  *
1010  * Returns: %TRUE if the connection was closed.
1011  *
1012  * Since: 0.10.18
1013  */
1014 gboolean
1015 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1016 {
1017   gboolean res = FALSE;
1018   gint idx;
1019
1020   g_return_val_if_fail (set != NULL, FALSE);
1021   g_return_val_if_fail (fd != NULL, FALSE);
1022   g_return_val_if_fail (fd->fd >= 0, FALSE);
1023
1024   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1025
1026   g_mutex_lock (set->lock);
1027
1028   idx = find_index (set->active_fds, fd);
1029   if (idx >= 0) {
1030 #ifndef G_OS_WIN32
1031     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1032
1033     res = (pfd->revents & POLLHUP) != 0;
1034 #else
1035     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1036
1037     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1038 #endif
1039   } else {
1040     GST_WARNING ("%p: couldn't find fd !", set);
1041   }
1042
1043   g_mutex_unlock (set->lock);
1044
1045   return res;
1046 }
1047
1048 /**
1049  * gst_poll_fd_has_error:
1050  * @set: a file descriptor set.
1051  * @fd: a file descriptor.
1052  *
1053  * Check if @fd in @set has an error.
1054  *
1055  * Returns: %TRUE if the descriptor has an error.
1056  *
1057  * Since: 0.10.18
1058  */
1059 gboolean
1060 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1061 {
1062   gboolean res = FALSE;
1063   gint idx;
1064
1065   g_return_val_if_fail (set != NULL, FALSE);
1066   g_return_val_if_fail (fd != NULL, FALSE);
1067   g_return_val_if_fail (fd->fd >= 0, FALSE);
1068
1069   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1070
1071   g_mutex_lock (set->lock);
1072
1073   idx = find_index (set->active_fds, fd);
1074   if (idx >= 0) {
1075 #ifndef G_OS_WIN32
1076     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1077
1078     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1079 #else
1080     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1081
1082     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1083         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1084         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1085         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1086         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1087 #endif
1088   } else {
1089     GST_WARNING ("%p: couldn't find fd !", set);
1090   }
1091
1092   g_mutex_unlock (set->lock);
1093
1094   return res;
1095 }
1096
1097 static gboolean
1098 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1099 {
1100   gboolean res = FALSE;
1101   gint idx;
1102
1103   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1104
1105   idx = find_index (set->active_fds, fd);
1106   if (idx >= 0) {
1107 #ifndef G_OS_WIN32
1108     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1109
1110     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1111 #else
1112     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1113
1114     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1115 #endif
1116   } else {
1117     GST_WARNING ("%p: couldn't find fd !", set);
1118   }
1119
1120   return res;
1121 }
1122
1123 /**
1124  * gst_poll_fd_can_read:
1125  * @set: a file descriptor set.
1126  * @fd: a file descriptor.
1127  *
1128  * Check if @fd in @set has data to be read.
1129  *
1130  * Returns: %TRUE if the descriptor has data to be read.
1131  *
1132  * Since: 0.10.18
1133  */
1134 gboolean
1135 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1136 {
1137   gboolean res = FALSE;
1138
1139   g_return_val_if_fail (set != NULL, FALSE);
1140   g_return_val_if_fail (fd != NULL, FALSE);
1141   g_return_val_if_fail (fd->fd >= 0, FALSE);
1142
1143   g_mutex_lock (set->lock);
1144
1145   res = gst_poll_fd_can_read_unlocked (set, fd);
1146
1147   g_mutex_unlock (set->lock);
1148
1149   return res;
1150 }
1151
1152 /**
1153  * gst_poll_fd_can_write:
1154  * @set: a file descriptor set.
1155  * @fd: a file descriptor.
1156  *
1157  * Check if @fd in @set can be used for writing.
1158  *
1159  * Returns: %TRUE if the descriptor can be used for writing.
1160  *
1161  * Since: 0.10.18
1162  */
1163 gboolean
1164 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1165 {
1166   gboolean res = FALSE;
1167   gint idx;
1168
1169   g_return_val_if_fail (set != NULL, FALSE);
1170   g_return_val_if_fail (fd != NULL, FALSE);
1171   g_return_val_if_fail (fd->fd >= 0, FALSE);
1172
1173   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1174
1175   g_mutex_lock (set->lock);
1176
1177   idx = find_index (set->active_fds, fd);
1178   if (idx >= 0) {
1179 #ifndef G_OS_WIN32
1180     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1181
1182     res = (pfd->revents & POLLOUT) != 0;
1183 #else
1184     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1185
1186     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1187 #endif
1188   } else {
1189     GST_WARNING ("%p: couldn't find fd !", set);
1190   }
1191
1192   g_mutex_unlock (set->lock);
1193
1194   return res;
1195 }
1196
1197 /**
1198  * gst_poll_wait:
1199  * @set: a #GstPoll.
1200  * @timeout: a timeout in nanoseconds.
1201  *
1202  * Wait for activity on the file descriptors in @set. This function waits up to
1203  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1204  *
1205  * For #GstPoll objects created with gst_poll_new(), this function can only be
1206  * called from a single thread at a time.  If called from multiple threads,
1207  * -1 will be returned with errno set to EPERM.
1208  *
1209  * This is not true for timer #GstPoll objects created with
1210  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1211  * simultaneously.
1212  *
1213  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1214  * activity was detected after @timeout. If an error occurs, -1 is returned
1215  * and errno is set.
1216  *
1217  * Since: 0.10.18
1218  */
1219 gint
1220 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1221 {
1222   gboolean restarting;
1223   gboolean is_timer;
1224   int res;
1225   gint old_waiting;
1226
1227   g_return_val_if_fail (set != NULL, -1);
1228
1229   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1230
1231   is_timer = set->timer;
1232
1233   /* add one more waiter */
1234   old_waiting = INC_WAITING (set);
1235
1236   /* we cannot wait from multiple threads unless we are a timer */
1237   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1238     goto already_waiting;
1239
1240   /* flushing, exit immediately */
1241   if (G_UNLIKELY (IS_FLUSHING (set)))
1242     goto flushing;
1243
1244   do {
1245     GstPollMode mode;
1246
1247     res = -1;
1248     restarting = FALSE;
1249
1250     mode = choose_mode (set, timeout);
1251
1252     if (TEST_REBUILD (set)) {
1253       g_mutex_lock (set->lock);
1254 #ifndef G_OS_WIN32
1255       g_array_set_size (set->active_fds, set->fds->len);
1256       memcpy (set->active_fds->data, set->fds->data,
1257           set->fds->len * sizeof (struct pollfd));
1258 #else
1259       if (!gst_poll_prepare_winsock_active_sets (set))
1260         goto winsock_error;
1261 #endif
1262       g_mutex_unlock (set->lock);
1263     }
1264
1265     switch (mode) {
1266       case GST_POLL_MODE_AUTO:
1267         g_assert_not_reached ();
1268         break;
1269       case GST_POLL_MODE_PPOLL:
1270       {
1271 #ifdef HAVE_PPOLL
1272         struct timespec ts;
1273         struct timespec *tsptr;
1274
1275         if (timeout != GST_CLOCK_TIME_NONE) {
1276           GST_TIME_TO_TIMESPEC (timeout, ts);
1277           tsptr = &ts;
1278         } else {
1279           tsptr = NULL;
1280         }
1281
1282         res =
1283             ppoll ((struct pollfd *) set->active_fds->data,
1284             set->active_fds->len, tsptr, NULL);
1285 #else
1286         g_assert_not_reached ();
1287         errno = ENOSYS;
1288 #endif
1289         break;
1290       }
1291       case GST_POLL_MODE_POLL:
1292       {
1293 #ifdef HAVE_POLL
1294         gint t;
1295
1296         if (timeout != GST_CLOCK_TIME_NONE) {
1297           t = GST_TIME_AS_MSECONDS (timeout);
1298         } else {
1299           t = -1;
1300         }
1301
1302         res =
1303             poll ((struct pollfd *) set->active_fds->data,
1304             set->active_fds->len, t);
1305 #else
1306         g_assert_not_reached ();
1307         errno = ENOSYS;
1308 #endif
1309         break;
1310       }
1311       case GST_POLL_MODE_PSELECT:
1312 #ifndef HAVE_PSELECT
1313       {
1314         g_assert_not_reached ();
1315         errno = ENOSYS;
1316         break;
1317       }
1318 #endif
1319       case GST_POLL_MODE_SELECT:
1320       {
1321 #ifndef G_OS_WIN32
1322         fd_set readfds;
1323         fd_set writefds;
1324         fd_set errorfds;
1325         gint max_fd;
1326
1327         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1328
1329         if (mode == GST_POLL_MODE_SELECT) {
1330           struct timeval tv;
1331           struct timeval *tvptr;
1332
1333           if (timeout != GST_CLOCK_TIME_NONE) {
1334             GST_TIME_TO_TIMEVAL (timeout, tv);
1335             tvptr = &tv;
1336           } else {
1337             tvptr = NULL;
1338           }
1339
1340           GST_DEBUG ("Calling select");
1341           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1342           GST_DEBUG ("After select, res:%d", res);
1343         } else {
1344 #ifdef HAVE_PSELECT
1345           struct timespec ts;
1346           struct timespec *tsptr;
1347
1348           if (timeout != GST_CLOCK_TIME_NONE) {
1349             GST_TIME_TO_TIMESPEC (timeout, ts);
1350             tsptr = &ts;
1351           } else {
1352             tsptr = NULL;
1353           }
1354
1355           GST_DEBUG ("Calling pselect");
1356           res =
1357               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1358           GST_DEBUG ("After pselect, res:%d", res);
1359 #endif
1360         }
1361
1362         if (res >= 0) {
1363           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1364         }
1365 #else /* G_OS_WIN32 */
1366         g_assert_not_reached ();
1367         errno = ENOSYS;
1368 #endif
1369         break;
1370       }
1371       case GST_POLL_MODE_WINDOWS:
1372       {
1373 #ifdef G_OS_WIN32
1374         gint ignore_count = set->active_fds_ignored->len;
1375         DWORD t, wait_ret;
1376
1377         if (G_LIKELY (ignore_count == 0)) {
1378           if (timeout != GST_CLOCK_TIME_NONE)
1379             t = GST_TIME_AS_MSECONDS (timeout);
1380           else
1381             t = INFINITE;
1382         } else {
1383           /* already one or more ignored fds, so we quickly sweep the others */
1384           t = 0;
1385         }
1386
1387         if (set->active_events->len != 0) {
1388           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1389               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1390         } else {
1391           wait_ret = WSA_WAIT_FAILED;
1392           WSASetLastError (WSA_INVALID_PARAMETER);
1393         }
1394
1395         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1396           res = 0;
1397         } else if (wait_ret == WSA_WAIT_FAILED) {
1398           res = -1;
1399           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1400         } else {
1401           /* the first entry is the wakeup event */
1402           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1403             res = gst_poll_collect_winsock_events (set);
1404           } else {
1405             res = 1;            /* wakeup event */
1406           }
1407         }
1408 #else
1409         g_assert_not_reached ();
1410         errno = ENOSYS;
1411 #endif
1412         break;
1413       }
1414     }
1415
1416     if (!is_timer) {
1417       /* Applications needs to clear the control socket themselves for timer
1418        * polls.
1419        * For other polls, we need to clear the control socket. If there was only
1420        * one socket with activity and it was the control socket, we need to
1421        * restart */
1422       if (release_all_wakeup (set) > 0 && res == 1)
1423         restarting = TRUE;
1424     }
1425
1426     if (G_UNLIKELY (IS_FLUSHING (set))) {
1427       /* we got woken up and we are flushing, we need to stop */
1428       errno = EBUSY;
1429       res = -1;
1430       break;
1431     }
1432   } while (G_UNLIKELY (restarting));
1433
1434   DEC_WAITING (set);
1435
1436   return res;
1437
1438   /* ERRORS */
1439 already_waiting:
1440   {
1441     DEC_WAITING (set);
1442     errno = EPERM;
1443     return -1;
1444   }
1445 flushing:
1446   {
1447     DEC_WAITING (set);
1448     errno = EBUSY;
1449     return -1;
1450   }
1451 #ifdef G_OS_WIN32
1452 winsock_error:
1453   {
1454     g_mutex_unlock (set->lock);
1455     DEC_WAITING (set);
1456     return -1;
1457   }
1458 #endif
1459 }
1460
1461 /**
1462  * gst_poll_set_controllable:
1463  * @set: a #GstPoll.
1464  * @controllable: new controllable state.
1465  *
1466  * When @controllable is %TRUE, this function ensures that future calls to
1467  * gst_poll_wait() will be affected by gst_poll_restart() and
1468  * gst_poll_set_flushing().
1469  *
1470  * Returns: %TRUE if the controllability of @set could be updated.
1471  *
1472  * Since: 0.10.18
1473  */
1474 gboolean
1475 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1476 {
1477   g_return_val_if_fail (set != NULL, FALSE);
1478
1479   GST_LOG ("%p: controllable : %d", set, controllable);
1480
1481   set->controllable = controllable;
1482
1483   return TRUE;
1484 }
1485
1486 /**
1487  * gst_poll_restart:
1488  * @set: a #GstPoll.
1489  *
1490  * Restart any gst_poll_wait() that is in progress. This function is typically
1491  * used after adding or removing descriptors to @set.
1492  *
1493  * If @set is not controllable, then this call will have no effect.
1494  *
1495  * Since: 0.10.18
1496  */
1497 void
1498 gst_poll_restart (GstPoll * set)
1499 {
1500   g_return_if_fail (set != NULL);
1501
1502   if (set->controllable && GET_WAITING (set) > 0) {
1503     /* we are controllable and waiting, wake up the waiter. The socket will be
1504      * cleared by the _wait() thread and the poll will be restarted */
1505     raise_wakeup (set);
1506   }
1507 }
1508
1509 /**
1510  * gst_poll_set_flushing:
1511  * @set: a #GstPoll.
1512  * @flushing: new flushing state.
1513  *
1514  * When @flushing is %TRUE, this function ensures that current and future calls
1515  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1516  *
1517  * Unsetting the flushing state will restore normal operation of @set.
1518  *
1519  * Since: 0.10.18
1520  */
1521 void
1522 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1523 {
1524   g_return_if_fail (set != NULL);
1525
1526   /* update the new state first */
1527   SET_FLUSHING (set, flushing);
1528
1529   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1530     /* we are flushing, controllable and waiting, wake up the waiter. When we
1531      * stop the flushing operation we don't clear the wakeup fd here, this will
1532      * happen in the _wait() thread. */
1533     raise_wakeup (set);
1534   }
1535 }
1536
1537 /**
1538  * gst_poll_write_control:
1539  * @set: a #GstPoll.
1540  *
1541  * Write a byte to the control socket of the controllable @set.
1542  * This function is mostly useful for timer #GstPoll objects created with
1543  * gst_poll_new_timer(). 
1544  *
1545  * It will make any current and future gst_poll_wait() function return with
1546  * 1, meaning the control socket is set. After an equal amount of calls to
1547  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1548  * block again until their timeout expired.
1549  *
1550  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1551  * byte could not be written.
1552  *
1553  * Since: 0.10.23
1554  */
1555 gboolean
1556 gst_poll_write_control (GstPoll * set)
1557 {
1558   gboolean res;
1559
1560   g_return_val_if_fail (set != NULL, FALSE);
1561   g_return_val_if_fail (set->timer, FALSE);
1562
1563   res = raise_wakeup (set);
1564
1565   return res;
1566 }
1567
1568 /**
1569  * gst_poll_read_control:
1570  * @set: a #GstPoll.
1571  *
1572  * Read a byte from the control socket of the controllable @set.
1573  * This function is mostly useful for timer #GstPoll objects created with
1574  * gst_poll_new_timer(). 
1575  *
1576  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1577  * was no byte to read.
1578  *
1579  * Since: 0.10.23
1580  */
1581 gboolean
1582 gst_poll_read_control (GstPoll * set)
1583 {
1584   gboolean res;
1585
1586   g_return_val_if_fail (set != NULL, FALSE);
1587   g_return_val_if_fail (set->timer, FALSE);
1588
1589   res = release_wakeup (set);
1590
1591   return res;
1592 }