2 * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4 * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5 * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
7 * gstpoll.c: File descriptor set
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22 * Boston, MA 02111-1307, USA.
26 * @short_description: Keep track of file descriptors and make it possible
27 * to wait on them in a cancelable way
29 * A #GstPoll keeps track of file descriptors much like fd_set (used with
30 * select()) or a struct pollfd array (used with poll()). Once created with
31 * gst_poll_new(), the set can be used to wait for file descriptors to be
32 * readable and/or writeable. It is possible to make this wait be controlled
33 * by specifying %TRUE for the @controllable flag when creating the set (or
34 * later calling gst_poll_set_controllable()).
36 * New file descriptors are added to the set using gst_poll_add_fd(), and
37 * removed using gst_poll_remove_fd(). Controlling which file descriptors
38 * should be waited for to become readable and/or writeable is done using
39 * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
41 * Use gst_poll_wait() to wait for the file descriptors to actually become
42 * readable and/or writeable, or to timeout if no file descriptor is available
43 * in time. The wait can be controlled by calling gst_poll_restart() and
44 * gst_poll_set_flushing().
46 * Once the file descriptor set has been waited for, one can use
47 * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48 * gst_poll_fd_has_error() to see if it has generated an error,
49 * gst_poll_fd_can_read() to see if it is possible to read from the file
50 * descriptor, and gst_poll_fd_can_write() to see if it is possible to
59 #include "gst_private.h"
61 #include <sys/types.h>
74 #define EINPROGRESS WSAEINPROGRESS
79 #include <sys/socket.h>
82 /* OS/X needs this because of bad headers */
85 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
86 * so we prefer our own poll emulation.
88 #if defined(BROKEN_POLL)
94 #define GST_CAT_DEFAULT GST_CAT_POLL
97 typedef struct _WinsockFd WinsockFd;
103 WSANETWORKEVENTS events;
104 glong ignored_event_mask;
111 GST_POLL_MODE_SELECT,
112 GST_POLL_MODE_PSELECT,
115 GST_POLL_MODE_WINDOWS
123 /* array of fds, always written to and read from with lock */
125 /* array of active fds, only written to from the waiting thread with the
126 * lock and read from with the lock or without the lock from the waiting
132 GstPollFD control_read_fd;
133 GstPollFD control_write_fd;
135 GArray *active_fds_ignored;
137 GArray *active_events;
142 gboolean controllable;
143 volatile gint waiting;
144 volatile gint control_pending;
145 volatile gint flushing;
147 volatile gint rebuild;
150 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
152 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
/* Atomic accessors for the flushing flag of a GstPoll. */
154 #define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing))
155 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
/* Waiter bookkeeping. INC_WAITING/DEC_WAITING evaluate to the counter value
 * from before the adjustment (g_atomic_int_exchange_and_add semantics). */
157 #define INC_WAITING(s) (g_atomic_int_exchange_and_add(&(s)->waiting, 1))
158 #define DEC_WAITING(s) (g_atomic_int_exchange_and_add(&(s)->waiting, -1))
159 #define GET_WAITING(s) (g_atomic_int_get(&(s)->waiting))
/* TEST_REBUILD atomically flips the rebuild flag from 1 to 0 and is true only
 * when the flag was set; MARK_REBUILD sets the flag. */
161 #define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
162 #define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
/* Wakeup primitives, variant 1: write/read a single byte on the control
 * descriptors; each expression is true when exactly one byte was transferred.
 * NOTE(review): the preprocessor conditional choosing between this pair and
 * the pair below is not visible here; presumably this one is the non-Windows
 * case -- confirm. */
165 #define WAKE_EVENT(s) (write ((s)->control_write_fd.fd, "W", 1) == 1)
166 #define RELEASE_EVENT(s) (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
/* Wakeup primitives, variant 2: set/reset the wakeup_event handle. */
168 #define WAKE_EVENT(s) (SetEvent ((s)->wakeup_event))
169 #define RELEASE_EVENT(s) (ResetEvent ((s)->wakeup_event))
172 /* the poll/select call is also performed on a control socket, that way
173 * we can send special commands to control it */
174 static inline gboolean
175 raise_wakeup (GstPoll * set)
177 gboolean result = TRUE;
179 if (g_atomic_int_exchange_and_add (&set->control_pending, 1) == 0) {
180 /* raise when nothing pending */
181 result = WAKE_EVENT (set);
186 /* note how bad things can happen when the 2 threads both raise and release the
187 * wakeup. This is however not a problem because you must always pair a raise
189 static inline gboolean
190 release_wakeup (GstPoll * set)
192 gboolean result = TRUE;
194 if (g_atomic_int_dec_and_test (&set->control_pending)) {
195 result = RELEASE_EVENT (set);
201 release_all_wakeup (GstPoll * set)
206 if (!(old = g_atomic_int_get (&set->control_pending)))
207 /* nothing pending, just exit */
210 /* try to remove all pending control messages */
211 if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
212 /* we managed to remove all messages, read the control socket */
213 (void) RELEASE_EVENT (set);
221 find_index (GArray * array, GstPollFD * fd)
230 /* start by assuming the index found in the fd is still valid */
231 if (fd->idx >= 0 && fd->idx < array->len) {
233 ifd = &g_array_index (array, struct pollfd, fd->idx);
235 ifd = &g_array_index (array, WinsockFd, fd->idx);
238 if (ifd->fd == fd->fd) {
243 /* the pollfd array has changed and we need to lookup the fd again */
244 for (i = 0; i < array->len; i++) {
246 ifd = &g_array_index (array, struct pollfd, i);
248 ifd = &g_array_index (array, WinsockFd, i);
251 if (ifd->fd == fd->fd) {
#if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
/* Checks, with the set lock held, whether every descriptor in set->fds is
 * below FD_SETSIZE and can therefore be stored in an fd_set. */
static gboolean
selectable_fds (const GstPoll * set)
{
  gboolean fits = TRUE;
  guint i;

  g_mutex_lock (set->lock);
  for (i = 0; fits && i < set->fds->len; i++) {
    const struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);

    if (pfd->fd >= FD_SETSIZE)
      fits = FALSE;
  }
  g_mutex_unlock (set->lock);

  return fits;
}

/* Checks whether @timeout can be handed to poll() without losing precision:
 * either it is GST_CLOCK_TIME_NONE or it is a whole multiple of 1000000. */
static gboolean
pollable_timeout (GstClockTime timeout)
{
  return timeout == GST_CLOCK_TIME_NONE || (timeout % 1000000) == 0;
}
#endif
304 choose_mode (const GstPoll * set, GstClockTime timeout)
308 if (set->mode == GST_POLL_MODE_AUTO) {
310 mode = GST_POLL_MODE_PPOLL;
311 #elif defined(HAVE_POLL)
312 if (!selectable_fds (set) || pollable_timeout (timeout)) {
313 mode = GST_POLL_MODE_POLL;
316 mode = GST_POLL_MODE_PSELECT;
318 mode = GST_POLL_MODE_SELECT;
321 #elif defined(HAVE_PSELECT)
322 mode = GST_POLL_MODE_PSELECT;
324 mode = GST_POLL_MODE_SELECT;
334 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
344 g_mutex_lock (set->lock);
346 for (i = 0; i < set->active_fds->len; i++) {
347 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
349 if (pfd->fd < FD_SETSIZE) {
350 if (pfd->events & POLLIN)
351 FD_SET (pfd->fd, readfds);
352 if (pfd->events & POLLOUT)
353 FD_SET (pfd->fd, writefds);
355 FD_SET (pfd->fd, errorfds);
356 if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
361 g_mutex_unlock (set->lock);
367 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
372 g_mutex_lock (set->lock);
374 for (i = 0; i < set->active_fds->len; i++) {
375 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
377 if (pfd->fd < FD_SETSIZE) {
379 if (FD_ISSET (pfd->fd, readfds))
380 pfd->revents |= POLLIN;
381 if (FD_ISSET (pfd->fd, writefds))
382 pfd->revents |= POLLOUT;
383 if (FD_ISSET (pfd->fd, errorfds))
384 pfd->revents |= POLLERR;
388 g_mutex_unlock (set->lock);
390 #else /* G_OS_WIN32 */
/* Maps a Winsock error code (@last_error, as obtained from WSAGetLastError()
 * by the callers in this file) to a value that those callers store in errno.
 * NOTE(review): the return statements of the switch are not visible in this
 * listing, so the concrete errno chosen for each case cannot be documented
 * here -- confirm against the complete file. */
392 * Translate errors thrown by the Winsock API used by GstPoll:
393 * WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
396 gst_poll_winsock_error_to_errno (DWORD last_error)
398 switch (last_error) {
399 case WSA_INVALID_HANDLE:
404 case WSA_NOT_ENOUGH_MEMORY:
408 * Anything else, including:
409 * WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
418 gst_poll_free_winsock_event (GstPoll * set, gint idx)
420 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
421 HANDLE event = g_array_index (set->events, HANDLE, idx);
423 WSAEventSelect (wfd->fd, event, 0);
428 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
433 wfd = &g_array_index (set->fds, WinsockFd, idx);
436 wfd->event_mask |= flags;
438 wfd->event_mask &= ~flags;
440 /* reset ignored state if the new mask doesn't overlap at all */
441 if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
442 wfd->ignored_event_mask = 0;
446 gst_poll_prepare_winsock_active_sets (GstPoll * set)
450 g_array_set_size (set->active_fds, 0);
451 g_array_set_size (set->active_fds_ignored, 0);
452 g_array_set_size (set->active_events, 0);
453 g_array_append_val (set->active_events, set->wakeup_event);
455 for (i = 0; i < set->fds->len; i++) {
456 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
457 HANDLE event = g_array_index (set->events, HANDLE, i);
459 if (wfd->ignored_event_mask == 0) {
462 g_array_append_val (set->active_fds, *wfd);
463 g_array_append_val (set->active_events, event);
465 ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
466 if (G_UNLIKELY (ret != 0)) {
467 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
471 g_array_append_val (set->active_fds_ignored, wfd);
479 gst_poll_collect_winsock_events (GstPoll * set)
484 * We need to check which events are signaled, and call
485 * WSAEnumNetworkEvents for those that are, which resets
486 * the event and clears the internal network event records.
489 for (i = 0; i < set->active_fds->len; i++) {
490 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
491 HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
494 wait_ret = WaitForSingleObject (event, 0);
495 if (wait_ret == WAIT_OBJECT_0) {
496 gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
498 if (G_UNLIKELY (enum_ret != 0)) {
500 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
506 /* clear any previously stored result */
507 memset (&wfd->events, 0, sizeof (wfd->events));
511 /* If all went well we also need to reset the ignored fds. */
513 res += set->active_fds_ignored->len;
515 for (i = 0; i < set->active_fds_ignored->len; i++) {
516 WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
518 wfd->ignored_event_mask = 0;
521 g_array_set_size (set->active_fds_ignored, 0);
530 * @controllable: whether it should be possible to control a wait.
532 * Create a new file descriptor set. If @controllable, it
533 * is possible to restart or flush a call to gst_poll_wait() with
534 * gst_poll_restart() and gst_poll_set_flushing() respectively.
536 * Free-function: gst_poll_free
538 * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
539 * Free with gst_poll_free().
544 gst_poll_new (gboolean controllable)
548 GST_DEBUG ("controllable : %d", controllable);
550 nset = g_slice_new0 (GstPoll);
551 nset->lock = g_mutex_new ();
553 nset->mode = GST_POLL_MODE_AUTO;
554 nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
555 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
556 nset->control_read_fd.fd = -1;
557 nset->control_write_fd.fd = -1;
559 gint control_sock[2];
561 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
564 fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
565 fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
567 nset->control_read_fd.fd = control_sock[0];
568 nset->control_write_fd.fd = control_sock[1];
570 gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
571 gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
574 nset->mode = GST_POLL_MODE_WINDOWS;
575 nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
576 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
577 nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
578 nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
579 nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
581 nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
584 /* ensure (re)build, though already sneakily set in non-windows case */
587 nset->controllable = controllable;
595 GST_WARNING ("%p: can't create socket pair !", nset);
596 gst_poll_free (nset);
603 * gst_poll_new_timer:
605 * Create a new poll object that can be used for scheduling cancellable
608 * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
609 * performed from different threads.
611 * Free-function: gst_poll_free
613 * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
614 * Free with gst_poll_free().
619 gst_poll_new_timer (void)
623 /* make a new controllable poll set */
624 if (!(poll = gst_poll_new (TRUE)))
636 * @set: (transfer full): a file descriptor set.
638 * Free a file descriptor set.
643 gst_poll_free (GstPoll * set)
645 g_return_if_fail (set != NULL);
647 GST_DEBUG ("%p: freeing", set);
650 if (set->control_write_fd.fd >= 0)
651 close (set->control_write_fd.fd);
652 if (set->control_read_fd.fd >= 0)
653 close (set->control_read_fd.fd);
655 CloseHandle (set->wakeup_event);
660 for (i = 0; i < set->events->len; i++)
661 gst_poll_free_winsock_event (set, i);
664 g_array_free (set->active_events, TRUE);
665 g_array_free (set->events, TRUE);
666 g_array_free (set->active_fds_ignored, TRUE);
669 g_array_free (set->active_fds, TRUE);
670 g_array_free (set->fds, TRUE);
671 g_mutex_free (set->lock);
672 g_slice_free (GstPoll, set);
676 * gst_poll_get_read_gpollfd:
680 * Get a GPollFD for the reading part of the control socket. This is useful when
681 * integrating with a GSource and GMainLoop.
686 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
688 g_return_if_fail (set != NULL);
689 g_return_if_fail (fd != NULL);
692 fd->fd = set->control_read_fd.fd;
694 #if GLIB_SIZEOF_VOID_P == 8
695 fd->fd = (gint64) set->wakeup_event;
697 fd->fd = (gint) set->wakeup_event;
700 fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
/* gst_poll_fd_init: resets a caller-provided GstPollFD before first use.
 * Only the NULL check on @fd is visible in this listing.
 * NOTE(review): the assignments that perform the initialisation are not
 * shown; elsewhere in this file an index of -1 marks "not in a set" --
 * presumably that is the state established here, confirm against the
 * complete file. */
708 * Initializes @fd. Alternatively you can initialize it with
714 gst_poll_fd_init (GstPollFD * fd)
716 g_return_if_fail (fd != NULL);
723 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
727 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
729 idx = find_index (set->fds, fd);
735 nfd.events = POLLERR | POLLNVAL | POLLHUP;
738 g_array_append_val (set->fds, nfd);
740 fd->idx = set->fds->len - 1;
746 wfd.event_mask = FD_CLOSE;
747 memset (&wfd.events, 0, sizeof (wfd.events));
748 wfd.ignored_event_mask = 0;
749 event = WSACreateEvent ();
751 g_array_append_val (set->fds, wfd);
752 g_array_append_val (set->events, event);
754 fd->idx = set->fds->len - 1;
758 GST_WARNING ("%p: couldn't find fd !", set);
766 * @set: a file descriptor set.
767 * @fd: a file descriptor.
769 * Add a file descriptor to the file descriptor set.
771 * Returns: %TRUE if the file descriptor was successfully added to the set.
776 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
780 g_return_val_if_fail (set != NULL, FALSE);
781 g_return_val_if_fail (fd != NULL, FALSE);
782 g_return_val_if_fail (fd->fd >= 0, FALSE);
784 g_mutex_lock (set->lock);
786 ret = gst_poll_add_fd_unlocked (set, fd);
788 g_mutex_unlock (set->lock);
794 * gst_poll_remove_fd:
795 * @set: a file descriptor set.
796 * @fd: a file descriptor.
798 * Remove a file descriptor from the file descriptor set.
800 * Returns: %TRUE if the file descriptor was successfully removed from the set.
805 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
809 g_return_val_if_fail (set != NULL, FALSE);
810 g_return_val_if_fail (fd != NULL, FALSE);
811 g_return_val_if_fail (fd->fd >= 0, FALSE);
814 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
816 g_mutex_lock (set->lock);
818 /* get the index, -1 is an fd that is not added */
819 idx = find_index (set->fds, fd);
822 gst_poll_free_winsock_event (set, idx);
823 g_array_remove_index_fast (set->events, idx);
826 /* remove the fd at index, we use _remove_index_fast, which copies the last
827 * element of the array to the freed index */
828 g_array_remove_index_fast (set->fds, idx);
830 /* mark fd as removed by setting the index to -1 */
834 GST_WARNING ("%p: couldn't find fd !", set);
837 g_mutex_unlock (set->lock);
843 * gst_poll_fd_ctl_write:
844 * @set: a file descriptor set.
845 * @fd: a file descriptor.
846 * @active: a new status.
848 * Control whether the descriptor @fd in @set will be monitored for
851 * Returns: %TRUE if the descriptor was successfully updated.
856 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
860 g_return_val_if_fail (set != NULL, FALSE);
861 g_return_val_if_fail (fd != NULL, FALSE);
862 g_return_val_if_fail (fd->fd >= 0, FALSE);
864 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
865 fd->fd, fd->idx, active);
867 g_mutex_lock (set->lock);
869 idx = find_index (set->fds, fd);
872 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
875 pfd->events |= POLLOUT;
877 pfd->events &= ~POLLOUT;
879 GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
881 gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
886 GST_WARNING ("%p: couldn't find fd !", set);
889 g_mutex_unlock (set->lock);
895 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
899 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
900 fd->fd, fd->idx, active);
902 idx = find_index (set->fds, fd);
906 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
909 pfd->events |= (POLLIN | POLLPRI);
911 pfd->events &= ~(POLLIN | POLLPRI);
913 gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
917 GST_WARNING ("%p: couldn't find fd !", set);
924 * gst_poll_fd_ctl_read:
925 * @set: a file descriptor set.
926 * @fd: a file descriptor.
927 * @active: a new status.
929 * Control whether the descriptor @fd in @set will be monitored for
932 * Returns: %TRUE if the descriptor was successfully updated.
937 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
941 g_return_val_if_fail (set != NULL, FALSE);
942 g_return_val_if_fail (fd != NULL, FALSE);
943 g_return_val_if_fail (fd->fd >= 0, FALSE);
945 g_mutex_lock (set->lock);
947 ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
949 g_mutex_unlock (set->lock);
955 * gst_poll_fd_ignored:
956 * @set: a file descriptor set.
957 * @fd: a file descriptor.
959 * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
960 * the same result for @fd as last time. This function must be called if no
961 * operation (read/write/recv/send/etc.) will be performed on @fd before
962 * the next call to gst_poll_wait().
964 * The reason why this is needed is because the underlying implementation
965 * might not allow querying the fd more than once between calls to one of
966 * the re-enabling operations.
971 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
976 g_return_if_fail (set != NULL);
977 g_return_if_fail (fd != NULL);
978 g_return_if_fail (fd->fd >= 0);
980 g_mutex_lock (set->lock);
982 idx = find_index (set->fds, fd);
984 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
986 wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
990 g_mutex_unlock (set->lock);
995 * gst_poll_fd_has_closed:
996 * @set: a file descriptor set.
997 * @fd: a file descriptor.
999 * Check if @fd in @set has closed the connection.
1001 * Returns: %TRUE if the connection was closed.
1006 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1008 gboolean res = FALSE;
1011 g_return_val_if_fail (set != NULL, FALSE);
1012 g_return_val_if_fail (fd != NULL, FALSE);
1013 g_return_val_if_fail (fd->fd >= 0, FALSE);
1015 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1017 g_mutex_lock (set->lock);
1019 idx = find_index (set->active_fds, fd);
1022 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1024 res = (pfd->revents & POLLHUP) != 0;
1026 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1028 res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1031 GST_WARNING ("%p: couldn't find fd !", set);
1034 g_mutex_unlock (set->lock);
1040 * gst_poll_fd_has_error:
1041 * @set: a file descriptor set.
1042 * @fd: a file descriptor.
1044 * Check if @fd in @set has an error.
1046 * Returns: %TRUE if the descriptor has an error.
1051 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1053 gboolean res = FALSE;
1056 g_return_val_if_fail (set != NULL, FALSE);
1057 g_return_val_if_fail (fd != NULL, FALSE);
1058 g_return_val_if_fail (fd->fd >= 0, FALSE);
1060 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1062 g_mutex_lock (set->lock);
1064 idx = find_index (set->active_fds, fd);
1067 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1069 res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1071 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1073 res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1074 (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1075 (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1076 (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1077 (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1080 GST_WARNING ("%p: couldn't find fd !", set);
1083 g_mutex_unlock (set->lock);
1089 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1091 gboolean res = FALSE;
1094 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1096 idx = find_index (set->active_fds, fd);
1099 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1101 res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1103 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1105 res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1108 GST_WARNING ("%p: couldn't find fd !", set);
1115 * gst_poll_fd_can_read:
1116 * @set: a file descriptor set.
1117 * @fd: a file descriptor.
1119 * Check if @fd in @set has data to be read.
1121 * Returns: %TRUE if the descriptor has data to be read.
1126 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1128 gboolean res = FALSE;
1130 g_return_val_if_fail (set != NULL, FALSE);
1131 g_return_val_if_fail (fd != NULL, FALSE);
1132 g_return_val_if_fail (fd->fd >= 0, FALSE);
1134 g_mutex_lock (set->lock);
1136 res = gst_poll_fd_can_read_unlocked (set, fd);
1138 g_mutex_unlock (set->lock);
1144 * gst_poll_fd_can_write:
1145 * @set: a file descriptor set.
1146 * @fd: a file descriptor.
1148 * Check if @fd in @set can be used for writing.
1150 * Returns: %TRUE if the descriptor can be used for writing.
1155 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1157 gboolean res = FALSE;
1160 g_return_val_if_fail (set != NULL, FALSE);
1161 g_return_val_if_fail (fd != NULL, FALSE);
1162 g_return_val_if_fail (fd->fd >= 0, FALSE);
1164 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1166 g_mutex_lock (set->lock);
1168 idx = find_index (set->active_fds, fd);
1171 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1173 res = (pfd->revents & POLLOUT) != 0;
1175 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1177 res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1180 GST_WARNING ("%p: couldn't find fd !", set);
1183 g_mutex_unlock (set->lock);
/* gst_poll_wait: blocks until a descriptor in @set shows activity, the wait
 * is woken up through the control channel, or @timeout expires.
 * NOTE(review): this listing lacks a number of short statements (returns,
 * labels such as the target of "goto already_waiting", error assignments),
 * so the notes below only describe what the visible lines do. */
1191 * @timeout: a timeout in nanoseconds.
1193 * Wait for activity on the file descriptors in @set. This function waits up to
1194 * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
1196 * For #GstPoll objects created with gst_poll_new(), this function can only be
1197 * called from a single thread at a time. If called from multiple threads,
1198 * -1 will be returned with errno set to EPERM.
1200 * This is not true for timer #GstPoll objects created with
1201 * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1204 * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1205 * activity was detected after @timeout. If an error occurs, -1 is returned
1211 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1213 gboolean restarting;
1218 g_return_val_if_fail (set != NULL, -1);
1220 GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1222 is_timer = set->timer;
1224 /* add one more waiter */
/* INC_WAITING evaluates to the waiter count from before the increment */
1225 old_waiting = INC_WAITING (set);
1227 /* we cannot wait from multiple threads unless we are a timer */
1228 if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1229 goto already_waiting;
1231 /* flushing, exit immediately */
1232 if (G_UNLIKELY (IS_FLUSHING (set)))
/* pick the wait primitive for this pass; in AUTO mode the choice depends on
 * the descriptors and on the timeout (see choose_mode()) */
1241 mode = choose_mode (set, timeout);
/* the fd list changed since the last pass: atomically clear the rebuild flag
 * and, under the lock, refresh the active snapshot (a plain copy of fds, or
 * the Winsock active sets on Windows) */
1243 if (TEST_REBUILD (set)) {
1244 g_mutex_lock (set->lock);
1246 g_array_set_size (set->active_fds, set->fds->len);
1247 memcpy (set->active_fds->data, set->fds->data,
1248 set->fds->len * sizeof (struct pollfd));
1250 if (!gst_poll_prepare_winsock_active_sets (set))
1253 g_mutex_unlock (set->lock);
/* AUTO was already resolved by choose_mode() and must not show up here */
1257 case GST_POLL_MODE_AUTO:
1258 g_assert_not_reached ();
1260 case GST_POLL_MODE_PPOLL:
1264 struct timespec *tsptr;
1266 if (timeout != GST_CLOCK_TIME_NONE) {
1267 GST_TIME_TO_TIMESPEC (timeout, ts);
/* ppoll() on the active snapshot, without changing the signal mask (NULL) */
1274 ppoll ((struct pollfd *) set->active_fds->data,
1275 set->active_fds->len, tsptr, NULL);
1277 g_assert_not_reached ();
1282 case GST_POLL_MODE_POLL:
/* poll() takes its timeout in milliseconds */
1287 if (timeout != GST_CLOCK_TIME_NONE) {
1288 t = GST_TIME_AS_MSECONDS (timeout);
1294 poll ((struct pollfd *) set->active_fds->data,
1295 set->active_fds->len, t);
1297 g_assert_not_reached ();
1302 case GST_POLL_MODE_PSELECT:
1303 #ifndef HAVE_PSELECT
1305 g_assert_not_reached ();
1310 case GST_POLL_MODE_SELECT:
/* select()/pselect() work on fd_sets: convert the pollfds, wait, then convert
 * the outcome back into revents further down */
1318 max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1320 if (mode == GST_POLL_MODE_SELECT) {
1322 struct timeval *tvptr;
1324 if (timeout != GST_CLOCK_TIME_NONE) {
1325 GST_TIME_TO_TIMEVAL (timeout, tv);
1331 GST_DEBUG ("Calling select");
1332 res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1333 GST_DEBUG ("After select, res:%d", res);
1337 struct timespec *tsptr;
1339 if (timeout != GST_CLOCK_TIME_NONE) {
1340 GST_TIME_TO_TIMESPEC (timeout, ts);
1346 GST_DEBUG ("Calling pselect");
1348 pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1349 GST_DEBUG ("After pselect, res:%d", res);
1354 fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1356 #else /* G_OS_WIN32 */
1357 g_assert_not_reached ();
1362 case GST_POLL_MODE_WINDOWS:
/* number of descriptors currently marked through gst_poll_fd_ignored() */
1365 gint ignore_count = set->active_fds_ignored->len;
1368 if (G_LIKELY (ignore_count == 0)) {
1369 if (timeout != GST_CLOCK_TIME_NONE)
1370 t = GST_TIME_AS_MSECONDS (timeout);
1374 /* already one or more ignored fds, so we quickly sweep the others */
/* an empty event array cannot be waited on and is reported as an
 * invalid-parameter failure */
1378 if (set->active_events->len != 0) {
1379 wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1380 (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1382 wait_ret = WSA_WAIT_FAILED;
1383 WSASetLastError (WSA_INVALID_PARAMETER);
1386 if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1388 } else if (wait_ret == WSA_WAIT_FAILED) {
1390 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1392 /* the first entry is the wakeup event */
1393 if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1394 res = gst_poll_collect_winsock_events (set);
1396 res = 1; /* wakeup event */
1400 g_assert_not_reached ();
1408 /* Applications needs to clear the control socket themselves for timer
1410 * For other polls, we need to clear the control socket. If there was only
1411 * one socket with activity and it was the control socket, we need to
1413 if (release_all_wakeup (set) > 0 && res == 1)
1417 if (G_UNLIKELY (IS_FLUSHING (set))) {
1418 /* we got woken up and we are flushing, we need to stop */
/* go round again for as long as a restart was requested */
1423 } while (G_UNLIKELY (restarting));
/* NOTE(review): presumably part of an error path that is entered with the
 * lock still held (compare the failed gst_poll_prepare_winsock_active_sets()
 * check above); the surrounding label is not visible here -- confirm. */
1445 g_mutex_unlock (set->lock);
1453 * gst_poll_set_controllable:
1455 * @controllable: new controllable state.
1457 * When @controllable is %TRUE, this function ensures that future calls to
1458 * gst_poll_wait() will be affected by gst_poll_restart() and
1459 * gst_poll_set_flushing().
1461 * Returns: %TRUE if the controllability of @set could be updated.
1466 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1468 g_return_val_if_fail (set != NULL, FALSE);
1470 GST_LOG ("%p: controllable : %d", set, controllable);
1472 set->controllable = controllable;
1481 * Restart any gst_poll_wait() that is in progress. This function is typically
1482 * used after adding or removing descriptors to @set.
1484 * If @set is not controllable, then this call will have no effect.
1489 gst_poll_restart (GstPoll * set)
1491 g_return_if_fail (set != NULL);
1493 if (set->controllable && GET_WAITING (set) > 0) {
1494 /* we are controllable and waiting, wake up the waiter. The socket will be
1495 * cleared by the _wait() thread and the poll will be restarted */
1501 * gst_poll_set_flushing:
1503 * @flushing: new flushing state.
1505 * When @flushing is %TRUE, this function ensures that current and future calls
1506 * to gst_poll_wait() will return -1, with errno set to EBUSY.
1508 * Unsetting the flushing state will restore normal operation of @set.
1513 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1515 g_return_if_fail (set != NULL);
1517 /* update the new state first */
1518 SET_FLUSHING (set, flushing);
1520 if (flushing && set->controllable && GET_WAITING (set) > 0) {
1521 /* we are flushing, controllable and waiting, wake up the waiter. When we
1522 * stop the flushing operation we don't clear the wakeup fd here, this will
1523 * happen in the _wait() thread. */
1529 * gst_poll_write_control:
1532 * Write a byte to the control socket of the controllable @set.
1533 * This function is mostly useful for timer #GstPoll objects created with
1534 * gst_poll_new_timer().
1536 * It will make any current and future gst_poll_wait() function return with
1537 * 1, meaning the control socket is set. After an equal amount of calls to
1538 * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1539 * block again until their timeout expired.
1541 * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1542 * byte could not be written.
1547 gst_poll_write_control (GstPoll * set)
1551 g_return_val_if_fail (set != NULL, FALSE);
1552 g_return_val_if_fail (set->timer, FALSE);
1554 res = raise_wakeup (set);
1560 * gst_poll_read_control:
1563 * Read a byte from the control socket of the controllable @set.
1564 * This function is mostly useful for timer #GstPoll objects created with
1565 * gst_poll_new_timer().
1567 * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1568 * was no byte to read.
1573 gst_poll_read_control (GstPoll * set)
1577 g_return_val_if_fail (set != NULL, FALSE);
1578 g_return_val_if_fail (set->timer, FALSE);
1580 res = release_wakeup (set);