poll: don't call WSAWaitForMultipleEvents with no events
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6  *
7  * gstpoll.c: File descriptor set
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  */
24 /**
25  * SECTION:gstpoll
26  * @short_description: Keep track of file descriptors and make it possible
27  *                     to wait on them in a cancelable way
28  *
29  * A #GstPoll keeps track of file descriptors much like fd_set (used with
30  * select()) or a struct pollfd array (used with poll()). Once created with
31  * gst_poll_new(), the set can be used to wait for file descriptors to be
32  * readable and/or writeable. It is possible to make this wait be controlled
33  * by specifying %TRUE for the @controllable flag when creating the set (or
34  * later calling gst_poll_set_controllable()).
35  *
36  * New file descriptors are added to the set using gst_poll_add_fd(), and
37  * removed using gst_poll_remove_fd(). Controlling which file descriptors
38  * should be waited for to become readable and/or writeable are done using
39  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40  *
41  * Use gst_poll_wait() to wait for the file descriptors to actually become
42  * readable and/or writeable, or to timeout if no file descriptor is available
43  * in time. The wait can be controlled by calling gst_poll_restart() and
44  * gst_poll_set_flushing().
45  *
46  * Once the file descriptor set has been waited for, one can use
47  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48  * gst_poll_fd_has_error() to see if it has generated an error,
49  * gst_poll_fd_can_read() to see if it is possible to read from the file
50  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51  * write to it.
52  *
53  */
54
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
58
59 #include "gst_private.h"
60
61 #include <sys/types.h>
62
63 #ifdef HAVE_UNISTD_H
64 #include <unistd.h>
65 #endif
66
67 #include <errno.h>
68 #include <fcntl.h>
69
70 #include <glib.h>
71
72 #ifdef G_OS_WIN32
73 #include <winsock2.h>
74 #define EINPROGRESS WSAEINPROGRESS
75 #else
76 #define _GNU_SOURCE 1
77 #include <sys/poll.h>
78 #include <sys/time.h>
79 #include <sys/socket.h>
80 #endif
81
82 /* OS/X needs this because of bad headers */
83 #include <string.h>
84
85 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
86  * so we prefer our own poll emulation.
87  */
88 #if defined(BROKEN_POLL)
89 #undef HAVE_POLL
90 #endif
91
92 #include "gstpoll.h"
93
94 #define GST_CAT_DEFAULT GST_CAT_POLL
95
96 #ifdef G_OS_WIN32
97 typedef struct _WinsockFd WinsockFd;
98
99 struct _WinsockFd
100 {
101   gint fd;
102   glong event_mask;
103   WSANETWORKEVENTS events;
104   glong ignored_event_mask;
105 };
106 #endif
107
108 typedef enum
109 {
110   GST_POLL_MODE_AUTO,
111   GST_POLL_MODE_SELECT,
112   GST_POLL_MODE_PSELECT,
113   GST_POLL_MODE_POLL,
114   GST_POLL_MODE_PPOLL,
115   GST_POLL_MODE_WINDOWS
116 } GstPollMode;
117
118 struct _GstPoll
119 {
120   GstPollMode mode;
121
122   GMutex *lock;
123   /* array of fds, always written to and read from with lock */
124   GArray *fds;
125   /* array of active fds, only written to from the waiting thread with the
126    * lock and read from with the lock or without the lock from the waiting
127    * thread */
128   GArray *active_fds;
129
130 #ifndef G_OS_WIN32
131   gchar buf[1];
132   GstPollFD control_read_fd;
133   GstPollFD control_write_fd;
134 #else
135   GArray *active_fds_ignored;
136   GArray *events;
137   GArray *active_events;
138
139   HANDLE wakeup_event;
140 #endif
141
142   gboolean controllable;
143   volatile gint waiting;
144   volatile gint control_pending;
145   volatile gint flushing;
146   gboolean timer;
147   volatile gint rebuild;
148 };
149
150 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
151     gboolean active);
152 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
153
154 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
155 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
156
157 #define INC_WAITING(s)      (g_atomic_int_exchange_and_add(&(s)->waiting, 1))
158 #define DEC_WAITING(s)      (g_atomic_int_exchange_and_add(&(s)->waiting, -1))
159 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
160
161 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
162 #define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
163
164 #ifndef G_OS_WIN32
165 #define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
166 #define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
167 #else
168 #define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event))
169 #define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
170 #endif
171
172 /* the poll/select call is also performed on a control socket, that way
173  * we can send special commands to control it */
174 static inline gboolean
175 raise_wakeup (GstPoll * set)
176 {
177   gboolean result = TRUE;
178
179   if (g_atomic_int_exchange_and_add (&set->control_pending, 1) == 0) {
180     /* raise when nothing pending */
181     result = WAKE_EVENT (set);
182   }
183   return result;
184 }
185
186 /* note how bad things can happen when the 2 threads both raise and release the
187  * wakeup. This is however not a problem because you must always pair a raise
188  * with a release */
189 static inline gboolean
190 release_wakeup (GstPoll * set)
191 {
192   gboolean result = TRUE;
193
194   if (g_atomic_int_dec_and_test (&set->control_pending)) {
195     result = RELEASE_EVENT (set);
196   }
197   return result;
198 }
199
200 static inline gint
201 release_all_wakeup (GstPoll * set)
202 {
203   gint old;
204
205   while (TRUE) {
206     if (!(old = g_atomic_int_get (&set->control_pending)))
207       /* nothing pending, just exit */
208       break;
209
210     /* try to remove all pending control messages */
211     if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
212       /* we managed to remove all messages, read the control socket */
213       (void) RELEASE_EVENT (set);
214       break;
215     }
216   }
217   return old;
218 }
219
220 static gint
221 find_index (GArray * array, GstPollFD * fd)
222 {
223 #ifndef G_OS_WIN32
224   struct pollfd *ifd;
225 #else
226   WinsockFd *ifd;
227 #endif
228   guint i;
229
230   /* start by assuming the index found in the fd is still valid */
231   if (fd->idx >= 0 && fd->idx < array->len) {
232 #ifndef G_OS_WIN32
233     ifd = &g_array_index (array, struct pollfd, fd->idx);
234 #else
235     ifd = &g_array_index (array, WinsockFd, fd->idx);
236 #endif
237
238     if (ifd->fd == fd->fd) {
239       return fd->idx;
240     }
241   }
242
243   /* the pollfd array has changed and we need to lookup the fd again */
244   for (i = 0; i < array->len; i++) {
245 #ifndef G_OS_WIN32
246     ifd = &g_array_index (array, struct pollfd, i);
247 #else
248     ifd = &g_array_index (array, WinsockFd, i);
249 #endif
250
251     if (ifd->fd == fd->fd) {
252       fd->idx = (gint) i;
253       return fd->idx;
254     }
255   }
256
257   fd->idx = -1;
258   return fd->idx;
259 }
260
261 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
262 /* check if all file descriptors will fit in an fd_set */
263 static gboolean
264 selectable_fds (const GstPoll * set)
265 {
266   guint i;
267
268   g_mutex_lock (set->lock);
269   for (i = 0; i < set->fds->len; i++) {
270     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
271
272     if (pfd->fd >= FD_SETSIZE)
273       goto too_many;
274   }
275   g_mutex_unlock (set->lock);
276
277   return TRUE;
278
279 too_many:
280   {
281     g_mutex_unlock (set->lock);
282     return FALSE;
283   }
284 }
285
286 /* check if the timeout will convert to a timeout value used for poll()
287  * without a loss of precision
288  */
289 static gboolean
290 pollable_timeout (GstClockTime timeout)
291 {
292   if (timeout == GST_CLOCK_TIME_NONE)
293     return TRUE;
294
295   /* not a nice multiple of milliseconds */
296   if (timeout % 1000000)
297     return FALSE;
298
299   return TRUE;
300 }
301 #endif
302
303 static GstPollMode
304 choose_mode (const GstPoll * set, GstClockTime timeout)
305 {
306   GstPollMode mode;
307
308   if (set->mode == GST_POLL_MODE_AUTO) {
309 #ifdef HAVE_PPOLL
310     mode = GST_POLL_MODE_PPOLL;
311 #elif defined(HAVE_POLL)
312     if (!selectable_fds (set) || pollable_timeout (timeout)) {
313       mode = GST_POLL_MODE_POLL;
314     } else {
315 #ifdef HAVE_PSELECT
316       mode = GST_POLL_MODE_PSELECT;
317 #else
318       mode = GST_POLL_MODE_SELECT;
319 #endif
320     }
321 #elif defined(HAVE_PSELECT)
322     mode = GST_POLL_MODE_PSELECT;
323 #else
324     mode = GST_POLL_MODE_SELECT;
325 #endif
326   } else {
327     mode = set->mode;
328   }
329   return mode;
330 }
331
332 #ifndef G_OS_WIN32
333 static gint
334 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
335     fd_set * errorfds)
336 {
337   gint max_fd = -1;
338   guint i;
339
340   FD_ZERO (readfds);
341   FD_ZERO (writefds);
342   FD_ZERO (errorfds);
343
344   g_mutex_lock (set->lock);
345
346   for (i = 0; i < set->active_fds->len; i++) {
347     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
348
349     if (pfd->fd < FD_SETSIZE) {
350       if (pfd->events & POLLIN)
351         FD_SET (pfd->fd, readfds);
352       if (pfd->events & POLLOUT)
353         FD_SET (pfd->fd, writefds);
354       if (pfd->events)
355         FD_SET (pfd->fd, errorfds);
356       if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
357         max_fd = pfd->fd;
358     }
359   }
360
361   g_mutex_unlock (set->lock);
362
363   return max_fd;
364 }
365
366 static void
367 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
368     fd_set * errorfds)
369 {
370   guint i;
371
372   g_mutex_lock (set->lock);
373
374   for (i = 0; i < set->active_fds->len; i++) {
375     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
376
377     if (pfd->fd < FD_SETSIZE) {
378       pfd->revents = 0;
379       if (FD_ISSET (pfd->fd, readfds))
380         pfd->revents |= POLLIN;
381       if (FD_ISSET (pfd->fd, writefds))
382         pfd->revents |= POLLOUT;
383       if (FD_ISSET (pfd->fd, errorfds))
384         pfd->revents |= POLLERR;
385     }
386   }
387
388   g_mutex_unlock (set->lock);
389 }
390 #else /* G_OS_WIN32 */
391 /*
392  * Translate errors thrown by the Winsock API used by GstPoll:
393  *   WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
394  */
395 static gint
396 gst_poll_winsock_error_to_errno (DWORD last_error)
397 {
398   switch (last_error) {
399     case WSA_INVALID_HANDLE:
400     case WSAEINVAL:
401     case WSAENOTSOCK:
402       return EBADF;
403
404     case WSA_NOT_ENOUGH_MEMORY:
405       return ENOMEM;
406
407       /*
408        * Anything else, including:
409        *   WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
410        *   WSANOTINITIALISED
411        */
412     default:
413       return EINVAL;
414   }
415 }
416
417 static void
418 gst_poll_free_winsock_event (GstPoll * set, gint idx)
419 {
420   WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
421   HANDLE event = g_array_index (set->events, HANDLE, idx);
422
423   WSAEventSelect (wfd->fd, event, 0);
424   CloseHandle (event);
425 }
426
427 static void
428 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
429     gboolean active)
430 {
431   WinsockFd *wfd;
432
433   wfd = &g_array_index (set->fds, WinsockFd, idx);
434
435   if (active)
436     wfd->event_mask |= flags;
437   else
438     wfd->event_mask &= ~flags;
439
440   /* reset ignored state if the new mask doesn't overlap at all */
441   if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
442     wfd->ignored_event_mask = 0;
443 }
444
445 static gboolean
446 gst_poll_prepare_winsock_active_sets (GstPoll * set)
447 {
448   guint i;
449
450   g_array_set_size (set->active_fds, 0);
451   g_array_set_size (set->active_fds_ignored, 0);
452   g_array_set_size (set->active_events, 0);
453   g_array_append_val (set->active_events, set->wakeup_event);
454
455   for (i = 0; i < set->fds->len; i++) {
456     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
457     HANDLE event = g_array_index (set->events, HANDLE, i);
458
459     if (wfd->ignored_event_mask == 0) {
460       gint ret;
461
462       g_array_append_val (set->active_fds, *wfd);
463       g_array_append_val (set->active_events, event);
464
465       ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
466       if (G_UNLIKELY (ret != 0)) {
467         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
468         return FALSE;
469       }
470     } else {
471       g_array_append_val (set->active_fds_ignored, wfd);
472     }
473   }
474
475   return TRUE;
476 }
477
478 static gint
479 gst_poll_collect_winsock_events (GstPoll * set)
480 {
481   gint res, i;
482
483   /*
484    * We need to check which events are signaled, and call
485    * WSAEnumNetworkEvents for those that are, which resets
486    * the event and clears the internal network event records.
487    */
488   res = 0;
489   for (i = 0; i < set->active_fds->len; i++) {
490     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
491     HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
492     DWORD wait_ret;
493
494     wait_ret = WaitForSingleObject (event, 0);
495     if (wait_ret == WAIT_OBJECT_0) {
496       gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
497
498       if (G_UNLIKELY (enum_ret != 0)) {
499         res = -1;
500         errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
501         break;
502       }
503
504       res++;
505     } else {
506       /* clear any previously stored result */
507       memset (&wfd->events, 0, sizeof (wfd->events));
508     }
509   }
510
511   /* If all went well we also need to reset the ignored fds. */
512   if (res >= 0) {
513     res += set->active_fds_ignored->len;
514
515     for (i = 0; i < set->active_fds_ignored->len; i++) {
516       WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
517
518       wfd->ignored_event_mask = 0;
519     }
520
521     g_array_set_size (set->active_fds_ignored, 0);
522   }
523
524   return res;
525 }
526 #endif
527
528 /**
529  * gst_poll_new:
530  * @controllable: whether it should be possible to control a wait.
531  *
532  * Create a new file descriptor set. If @controllable, it
533  * is possible to restart or flush a call to gst_poll_wait() with
534  * gst_poll_restart() and gst_poll_set_flushing() respectively.
535  *
536  * Free-function: gst_poll_free
537  *
538  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
539  *     Free with gst_poll_free().
540  *
541  * Since: 0.10.18
542  */
543 GstPoll *
544 gst_poll_new (gboolean controllable)
545 {
546   GstPoll *nset;
547
548   GST_DEBUG ("controllable : %d", controllable);
549
550   nset = g_slice_new0 (GstPoll);
551   nset->lock = g_mutex_new ();
552 #ifndef G_OS_WIN32
553   nset->mode = GST_POLL_MODE_AUTO;
554   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
555   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
556   nset->control_read_fd.fd = -1;
557   nset->control_write_fd.fd = -1;
558   {
559     gint control_sock[2];
560
561     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
562       goto no_socket_pair;
563
564     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
565     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
566
567     nset->control_read_fd.fd = control_sock[0];
568     nset->control_write_fd.fd = control_sock[1];
569
570     gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
571     gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
572   }
573 #else
574   nset->mode = GST_POLL_MODE_WINDOWS;
575   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
576   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
577   nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
578   nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
579   nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
580
581   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
582 #endif
583
584   /* ensure (re)build, though already sneakily set in non-windows case */
585   MARK_REBUILD (nset);
586
587   nset->controllable = controllable;
588
589   return nset;
590
591   /* ERRORS */
592 #ifndef G_OS_WIN32
593 no_socket_pair:
594   {
595     GST_WARNING ("%p: can't create socket pair !", nset);
596     gst_poll_free (nset);
597     return NULL;
598   }
599 #endif
600 }
601
602 /**
603  * gst_poll_new_timer:
604  *
605  * Create a new poll object that can be used for scheduling cancellable
606  * timeouts.
607  *
608  * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
609  * performed from different threads. 
610  *
611  * Free-function: gst_poll_free
612  *
613  * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
614  *     Free with gst_poll_free().
615  *
616  * Since: 0.10.23
617  */
618 GstPoll *
619 gst_poll_new_timer (void)
620 {
621   GstPoll *poll;
622
623   /* make a new controllable poll set */
624   if (!(poll = gst_poll_new (TRUE)))
625     goto done;
626
627   /* we are a timer */
628   poll->timer = TRUE;
629
630 done:
631   return poll;
632 }
633
634 /**
635  * gst_poll_free:
636  * @set: (transfer full): a file descriptor set.
637  *
638  * Free a file descriptor set.
639  *
640  * Since: 0.10.18
641  */
642 void
643 gst_poll_free (GstPoll * set)
644 {
645   g_return_if_fail (set != NULL);
646
647   GST_DEBUG ("%p: freeing", set);
648
649 #ifndef G_OS_WIN32
650   if (set->control_write_fd.fd >= 0)
651     close (set->control_write_fd.fd);
652   if (set->control_read_fd.fd >= 0)
653     close (set->control_read_fd.fd);
654 #else
655   CloseHandle (set->wakeup_event);
656
657   {
658     guint i;
659
660     for (i = 0; i < set->events->len; i++)
661       gst_poll_free_winsock_event (set, i);
662   }
663
664   g_array_free (set->active_events, TRUE);
665   g_array_free (set->events, TRUE);
666   g_array_free (set->active_fds_ignored, TRUE);
667 #endif
668
669   g_array_free (set->active_fds, TRUE);
670   g_array_free (set->fds, TRUE);
671   g_mutex_free (set->lock);
672   g_slice_free (GstPoll, set);
673 }
674
675 /**
676  * gst_poll_get_read_gpollfd:
677  * @set: a #GstPoll
678  * @fd: a #GPollFD
679  *
680  * Get a GPollFD for the reading part of the control socket. This is useful when
681  * integrating with a GSource and GMainLoop.
682  *
683  * Since: 0.10.32
684  */
685 void
686 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
687 {
688   g_return_if_fail (set != NULL);
689   g_return_if_fail (fd != NULL);
690
691 #ifndef G_OS_WIN32
692   fd->fd = set->control_read_fd.fd;
693 #else
694 #if GLIB_SIZEOF_VOID_P == 8
695   fd->fd = (gint64) set->wakeup_event;
696 #else
697   fd->fd = (gint) set->wakeup_event;
698 #endif
699 #endif
700   fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
701   fd->revents = 0;
702 }
703
704 /**
705  * gst_poll_fd_init:
706  * @fd: a #GstPollFD
707  *
708  * Initializes @fd. Alternatively you can initialize it with
709  * #GST_POLL_FD_INIT.
710  *
711  * Since: 0.10.18
712  */
713 void
714 gst_poll_fd_init (GstPollFD * fd)
715 {
716   g_return_if_fail (fd != NULL);
717
718   fd->fd = -1;
719   fd->idx = -1;
720 }
721
722 static gboolean
723 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
724 {
725   gint idx;
726
727   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
728
729   idx = find_index (set->fds, fd);
730   if (idx < 0) {
731 #ifndef G_OS_WIN32
732     struct pollfd nfd;
733
734     nfd.fd = fd->fd;
735     nfd.events = POLLERR | POLLNVAL | POLLHUP;
736     nfd.revents = 0;
737
738     g_array_append_val (set->fds, nfd);
739
740     fd->idx = set->fds->len - 1;
741 #else
742     WinsockFd wfd;
743     HANDLE event;
744
745     wfd.fd = fd->fd;
746     wfd.event_mask = FD_CLOSE;
747     memset (&wfd.events, 0, sizeof (wfd.events));
748     wfd.ignored_event_mask = 0;
749     event = WSACreateEvent ();
750
751     g_array_append_val (set->fds, wfd);
752     g_array_append_val (set->events, event);
753
754     fd->idx = set->fds->len - 1;
755 #endif
756     MARK_REBUILD (set);
757   } else {
758     GST_WARNING ("%p: couldn't find fd !", set);
759   }
760
761   return TRUE;
762 }
763
764 /**
765  * gst_poll_add_fd:
766  * @set: a file descriptor set.
767  * @fd: a file descriptor.
768  *
769  * Add a file descriptor to the file descriptor set.
770  *
771  * Returns: %TRUE if the file descriptor was successfully added to the set.
772  *
773  * Since: 0.10.18
774  */
775 gboolean
776 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
777 {
778   gboolean ret;
779
780   g_return_val_if_fail (set != NULL, FALSE);
781   g_return_val_if_fail (fd != NULL, FALSE);
782   g_return_val_if_fail (fd->fd >= 0, FALSE);
783
784   g_mutex_lock (set->lock);
785
786   ret = gst_poll_add_fd_unlocked (set, fd);
787
788   g_mutex_unlock (set->lock);
789
790   return ret;
791 }
792
793 /**
794  * gst_poll_remove_fd:
795  * @set: a file descriptor set.
796  * @fd: a file descriptor.
797  *
798  * Remove a file descriptor from the file descriptor set.
799  *
800  * Returns: %TRUE if the file descriptor was successfully removed from the set.
801  *
802  * Since: 0.10.18
803  */
804 gboolean
805 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
806 {
807   gint idx;
808
809   g_return_val_if_fail (set != NULL, FALSE);
810   g_return_val_if_fail (fd != NULL, FALSE);
811   g_return_val_if_fail (fd->fd >= 0, FALSE);
812
813
814   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
815
816   g_mutex_lock (set->lock);
817
818   /* get the index, -1 is an fd that is not added */
819   idx = find_index (set->fds, fd);
820   if (idx >= 0) {
821 #ifdef G_OS_WIN32
822     gst_poll_free_winsock_event (set, idx);
823     g_array_remove_index_fast (set->events, idx);
824 #endif
825
826     /* remove the fd at index, we use _remove_index_fast, which copies the last
827      * element of the array to the freed index */
828     g_array_remove_index_fast (set->fds, idx);
829
830     /* mark fd as removed by setting the index to -1 */
831     fd->idx = -1;
832     MARK_REBUILD (set);
833   } else {
834     GST_WARNING ("%p: couldn't find fd !", set);
835   }
836
837   g_mutex_unlock (set->lock);
838
839   return idx >= 0;
840 }
841
842 /**
843  * gst_poll_fd_ctl_write:
844  * @set: a file descriptor set.
845  * @fd: a file descriptor.
846  * @active: a new status.
847  *
848  * Control whether the descriptor @fd in @set will be monitored for
849  * writability.
850  *
851  * Returns: %TRUE if the descriptor was successfully updated.
852  *
853  * Since: 0.10.18
854  */
855 gboolean
856 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
857 {
858   gint idx;
859
860   g_return_val_if_fail (set != NULL, FALSE);
861   g_return_val_if_fail (fd != NULL, FALSE);
862   g_return_val_if_fail (fd->fd >= 0, FALSE);
863
864   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
865       fd->fd, fd->idx, active);
866
867   g_mutex_lock (set->lock);
868
869   idx = find_index (set->fds, fd);
870   if (idx >= 0) {
871 #ifndef G_OS_WIN32
872     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
873
874     if (active)
875       pfd->events |= POLLOUT;
876     else
877       pfd->events &= ~POLLOUT;
878
879     GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
880 #else
881     gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
882         active);
883 #endif
884     MARK_REBUILD (set);
885   } else {
886     GST_WARNING ("%p: couldn't find fd !", set);
887   }
888
889   g_mutex_unlock (set->lock);
890
891   return idx >= 0;
892 }
893
894 static gboolean
895 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
896 {
897   gint idx;
898
899   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
900       fd->fd, fd->idx, active);
901
902   idx = find_index (set->fds, fd);
903
904   if (idx >= 0) {
905 #ifndef G_OS_WIN32
906     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
907
908     if (active)
909       pfd->events |= (POLLIN | POLLPRI);
910     else
911       pfd->events &= ~(POLLIN | POLLPRI);
912 #else
913     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
914 #endif
915     MARK_REBUILD (set);
916   } else {
917     GST_WARNING ("%p: couldn't find fd !", set);
918   }
919
920   return idx >= 0;
921 }
922
923 /**
924  * gst_poll_fd_ctl_read:
925  * @set: a file descriptor set.
926  * @fd: a file descriptor.
927  * @active: a new status.
928  *
929  * Control whether the descriptor @fd in @set will be monitored for
930  * readability.
931  *
932  * Returns: %TRUE if the descriptor was successfully updated.
933  *
934  * Since: 0.10.18
935  */
936 gboolean
937 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
938 {
939   gboolean ret;
940
941   g_return_val_if_fail (set != NULL, FALSE);
942   g_return_val_if_fail (fd != NULL, FALSE);
943   g_return_val_if_fail (fd->fd >= 0, FALSE);
944
945   g_mutex_lock (set->lock);
946
947   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
948
949   g_mutex_unlock (set->lock);
950
951   return ret;
952 }
953
954 /**
955  * gst_poll_fd_ignored:
956  * @set: a file descriptor set.
957  * @fd: a file descriptor.
958  *
959  * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
960  * the same result for @fd as last time. This function must be called if no
961  * operation (read/write/recv/send/etc.) will be performed on @fd before
962  * the next call to gst_poll_wait().
963  *
964  * The reason why this is needed is because the underlying implementation
965  * might not allow querying the fd more than once between calls to one of
966  * the re-enabling operations.
967  *
968  * Since: 0.10.18
969  */
970 void
971 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
972 {
973 #ifdef G_OS_WIN32
974   gint idx;
975
976   g_return_if_fail (set != NULL);
977   g_return_if_fail (fd != NULL);
978   g_return_if_fail (fd->fd >= 0);
979
980   g_mutex_lock (set->lock);
981
982   idx = find_index (set->fds, fd);
983   if (idx >= 0) {
984     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
985
986     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
987     MARK_REBUILD (set);
988   }
989
990   g_mutex_unlock (set->lock);
991 #endif
992 }
993
994 /**
995  * gst_poll_fd_has_closed:
996  * @set: a file descriptor set.
997  * @fd: a file descriptor.
998  *
999  * Check if @fd in @set has closed the connection.
1000  *
1001  * Returns: %TRUE if the connection was closed.
1002  *
1003  * Since: 0.10.18
1004  */
1005 gboolean
1006 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1007 {
1008   gboolean res = FALSE;
1009   gint idx;
1010
1011   g_return_val_if_fail (set != NULL, FALSE);
1012   g_return_val_if_fail (fd != NULL, FALSE);
1013   g_return_val_if_fail (fd->fd >= 0, FALSE);
1014
1015   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1016
1017   g_mutex_lock (set->lock);
1018
1019   idx = find_index (set->active_fds, fd);
1020   if (idx >= 0) {
1021 #ifndef G_OS_WIN32
1022     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1023
1024     res = (pfd->revents & POLLHUP) != 0;
1025 #else
1026     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1027
1028     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1029 #endif
1030   } else {
1031     GST_WARNING ("%p: couldn't find fd !", set);
1032   }
1033
1034   g_mutex_unlock (set->lock);
1035
1036   return res;
1037 }
1038
1039 /**
1040  * gst_poll_fd_has_error:
1041  * @set: a file descriptor set.
1042  * @fd: a file descriptor.
1043  *
1044  * Check if @fd in @set has an error.
1045  *
1046  * Returns: %TRUE if the descriptor has an error.
1047  *
1048  * Since: 0.10.18
1049  */
1050 gboolean
1051 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1052 {
1053   gboolean res = FALSE;
1054   gint idx;
1055
1056   g_return_val_if_fail (set != NULL, FALSE);
1057   g_return_val_if_fail (fd != NULL, FALSE);
1058   g_return_val_if_fail (fd->fd >= 0, FALSE);
1059
1060   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1061
1062   g_mutex_lock (set->lock);
1063
1064   idx = find_index (set->active_fds, fd);
1065   if (idx >= 0) {
1066 #ifndef G_OS_WIN32
1067     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1068
1069     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1070 #else
1071     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1072
1073     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1074         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1075         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1076         (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1077         (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1078 #endif
1079   } else {
1080     GST_WARNING ("%p: couldn't find fd !", set);
1081   }
1082
1083   g_mutex_unlock (set->lock);
1084
1085   return res;
1086 }
1087
1088 static gboolean
1089 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1090 {
1091   gboolean res = FALSE;
1092   gint idx;
1093
1094   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1095
1096   idx = find_index (set->active_fds, fd);
1097   if (idx >= 0) {
1098 #ifndef G_OS_WIN32
1099     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1100
1101     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1102 #else
1103     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1104
1105     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1106 #endif
1107   } else {
1108     GST_WARNING ("%p: couldn't find fd !", set);
1109   }
1110
1111   return res;
1112 }
1113
1114 /**
1115  * gst_poll_fd_can_read:
1116  * @set: a file descriptor set.
1117  * @fd: a file descriptor.
1118  *
1119  * Check if @fd in @set has data to be read.
1120  *
1121  * Returns: %TRUE if the descriptor has data to be read.
1122  *
1123  * Since: 0.10.18
1124  */
1125 gboolean
1126 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1127 {
1128   gboolean res = FALSE;
1129
1130   g_return_val_if_fail (set != NULL, FALSE);
1131   g_return_val_if_fail (fd != NULL, FALSE);
1132   g_return_val_if_fail (fd->fd >= 0, FALSE);
1133
1134   g_mutex_lock (set->lock);
1135
1136   res = gst_poll_fd_can_read_unlocked (set, fd);
1137
1138   g_mutex_unlock (set->lock);
1139
1140   return res;
1141 }
1142
1143 /**
1144  * gst_poll_fd_can_write:
1145  * @set: a file descriptor set.
1146  * @fd: a file descriptor.
1147  *
1148  * Check if @fd in @set can be used for writing.
1149  *
1150  * Returns: %TRUE if the descriptor can be used for writing.
1151  *
1152  * Since: 0.10.18
1153  */
1154 gboolean
1155 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1156 {
1157   gboolean res = FALSE;
1158   gint idx;
1159
1160   g_return_val_if_fail (set != NULL, FALSE);
1161   g_return_val_if_fail (fd != NULL, FALSE);
1162   g_return_val_if_fail (fd->fd >= 0, FALSE);
1163
1164   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1165
1166   g_mutex_lock (set->lock);
1167
1168   idx = find_index (set->active_fds, fd);
1169   if (idx >= 0) {
1170 #ifndef G_OS_WIN32
1171     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1172
1173     res = (pfd->revents & POLLOUT) != 0;
1174 #else
1175     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1176
1177     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1178 #endif
1179   } else {
1180     GST_WARNING ("%p: couldn't find fd !", set);
1181   }
1182
1183   g_mutex_unlock (set->lock);
1184
1185   return res;
1186 }
1187
1188 /**
1189  * gst_poll_wait:
1190  * @set: a #GstPoll.
1191  * @timeout: a timeout in nanoseconds.
1192  *
1193  * Wait for activity on the file descriptors in @set. This function waits up to
1194  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
1195  *
1196  * For #GstPoll objects created with gst_poll_new(), this function can only be
1197  * called from a single thread at a time.  If called from multiple threads,
1198  * -1 will be returned with errno set to EPERM.
1199  *
1200  * This is not true for timer #GstPoll objects created with
1201  * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1202  * simultaneously.
1203  *
1204  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1205  * activity was detected after @timeout. If an error occurs, -1 is returned
1206  * and errno is set.
1207  *
1208  * Since: 0.10.18
1209  */
1210 gint
1211 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1212 {
1213   gboolean restarting;
1214   gboolean is_timer;
1215   int res;
1216   gint old_waiting;
1217
1218   g_return_val_if_fail (set != NULL, -1);
1219
1220   GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1221
1222   is_timer = set->timer;
1223
1224   /* add one more waiter */
1225   old_waiting = INC_WAITING (set);
1226
1227   /* we cannot wait from multiple threads unless we are a timer */
1228   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1229     goto already_waiting;
1230
1231   /* flushing, exit immediatly */
1232   if (G_UNLIKELY (IS_FLUSHING (set)))
1233     goto flushing;
1234
1235   do {
1236     GstPollMode mode;
1237
1238     res = -1;
1239     restarting = FALSE;
1240
1241     mode = choose_mode (set, timeout);
1242
1243     if (TEST_REBUILD (set)) {
1244       g_mutex_lock (set->lock);
1245 #ifndef G_OS_WIN32
1246       g_array_set_size (set->active_fds, set->fds->len);
1247       memcpy (set->active_fds->data, set->fds->data,
1248           set->fds->len * sizeof (struct pollfd));
1249 #else
1250       if (!gst_poll_prepare_winsock_active_sets (set))
1251         goto winsock_error;
1252 #endif
1253       g_mutex_unlock (set->lock);
1254     }
1255
1256     switch (mode) {
1257       case GST_POLL_MODE_AUTO:
1258         g_assert_not_reached ();
1259         break;
1260       case GST_POLL_MODE_PPOLL:
1261       {
1262 #ifdef HAVE_PPOLL
1263         struct timespec ts;
1264         struct timespec *tsptr;
1265
1266         if (timeout != GST_CLOCK_TIME_NONE) {
1267           GST_TIME_TO_TIMESPEC (timeout, ts);
1268           tsptr = &ts;
1269         } else {
1270           tsptr = NULL;
1271         }
1272
1273         res =
1274             ppoll ((struct pollfd *) set->active_fds->data,
1275             set->active_fds->len, tsptr, NULL);
1276 #else
1277         g_assert_not_reached ();
1278         errno = ENOSYS;
1279 #endif
1280         break;
1281       }
1282       case GST_POLL_MODE_POLL:
1283       {
1284 #ifdef HAVE_POLL
1285         gint t;
1286
1287         if (timeout != GST_CLOCK_TIME_NONE) {
1288           t = GST_TIME_AS_MSECONDS (timeout);
1289         } else {
1290           t = -1;
1291         }
1292
1293         res =
1294             poll ((struct pollfd *) set->active_fds->data,
1295             set->active_fds->len, t);
1296 #else
1297         g_assert_not_reached ();
1298         errno = ENOSYS;
1299 #endif
1300         break;
1301       }
1302       case GST_POLL_MODE_PSELECT:
1303 #ifndef HAVE_PSELECT
1304       {
1305         g_assert_not_reached ();
1306         errno = ENOSYS;
1307         break;
1308       }
1309 #endif
1310       case GST_POLL_MODE_SELECT:
1311       {
1312 #ifndef G_OS_WIN32
1313         fd_set readfds;
1314         fd_set writefds;
1315         fd_set errorfds;
1316         gint max_fd;
1317
1318         max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1319
1320         if (mode == GST_POLL_MODE_SELECT) {
1321           struct timeval tv;
1322           struct timeval *tvptr;
1323
1324           if (timeout != GST_CLOCK_TIME_NONE) {
1325             GST_TIME_TO_TIMEVAL (timeout, tv);
1326             tvptr = &tv;
1327           } else {
1328             tvptr = NULL;
1329           }
1330
1331           GST_DEBUG ("Calling select");
1332           res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1333           GST_DEBUG ("After select, res:%d", res);
1334         } else {
1335 #ifdef HAVE_PSELECT
1336           struct timespec ts;
1337           struct timespec *tsptr;
1338
1339           if (timeout != GST_CLOCK_TIME_NONE) {
1340             GST_TIME_TO_TIMESPEC (timeout, ts);
1341             tsptr = &ts;
1342           } else {
1343             tsptr = NULL;
1344           }
1345
1346           GST_DEBUG ("Calling pselect");
1347           res =
1348               pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1349           GST_DEBUG ("After pselect, res:%d", res);
1350 #endif
1351         }
1352
1353         if (res >= 0) {
1354           fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1355         }
1356 #else /* G_OS_WIN32 */
1357         g_assert_not_reached ();
1358         errno = ENOSYS;
1359 #endif
1360         break;
1361       }
1362       case GST_POLL_MODE_WINDOWS:
1363       {
1364 #ifdef G_OS_WIN32
1365         gint ignore_count = set->active_fds_ignored->len;
1366         DWORD t, wait_ret;
1367
1368         if (G_LIKELY (ignore_count == 0)) {
1369           if (timeout != GST_CLOCK_TIME_NONE)
1370             t = GST_TIME_AS_MSECONDS (timeout);
1371           else
1372             t = INFINITE;
1373         } else {
1374           /* already one or more ignored fds, so we quickly sweep the others */
1375           t = 0;
1376         }
1377
1378         if (set->active_events->len != 0) {
1379           wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1380               (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1381         } else {
1382           wait_ret = WSA_WAIT_FAILED;
1383           WSASetLastError (WSA_INVALID_PARAMETER);
1384         }
1385
1386         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1387           res = 0;
1388         } else if (wait_ret == WSA_WAIT_FAILED) {
1389           res = -1;
1390           errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1391         } else {
1392           /* the first entry is the wakeup event */
1393           if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1394             res = gst_poll_collect_winsock_events (set);
1395           } else {
1396             res = 1;            /* wakeup event */
1397           }
1398         }
1399 #else
1400         g_assert_not_reached ();
1401         errno = ENOSYS;
1402 #endif
1403         break;
1404       }
1405     }
1406
1407     if (!is_timer) {
1408       /* Applications needs to clear the control socket themselves for timer
1409        * polls.
1410        * For other polls, we need to clear the control socket. If there was only
1411        * one socket with activity and it was the control socket, we need to
1412        * restart */
1413       if (release_all_wakeup (set) > 0 && res == 1)
1414         restarting = TRUE;
1415     }
1416
1417     if (G_UNLIKELY (IS_FLUSHING (set))) {
1418       /* we got woken up and we are flushing, we need to stop */
1419       errno = EBUSY;
1420       res = -1;
1421       break;
1422     }
1423   } while (G_UNLIKELY (restarting));
1424
1425   DEC_WAITING (set);
1426
1427   return res;
1428
1429   /* ERRORS */
1430 already_waiting:
1431   {
1432     DEC_WAITING (set);
1433     errno = EPERM;
1434     return -1;
1435   }
1436 flushing:
1437   {
1438     DEC_WAITING (set);
1439     errno = EBUSY;
1440     return -1;
1441   }
1442 #ifdef G_OS_WIN32
1443 winsock_error:
1444   {
1445     g_mutex_unlock (set->lock);
1446     DEC_WAITING (set);
1447     return -1;
1448   }
1449 #endif
1450 }
1451
1452 /**
1453  * gst_poll_set_controllable:
1454  * @set: a #GstPoll.
1455  * @controllable: new controllable state.
1456  *
1457  * When @controllable is %TRUE, this function ensures that future calls to
1458  * gst_poll_wait() will be affected by gst_poll_restart() and
1459  * gst_poll_set_flushing().
1460  *
1461  * Returns: %TRUE if the controllability of @set could be updated.
1462  *
1463  * Since: 0.10.18
1464  */
1465 gboolean
1466 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1467 {
1468   g_return_val_if_fail (set != NULL, FALSE);
1469
1470   GST_LOG ("%p: controllable : %d", set, controllable);
1471
1472   set->controllable = controllable;
1473
1474   return TRUE;
1475 }
1476
1477 /**
1478  * gst_poll_restart:
1479  * @set: a #GstPoll.
1480  *
1481  * Restart any gst_poll_wait() that is in progress. This function is typically
1482  * used after adding or removing descriptors to @set.
1483  *
1484  * If @set is not controllable, then this call will have no effect.
1485  *
1486  * Since: 0.10.18
1487  */
1488 void
1489 gst_poll_restart (GstPoll * set)
1490 {
1491   g_return_if_fail (set != NULL);
1492
1493   if (set->controllable && GET_WAITING (set) > 0) {
1494     /* we are controllable and waiting, wake up the waiter. The socket will be
1495      * cleared by the _wait() thread and the poll will be restarted */
1496     raise_wakeup (set);
1497   }
1498 }
1499
1500 /**
1501  * gst_poll_set_flushing:
1502  * @set: a #GstPoll.
1503  * @flushing: new flushing state.
1504  *
1505  * When @flushing is %TRUE, this function ensures that current and future calls
1506  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1507  *
1508  * Unsetting the flushing state will restore normal operation of @set.
1509  *
1510  * Since: 0.10.18
1511  */
1512 void
1513 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1514 {
1515   g_return_if_fail (set != NULL);
1516
1517   /* update the new state first */
1518   SET_FLUSHING (set, flushing);
1519
1520   if (flushing && set->controllable && GET_WAITING (set) > 0) {
1521     /* we are flushing, controllable and waiting, wake up the waiter. When we
1522      * stop the flushing operation we don't clear the wakeup fd here, this will
1523      * happen in the _wait() thread. */
1524     raise_wakeup (set);
1525   }
1526 }
1527
1528 /**
1529  * gst_poll_write_control:
1530  * @set: a #GstPoll.
1531  *
1532  * Write a byte to the control socket of the controllable @set.
1533  * This function is mostly useful for timer #GstPoll objects created with
1534  * gst_poll_new_timer(). 
1535  *
1536  * It will make any current and future gst_poll_wait() function return with
1537  * 1, meaning the control socket is set. After an equal amount of calls to
1538  * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1539  * block again until their timeout expired.
1540  *
1541  * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1542  * byte could not be written.
1543  *
1544  * Since: 0.10.23
1545  */
1546 gboolean
1547 gst_poll_write_control (GstPoll * set)
1548 {
1549   gboolean res;
1550
1551   g_return_val_if_fail (set != NULL, FALSE);
1552   g_return_val_if_fail (set->timer, FALSE);
1553
1554   res = raise_wakeup (set);
1555
1556   return res;
1557 }
1558
1559 /**
1560  * gst_poll_read_control:
1561  * @set: a #GstPoll.
1562  *
1563  * Read a byte from the control socket of the controllable @set.
1564  * This function is mostly useful for timer #GstPoll objects created with
1565  * gst_poll_new_timer(). 
1566  *
1567  * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1568  * was no byte to read.
1569  *
1570  * Since: 0.10.23
1571  */
1572 gboolean
1573 gst_poll_read_control (GstPoll * set)
1574 {
1575   gboolean res;
1576
1577   g_return_val_if_fail (set != NULL, FALSE);
1578   g_return_val_if_fail (set->timer, FALSE);
1579
1580   res = release_wakeup (set);
1581
1582   return res;
1583 }