2 * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4 * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5 * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
7 * gstpoll.c: File descriptor set
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
26 * @short_description: Keep track of file descriptors and make it possible
27 * to wait on them in a cancellable way
29 * A #GstPoll keeps track of file descriptors much like fd_set (used with
30 * select()) or a struct pollfd array (used with poll()). Once created with
31 * gst_poll_new(), the set can be used to wait for file descriptors to be
32 * readable and/or writable. It is possible to make this wait be controlled
33 * by specifying %TRUE for the @controllable flag when creating the set (or
34 * later calling gst_poll_set_controllable()).
36 * New file descriptors are added to the set using gst_poll_add_fd(), and
37 * removed using gst_poll_remove_fd(). Controlling which file descriptors
38 * should be waited for to become readable and/or writable is done using
39 * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
41 * Use gst_poll_wait() to wait for the file descriptors to actually become
42 * readable and/or writable, or to timeout if no file descriptor is available
43 * in time. The wait can be controlled by calling gst_poll_restart() and
44 * gst_poll_set_flushing().
46 * Once the file descriptor set has been waited for, one can use
47 * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48 * gst_poll_fd_has_error() to see if it has generated an error,
49 * gst_poll_fd_can_read() to see if it is possible to read from the file
50 * descriptor, and gst_poll_fd_can_write() to see if it is possible to
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
62 #include <sys/types.h>
77 #ifdef HAVE_SYS_POLL_H
84 #include <sys/socket.h>
87 /* OS/X needs this because of bad headers */
90 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
91 * so we prefer our own poll emulation.
93 #if defined(BROKEN_POLL)
99 #define GST_CAT_DEFAULT GST_CAT_POLL
102 typedef struct _WinsockFd WinsockFd;
108 WSANETWORKEVENTS events;
109 glong ignored_event_mask;
116 GST_POLL_MODE_SELECT,
117 GST_POLL_MODE_PSELECT,
120 GST_POLL_MODE_WINDOWS
128 /* array of fds, always written to and read from with lock */
130 /* array of active fds, only written to from the waiting thread with the
131 * lock and read from with the lock or without the lock from the waiting
137 GstPollFD control_read_fd;
138 GstPollFD control_write_fd;
140 GArray *active_fds_ignored;
142 GArray *active_events;
147 gboolean controllable;
148 volatile gint waiting;
149 volatile gint control_pending;
150 volatile gint flushing;
152 volatile gint rebuild;
155 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
157 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
/* Atomic accessors for the shared GstPoll state: the flushing flag, the
 * count of threads inside a wait, and the "active set must be rebuilt" flag. */
159 #define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing))
160 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
/* INC_WAITING/DEC_WAITING evaluate to whatever g_atomic_int_add() returns */
162 #define INC_WAITING(s) (g_atomic_int_add(&(s)->waiting, 1))
163 #define DEC_WAITING(s) (g_atomic_int_add(&(s)->waiting, -1))
164 #define GET_WAITING(s) (g_atomic_int_get(&(s)->waiting))
/* TEST_REBUILD atomically swaps rebuild from 1 to 0 and is true only when
 * that swap happened; MARK_REBUILD requests a rebuild */
166 #define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
167 #define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
/* Wake-up primitives, socket-pair flavour: WAKE_EVENT writes one byte to the
 * control socket, RELEASE_EVENT reads one byte back. Both evaluate to true
 * only when exactly one byte was transferred. */
170 #define WAKE_EVENT(s) (write ((s)->control_write_fd.fd, "W", 1) == 1)
171 #define RELEASE_EVENT(s) (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
/* Wake-up primitives, Windows event flavour. Success is decided by the
 * return value of SetEvent(): the thread's last-error code is only meaningful
 * after a failed call, so consulting GetLastError() unconditionally could
 * turn a successful wake-up into a reported failure because of a stale code
 * left behind by an unrelated API call. errno is 0 on success and EACCES on
 * failure, as before. */
173 #define WAKE_EVENT(s) (SetEvent ((s)->wakeup_event) ? (errno = 0, 1) : (errno = EACCES, 0))
174 #define RELEASE_EVENT(s) (ResetEvent ((s)->wakeup_event))
177 /* the poll/select call is also performed on a control socket, that way
178 * we can send special commands to control it */
/* Register one more pending wake-up. The wake-up primitive itself is only
 * signalled (WAKE_EVENT) by the call that moves the pending counter away
 * from zero; otherwise result keeps its initial TRUE. */
179 static inline gboolean
180 raise_wakeup (GstPoll * set)
182 gboolean result = TRUE;
/* the comparison is against the value the atomic add reports, so it
 * detects that nothing was pending before this call */
184 if (g_atomic_int_add (&set->control_pending, 1) == 0) {
185 /* raise when nothing pending */
186 GST_LOG ("%p: raise", set);
187 result = WAKE_EVENT (set);
192 /* note how bad things can happen when the 2 threads both raise and release the
193 * wakeup. This is however not a problem because you must always pair a raise
/* Drop one pending wake-up. When the atomic decrement brings the pending
 * counter to zero, the wake-up primitive is cleared with RELEASE_EVENT;
 * otherwise result keeps its initial TRUE. */
195 static inline gboolean
196 release_wakeup (GstPoll * set)
198 gboolean result = TRUE;
200 if (g_atomic_int_dec_and_test (&set->control_pending)) {
201 GST_LOG ("%p: release", set);
202 result = RELEASE_EVENT (set);
/* Clear every pending wake-up in one go: the pending counter is sampled,
 * swapped to 0 with a compare-and-exchange, and the wake-up primitive is then
 * cleared with RELEASE_EVENT. If clearing fails, the counter is bumped again
 * so that a wake-up is still recorded as pending. */
208 release_all_wakeup (GstPoll * set)
213 if (!(old = g_atomic_int_get (&set->control_pending)))
214 /* nothing pending, just exit */
217 /* try to remove all pending control messages */
/* the exchange only succeeds if nobody changed the counter since it was
 * sampled into old */
218 if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
219 /* we managed to remove all messages, read the control socket */
220 if (RELEASE_EVENT (set))
223 /* retry again until we read it successfully */
224 g_atomic_int_add (&set->control_pending, 1);
/* Locate the entry for fd inside array (elements are struct pollfd or
 * WinsockFd depending on the platform). The index cached in fd->idx is tried
 * first; if it is out of range or refers to a different descriptor, the
 * array is scanned linearly comparing descriptors. Callers treat a negative
 * result as "fd not in the array". */
231 find_index (GArray * array, GstPollFD * fd)
240 /* start by assuming the index found in the fd is still valid */
241 if (fd->idx >= 0 && fd->idx < array->len) {
243 ifd = &g_array_index (array, struct pollfd, fd->idx);
245 ifd = &g_array_index (array, WinsockFd, fd->idx);
/* the cached slot is only trusted if it still holds the same descriptor */
248 if (ifd->fd == fd->fd) {
253 /* the pollfd array has changed and we need to lookup the fd again */
254 for (i = 0; i < array->len; i++) {
256 ifd = &g_array_index (array, struct pollfd, i);
258 ifd = &g_array_index (array, WinsockFd, i);
261 if (ifd->fd == fd->fd) {
271 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
272 /* check if all file descriptors will fit in an fd_set */
/* Walk set->fds under the set lock looking for a descriptor that is too
 * large to be stored in an fd_set. */
274 selectable_fds (GstPoll * set)
278 g_mutex_lock (&set->lock);
279 for (i = 0; i < set->fds->len; i++) {
280 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
/* descriptors at or above FD_SETSIZE cannot be represented in an fd_set */
282 if (pfd->fd >= FD_SETSIZE)
/* NOTE(review): the two unlock calls presumably belong to two different
 * exit paths (after the loop and on the early exit) - confirm */
285 g_mutex_unlock (&set->lock);
291 g_mutex_unlock (&set->lock);
296 /* check if the timeout will convert to a timeout value used for poll()
297 * without a loss of precision
/* Classify timeout (a GstClockTime, i.e. nanoseconds) for use with poll():
 * GST_CLOCK_TIME_NONE and values that are not a whole number of milliseconds
 * are singled out. */
300 pollable_timeout (GstClockTime timeout)
302 if (timeout == GST_CLOCK_TIME_NONE)
305 /* not a nice multiple of milliseconds */
/* 1000000 ns == 1 ms, so a non-zero remainder would be lost by poll() */
306 if (timeout % 1000000)
/* Select the polling backend to use for one wait. When the set is in
 * GST_POLL_MODE_AUTO the decision depends on which primitives were detected
 * at build time (ppoll, poll, pselect, select). */
314 choose_mode (GstPoll * set, GstClockTime timeout)
318 if (set->mode == GST_POLL_MODE_AUTO) {
320 mode = GST_POLL_MODE_PPOLL;
321 #elif defined(HAVE_POLL)
/* plain poll() is chosen when some fd does not fit in an fd_set, or when
 * the timeout loses nothing by being expressed in milliseconds */
322 if (!selectable_fds (set) || pollable_timeout (timeout)) {
323 mode = GST_POLL_MODE_POLL;
326 mode = GST_POLL_MODE_PSELECT;
328 mode = GST_POLL_MODE_SELECT;
331 #elif defined(HAVE_PSELECT)
332 mode = GST_POLL_MODE_PSELECT;
334 mode = GST_POLL_MODE_SELECT;
/* Convert the active pollfd array into the three fd_sets expected by
 * select()/pselect(), under the set lock. Descriptors that do not fit in an
 * fd_set (>= FD_SETSIZE) are left out. max_fd tracks the highest descriptor
 * that is actually polled for reading or writing. */
344 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
354 g_mutex_lock (&set->lock);
356 for (i = 0; i < set->active_fds->len; i++) {
/* index the same array that bounds the loop: active_fds is the snapshot
 * the wait operates on and the one fd_set_to_pollfd() writes results back
 * to, whereas set->fds may have shrunk or been reordered since the
 * snapshot was taken, which would make this read go out of bounds */
357 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
359 if (pfd->fd < FD_SETSIZE) {
360 if (pfd->events & POLLIN)
361 FD_SET (pfd->fd, readfds);
362 if (pfd->events & POLLOUT)
363 FD_SET (pfd->fd, writefds);
365 FD_SET (pfd->fd, errorfds);
/* only descriptors with read or write interest raise max_fd */
366 if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
371 g_mutex_unlock (&set->lock);
/* Inverse of pollfd_to_fd_set(): translate the fd_sets filled in by
 * select()/pselect() into revents bits of the active pollfd array, under the
 * set lock. Descriptors >= FD_SETSIZE were never put in the sets and are
 * not examined. */
377 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
382 g_mutex_lock (&set->lock);
384 for (i = 0; i < set->active_fds->len; i++) {
385 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
387 if (pfd->fd < FD_SETSIZE) {
/* readable -> POLLIN, writable -> POLLOUT, exceptional -> POLLERR */
389 if (FD_ISSET (pfd->fd, readfds))
390 pfd->revents |= POLLIN;
391 if (FD_ISSET (pfd->fd, writefds))
392 pfd->revents |= POLLOUT;
393 if (FD_ISSET (pfd->fd, errorfds))
394 pfd->revents |= POLLERR;
398 g_mutex_unlock (&set->lock);
400 #else /* G_OS_WIN32 */
402 * Translate errors thrown by the Winsock API used by GstPoll:
403 * WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
/* Map a Winsock error code to an errno value by switching on last_error;
 * WSA_INVALID_HANDLE and WSA_NOT_ENOUGH_MEMORY have dedicated cases and every
 * other code shares a catch-all. */
406 gst_poll_winsock_error_to_errno (DWORD last_error)
408 switch (last_error) {
409 case WSA_INVALID_HANDLE:
414 case WSA_NOT_ENOUGH_MEMORY:
418 * Anything else, including:
419 * WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
/* Detach the event handle stored at position idx from its socket. fds and
 * events are parallel arrays, so the same idx addresses both. */
428 gst_poll_free_winsock_event (GstPoll * set, gint idx)
430 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
431 HANDLE event = g_array_index (set->events, HANDLE, idx);
/* an event mask of 0 asks Winsock to stop reporting events for the socket */
433 WSAEventSelect (wfd->fd, event, 0);
/* Add flags to, or remove flags from, the event_mask of the WinsockFd stored
 * at idx, and afterwards drop the "ignored" state if none of the ignored
 * events is part of the new mask. */
438 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
443 wfd = &g_array_index (set->fds, WinsockFd, idx);
446 wfd->event_mask |= flags;
448 wfd->event_mask &= ~flags;
450 /* reset ignored state if the new mask doesn't overlap at all */
451 if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
452 wfd->ignored_event_mask = 0;
/* Rebuild the arrays used by the Winsock wait from set->fds: active_fds and
 * active_events for sockets that are waited on, active_fds_ignored (an array
 * of pointers into set->fds) for sockets currently marked as ignored. */
456 gst_poll_prepare_winsock_active_sets (GstPoll * set)
460 g_array_set_size (set->active_fds, 0);
461 g_array_set_size (set->active_fds_ignored, 0);
462 g_array_set_size (set->active_events, 0);
/* the wake-up event always occupies slot 0 of active_events, so the event
 * of active_fds[i] lives at active_events[i + 1] */
463 g_array_append_val (set->active_events, set->wakeup_event);
465 for (i = 0; i < set->fds->len; i++) {
466 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
467 HANDLE event = g_array_index (set->events, HANDLE, i);
469 if (wfd->ignored_event_mask == 0) {
472 g_array_append_val (set->active_fds, *wfd);
473 g_array_append_val (set->active_events, event);
/* (re)register the socket's current interest mask with its event */
475 ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
476 if (G_UNLIKELY (ret != 0)) {
477 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
/* ignored sockets are remembered by pointer so their state can be reset
 * after the wait */
481 g_array_append_val (set->active_fds_ignored, wfd);
/* After a Winsock wait, gather the network events of every active socket
 * into wfd->events and then clear the ignored state of the sockets that were
 * skipped for this wait. */
489 gst_poll_collect_winsock_events (GstPoll * set)
494 * We need to check which events are signaled, and call
495 * WSAEnumNetworkEvents for those that are, which resets
496 * the event and clears the internal network event records.
499 for (i = 0; i < set->active_fds->len; i++) {
500 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
/* i + 1 because slot 0 of active_events holds the wake-up event */
501 HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
/* zero timeout: only test whether the event is signalled, never block */
504 wait_ret = WaitForSingleObject (event, 0);
505 if (wait_ret == WAIT_OBJECT_0) {
506 gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
508 if (G_UNLIKELY (enum_ret != 0)) {
510 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
516 /* clear any previously stored result */
517 memset (&wfd->events, 0, sizeof (wfd->events));
521 /* If all went well we also need to reset the ignored fds. */
/* every ignored socket is added to the result count */
523 res += set->active_fds_ignored->len;
525 for (i = 0; i < set->active_fds_ignored->len; i++) {
526 WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
528 wfd->ignored_event_mask = 0;
531 g_array_set_size (set->active_fds_ignored, 0);
539 * gst_poll_new: (skip)
540 * @controllable: whether it should be possible to control a wait.
542 * Create a new file descriptor set. If @controllable, it
543 * is possible to restart or flush a call to gst_poll_wait() with
544 * gst_poll_restart() and gst_poll_set_flushing() respectively.
546 * Free-function: gst_poll_free
548 * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
549 * case of an error. Free with gst_poll_free().
552 gst_poll_new (gboolean controllable)
/* g_slice_new0 hands back zero-filled memory, so every field not set below
 * starts out as 0 / NULL */
556 nset = g_slice_new0 (GstPoll);
557 GST_DEBUG ("%p: new controllable : %d", nset, controllable);
558 g_mutex_init (&nset->lock);
/* pollfd flavour: fds is the array edited through the API, active_fds the
 * copy handed to the wait call; -1 marks the control fds as not created */
560 nset->mode = GST_POLL_MODE_AUTO;
561 nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
562 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
563 nset->control_read_fd.fd = -1;
564 nset->control_write_fd.fd = -1;
/* control channel: a UNIX stream socket pair; element 0 becomes the read
 * end and element 1 the write end */
566 gint control_sock[2];
568 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
/* NOTE(review): the fcntl() results are not checked, and F_SETFL is given
 * O_NONBLOCK alone instead of OR-ing it into the flags from F_GETFL */
571 fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
572 fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
574 nset->control_read_fd.fd = control_sock[0];
575 nset->control_write_fd.fd = control_sock[1];
/* the read end is part of the polled set itself and monitored for reading,
 * which is how a byte written to the control socket interrupts a wait */
577 gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
578 gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
/* Winsock flavour: parallel arrays of sockets and event handles, plus the
 * arrays rebuilt for each wait */
581 nset->mode = GST_POLL_MODE_WINDOWS;
582 nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
583 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
584 nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
585 nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
586 nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
/* manual-reset event, initially non-signalled; NOTE(review): the handle
 * returned by CreateEvent() is not checked here */
588 nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
591 /* ensure (re)build, though already sneakily set in non-windows case */
594 nset->controllable = controllable;
/* failure path: the partially built set is torn down with gst_poll_free() */
602 GST_WARNING ("%p: can't create socket pair !", nset);
603 gst_poll_free (nset);
610 * gst_poll_new_timer: (skip)
612 * Create a new poll object that can be used for scheduling cancellable
615 * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
616 * performed from different threads.
618 * Free-function: gst_poll_free
620 * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
621 * case of an error. Free with gst_poll_free().
624 gst_poll_new_timer (void)
628 /* make a new controllable poll set */
/* gst_poll_new() is documented to return NULL on error, hence the test */
629 if (!(poll = gst_poll_new (TRUE)))
641 * @set: (transfer full): a file descriptor set.
643 * Free a file descriptor set.
646 gst_poll_free (GstPoll * set)
648 g_return_if_fail (set != NULL);
650 GST_DEBUG ("%p: freeing", set);
/* the control fds are -1 when the socket pair was never created, so only
 * valid descriptors are closed */
653 if (set->control_write_fd.fd >= 0)
654 close (set->control_write_fd.fd);
655 if (set->control_read_fd.fd >= 0)
656 close (set->control_read_fd.fd);
/* Winsock flavour: release the wake-up event and each per-socket event
 * before the arrays that hold them are freed */
658 CloseHandle (set->wakeup_event);
663 for (i = 0; i < set->events->len; i++)
664 gst_poll_free_winsock_event (set, i);
667 g_array_free (set->active_events, TRUE);
668 g_array_free (set->events, TRUE);
669 g_array_free (set->active_fds_ignored, TRUE);
/* common teardown: arrays (including their element storage), the lock,
 * then the structure itself */
672 g_array_free (set->active_fds, TRUE);
673 g_array_free (set->fds, TRUE);
674 g_mutex_clear (&set->lock);
675 g_slice_free (GstPoll, set);
679 * gst_poll_get_read_gpollfd:
683 * Get a GPollFD for the reading part of the control socket. This is useful when
684 * integrating with a GSource and GMainLoop.
687 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
689 g_return_if_fail (set != NULL);
690 g_return_if_fail (fd != NULL);
/* socket-pair flavour: expose the read end of the control socket */
693 fd->fd = set->control_read_fd.fd;
/* Windows flavour: the wake-up event handle is stored in the fd field,
 * cast through an integer whose width matches the pointer size */
695 #if GLIB_SIZEOF_VOID_P == 8
696 fd->fd = (gint64) set->wakeup_event;
698 fd->fd = (gint) set->wakeup_event;
/* report readability as well as hang-up and error conditions */
701 fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
709 * Initializes @fd. Alternatively you can initialize it with
713 gst_poll_fd_init (GstPollFD * fd)
/* a NULL fd is a programming error: warn and do nothing */
715 g_return_if_fail (fd != NULL);
/* Worker for gst_poll_add_fd() that does not take set->lock itself; it is
 * also called directly from gst_poll_new() for the control socket. Appends
 * fd to set->fds if find_index() does not already know it and records the
 * new position in fd->idx. */
722 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
726 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
728 idx = find_index (set->fds, fd);
/* a new entry starts with only the error/hang-up conditions enabled; read
 * and write interest is switched on separately via the fd_ctl functions */
734 nfd.events = POLLERR | POLLNVAL | POLLHUP;
737 g_array_append_val (set->fds, nfd);
/* cache the slot so later find_index() calls can take the fast path */
739 fd->idx = set->fds->len - 1;
/* Winsock flavour: FD_CLOSE is always monitored, and every socket gets its
 * own event object stored at the same index in set->events.
 * NOTE(review): the result of WSACreateEvent() is not checked here */
745 wfd.event_mask = FD_CLOSE;
746 memset (&wfd.events, 0, sizeof (wfd.events));
747 wfd.ignored_event_mask = 0;
748 event = WSACreateEvent ();
750 g_array_append_val (set->fds, wfd);
751 g_array_append_val (set->events, event);
753 fd->idx = set->fds->len - 1;
757 GST_WARNING ("%p: fd already added !", set);
765 * @set: a file descriptor set.
766 * @fd: a file descriptor.
768 * Add a file descriptor to the file descriptor set.
770 * Returns: %TRUE if the file descriptor was successfully added to the set.
773 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
/* argument checks: a NULL set/fd or a negative descriptor yields FALSE */
777 g_return_val_if_fail (set != NULL, FALSE);
778 g_return_val_if_fail (fd != NULL, FALSE);
779 g_return_val_if_fail (fd->fd >= 0, FALSE);
/* the actual work happens in the _unlocked variant, with set->lock held */
781 g_mutex_lock (&set->lock);
783 ret = gst_poll_add_fd_unlocked (set, fd);
785 g_mutex_unlock (&set->lock);
791 * gst_poll_remove_fd:
792 * @set: a file descriptor set.
793 * @fd: a file descriptor.
795 * Remove a file descriptor from the file descriptor set.
797 * Returns: %TRUE if the file descriptor was successfully removed from the set.
800 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
804 g_return_val_if_fail (set != NULL, FALSE);
805 g_return_val_if_fail (fd != NULL, FALSE);
806 g_return_val_if_fail (fd->fd >= 0, FALSE);
809 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
811 g_mutex_lock (&set->lock);
813 /* get the index, -1 is an fd that is not added */
814 idx = find_index (set->fds, fd);
/* Winsock flavour: detach the socket's event and drop it from the events
 * array with the same "fast" removal used for fds below, so both arrays
 * stay index-aligned */
817 gst_poll_free_winsock_event (set, idx);
818 g_array_remove_index_fast (set->events, idx);
821 /* remove the fd at index, we use _remove_index_fast, which copies the last
822 * element of the array to the freed index */
823 g_array_remove_index_fast (set->fds, idx);
825 /* mark fd as removed by setting the index to -1 */
829 GST_WARNING ("%p: couldn't find fd !", set);
832 g_mutex_unlock (&set->lock);
838 * gst_poll_fd_ctl_write:
839 * @set: a file descriptor set.
840 * @fd: a file descriptor.
841 * @active: a new status.
843 * Control whether the descriptor @fd in @set will be monitored for
846 * Returns: %TRUE if the descriptor was successfully updated.
849 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
853 g_return_val_if_fail (set != NULL, FALSE);
854 g_return_val_if_fail (fd != NULL, FALSE);
855 g_return_val_if_fail (fd->fd >= 0, FALSE);
857 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
858 fd->fd, fd->idx, active);
/* the interest mask lives in set->fds, which is only touched with the lock */
860 g_mutex_lock (&set->lock);
862 idx = find_index (set->fds, fd);
865 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
/* pollfd flavour: write interest is the POLLOUT bit */
868 pfd->events |= POLLOUT;
870 pfd->events &= ~POLLOUT;
872 GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
/* Winsock flavour: write interest maps to FD_WRITE and FD_CONNECT */
874 gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
879 GST_WARNING ("%p: couldn't find fd !", set);
882 g_mutex_unlock (&set->lock);
/* Worker for gst_poll_fd_ctl_read() that does not take set->lock itself;
 * also used by gst_poll_new() for the control socket. Switches read
 * monitoring of fd on or off in set->fds. */
888 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
892 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
893 fd->fd, fd->idx, active);
895 idx = find_index (set->fds, fd);
899 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
/* pollfd flavour: read interest covers normal and priority data */
902 pfd->events |= (POLLIN | POLLPRI);
904 pfd->events &= ~(POLLIN | POLLPRI);
/* Winsock flavour: read interest maps to FD_READ and FD_ACCEPT */
906 gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
910 GST_WARNING ("%p: couldn't find fd !", set);
917 * gst_poll_fd_ctl_read:
918 * @set: a file descriptor set.
919 * @fd: a file descriptor.
920 * @active: a new status.
922 * Control whether the descriptor @fd in @set will be monitored for
925 * Returns: %TRUE if the descriptor was successfully updated.
928 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
/* argument checks: a NULL set/fd or a negative descriptor yields FALSE */
932 g_return_val_if_fail (set != NULL, FALSE);
933 g_return_val_if_fail (fd != NULL, FALSE);
934 g_return_val_if_fail (fd->fd >= 0, FALSE);
/* the actual work happens in the _unlocked variant, with set->lock held */
936 g_mutex_lock (&set->lock);
938 ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
940 g_mutex_unlock (&set->lock);
946 * gst_poll_fd_ignored:
947 * @set: a file descriptor set.
948 * @fd: a file descriptor.
950 * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
951 * the same result for @fd as last time. This function must be called if no
952 * operation (read/write/recv/send/etc.) will be performed on @fd before
953 * the next call to gst_poll_wait().
955 * The reason why this is needed is because the underlying implementation
956 * might not allow querying the fd more than once between calls to one of
957 * the re-enabling operations.
960 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
965 g_return_if_fail (set != NULL);
966 g_return_if_fail (fd != NULL);
967 g_return_if_fail (fd->fd >= 0);
969 g_mutex_lock (&set->lock);
971 idx = find_index (set->fds, fd);
973 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
/* only the read/write events that are currently monitored are flagged as
 * ignored; a non-zero mask keeps the socket out of the next active set */
975 wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
979 g_mutex_unlock (&set->lock);
984 * gst_poll_fd_has_closed:
985 * @set: a file descriptor set.
986 * @fd: a file descriptor.
988 * Check if @fd in @set has closed the connection.
990 * Returns: %TRUE if the connection was closed.
993 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
995 gboolean res = FALSE;
998 g_return_val_if_fail (set != NULL, FALSE);
999 g_return_val_if_fail (fd != NULL, FALSE);
1000 g_return_val_if_fail (fd->fd >= 0, FALSE);
/* set is const in the signature but its mutex still has to be taken,
 * hence the cast */
1002 g_mutex_lock (&((GstPoll *) set)->lock);
/* results are looked up in active_fds, the array the wait operated on */
1004 idx = find_index (set->active_fds, fd);
1007 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
/* pollfd flavour: hang-up means the connection was closed */
1009 res = (pfd->revents & POLLHUP) != 0;
1011 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
/* Winsock flavour: the FD_CLOSE network event */
1013 res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1016 GST_WARNING ("%p: couldn't find fd !", set);
1018 g_mutex_unlock (&((GstPoll *) set)->lock);
1020 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1026 * gst_poll_fd_has_error:
1027 * @set: a file descriptor set.
1028 * @fd: a file descriptor.
1030 * Check if @fd in @set has an error.
1032 * Returns: %TRUE if the descriptor has an error.
1035 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1037 gboolean res = FALSE;
1040 g_return_val_if_fail (set != NULL, FALSE);
1041 g_return_val_if_fail (fd != NULL, FALSE);
1042 g_return_val_if_fail (fd->fd >= 0, FALSE);
/* set is const in the signature but its mutex still has to be taken,
 * hence the cast */
1044 g_mutex_lock (&((GstPoll *) set)->lock);
/* results are looked up in active_fds, the array the wait operated on */
1046 idx = find_index (set->active_fds, fd);
1049 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
/* pollfd flavour: an error condition or an invalid descriptor */
1051 res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1053 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
/* Winsock flavour: a non-zero error code recorded for any of the
 * close/read/write/accept/connect events */
1055 res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1056 (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1057 (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1058 (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1059 (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1062 GST_WARNING ("%p: couldn't find fd !", set);
1064 g_mutex_unlock (&((GstPoll *) set)->lock);
1066 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
/* Worker for gst_poll_fd_can_read() that does not take the set lock itself.
 * Looks fd up in active_fds and tests the read-related result bits; res
 * stays FALSE when the fd is not found. */
1072 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1074 gboolean res = FALSE;
1077 idx = find_index (set->active_fds, fd);
1080 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
/* pollfd flavour: normal or priority data is available */
1082 res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1084 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
/* Winsock flavour: data to read or a connection to accept */
1086 res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1089 GST_WARNING ("%p: couldn't find fd !", set);
1091 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1097 * gst_poll_fd_can_read:
1098 * @set: a file descriptor set.
1099 * @fd: a file descriptor.
1101 * Check if @fd in @set has data to be read.
1103 * Returns: %TRUE if the descriptor has data to be read.
1106 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1108 gboolean res = FALSE;
1110 g_return_val_if_fail (set != NULL, FALSE);
1111 g_return_val_if_fail (fd != NULL, FALSE);
1112 g_return_val_if_fail (fd->fd >= 0, FALSE);
/* set is const in the signature but its mutex still has to be taken,
 * hence the cast; the check itself is done by the _unlocked variant */
1114 g_mutex_lock (&((GstPoll *) set)->lock);
1116 res = gst_poll_fd_can_read_unlocked (set, fd);
1118 g_mutex_unlock (&((GstPoll *) set)->lock);
1124 * gst_poll_fd_can_write:
1125 * @set: a file descriptor set.
1126 * @fd: a file descriptor.
1128 * Check if @fd in @set can be used for writing.
1130 * Returns: %TRUE if the descriptor can be used for writing.
1133 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1135 gboolean res = FALSE;
1138 g_return_val_if_fail (set != NULL, FALSE);
1139 g_return_val_if_fail (fd != NULL, FALSE);
1140 g_return_val_if_fail (fd->fd >= 0, FALSE);
/* set is const in the signature but its mutex still has to be taken,
 * hence the cast */
1142 g_mutex_lock (&((GstPoll *) set)->lock);
/* results are looked up in active_fds, the array the wait operated on */
1144 idx = find_index (set->active_fds, fd);
1147 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
/* pollfd flavour: the POLLOUT result bit */
1149 res = (pfd->revents & POLLOUT) != 0;
1151 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
/* Winsock flavour: the FD_WRITE network event */
1153 res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1156 GST_WARNING ("%p: couldn't find fd !", set);
1158 g_mutex_unlock (&((GstPoll *) set)->lock);
1160 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1168 * @timeout: a timeout in nanoseconds.
1170 * Wait for activity on the file descriptors in @set. This function waits up to
1171 * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
1173 * For #GstPoll objects created with gst_poll_new(), this function can only be
1174 * called from a single thread at a time. If called from multiple threads,
1175 * -1 will be returned with errno set to EPERM.
1177 * This is not true for timer #GstPoll objects created with
1178 * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1181 * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1182 * activity was detected after @timeout. If an error occurs, -1 is returned
1186 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1188 gboolean restarting;
1193 g_return_val_if_fail (set != NULL, -1);
1195 GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
1197 is_timer = set->timer;
1199 /* add one more waiter */
1200 old_waiting = INC_WAITING (set);
1202 /* we cannot wait from multiple threads unless we are a timer */
1203 if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1204 goto already_waiting;
1206 /* flushing, exit immediately */
1207 if (G_UNLIKELY (IS_FLUSHING (set)))
/* pick the backend for this iteration; AUTO is resolved by choose_mode() */
1216 mode = choose_mode (set, timeout);
/* when a rebuild was requested, refresh the arrays the wait operates on
 * from set->fds while holding the lock */
1218 if (TEST_REBUILD (set)) {
1219 g_mutex_lock (&set->lock);
/* pollfd flavour: active_fds becomes a plain copy of fds */
1221 g_array_set_size (set->active_fds, set->fds->len);
1222 memcpy (set->active_fds->data, set->fds->data,
1223 set->fds->len * sizeof (struct pollfd));
/* Winsock flavour: the active arrays are rebuilt by a helper */
1225 if (!gst_poll_prepare_winsock_active_sets (set))
1228 g_mutex_unlock (&set->lock);
1232 case GST_POLL_MODE_AUTO:
1233 g_assert_not_reached ();
1235 case GST_POLL_MODE_PPOLL:
1239 struct timespec *tsptr;
1241 if (timeout != GST_CLOCK_TIME_NONE) {
1242 GST_TIME_TO_TIMESPEC (timeout, ts);
1249 ppoll ((struct pollfd *) set->active_fds->data,
1250 set->active_fds->len, tsptr, NULL);
1252 g_assert_not_reached ();
1257 case GST_POLL_MODE_POLL:
1262 if (timeout != GST_CLOCK_TIME_NONE) {
/* poll() takes its timeout in milliseconds */
1263 t = GST_TIME_AS_MSECONDS (timeout);
1269 poll ((struct pollfd *) set->active_fds->data,
1270 set->active_fds->len, t);
1272 g_assert_not_reached ();
1277 case GST_POLL_MODE_PSELECT:
1278 #ifndef HAVE_PSELECT
1280 g_assert_not_reached ();
1285 case GST_POLL_MODE_SELECT:
/* select()/pselect() work on fd_sets, so the pollfd array is converted
 * before the call and the results are converted back afterwards */
1293 max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1295 if (mode == GST_POLL_MODE_SELECT) {
1297 struct timeval *tvptr;
1299 if (timeout != GST_CLOCK_TIME_NONE) {
1300 GST_TIME_TO_TIMEVAL (timeout, tv);
1306 GST_DEBUG ("%p: Calling select", set);
1307 res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1308 GST_DEBUG ("%p: After select, res:%d", set, res);
1312 struct timespec *tsptr;
1314 if (timeout != GST_CLOCK_TIME_NONE) {
1315 GST_TIME_TO_TIMESPEC (timeout, ts);
1321 GST_DEBUG ("%p: Calling pselect", set);
1323 pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1324 GST_DEBUG ("%p: After pselect, res:%d", set, res);
1329 fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1331 #else /* G_OS_WIN32 */
1332 g_assert_not_reached ();
1337 case GST_POLL_MODE_WINDOWS:
1340 gint ignore_count = set->active_fds_ignored->len;
1343 if (G_LIKELY (ignore_count == 0)) {
1344 if (timeout != GST_CLOCK_TIME_NONE)
1345 t = GST_TIME_AS_MSECONDS (timeout);
1349 /* already one or more ignored fds, so we quickly sweep the others */
1353 if (set->active_events->len != 0) {
/* wait for any one of the events (the third argument is FALSE) */
1354 wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1355 (HANDLE *) set->active_events->data, FALSE, t, FALSE);
/* no events to wait on: report it as an invalid-parameter failure */
1357 wait_ret = WSA_WAIT_FAILED;
1358 WSASetLastError (WSA_INVALID_PARAMETER);
1361 if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1363 } else if (wait_ret == WSA_WAIT_FAILED) {
1365 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1367 /* the first entry is the wakeup event */
1368 if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1369 res = gst_poll_collect_winsock_events (set);
1371 res = 1; /* wakeup event */
1375 g_assert_not_reached ();
1383 /* Applications need to clear the control socket themselves for timer
1385 * For other polls, we need to clear the control socket. If there was only
1386 * one socket with activity and it was the control socket, we need to
1388 if (release_all_wakeup (set) > 0 && res == 1)
1392 /* we got woken up and we are flushing, we need to stop */
1393 if (G_UNLIKELY (IS_FLUSHING (set)))
1396 } while (G_UNLIKELY (restarting));
/* exit paths for the early-out and error cases */
1405 GST_LOG ("%p: we are already waiting", set);
1412 GST_LOG ("%p: we are flushing", set);
/* the Winsock error path is reached with set->lock held and releases it */
1420 GST_LOG ("%p: winsock error", set);
1421 g_mutex_unlock (&set->lock);
1429 * gst_poll_set_controllable:
1431 * @controllable: new controllable state.
1433 * When @controllable is %TRUE, this function ensures that future calls to
1434 * gst_poll_wait() will be affected by gst_poll_restart() and
1435 * gst_poll_set_flushing().
1437 * Returns: %TRUE if the controllability of @set could be updated.
1440 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1442 g_return_val_if_fail (set != NULL, FALSE);
1444 GST_LOG ("%p: controllable : %d", set, controllable);
/* only the flag is updated; it is consulted by gst_poll_restart() and
 * gst_poll_set_flushing() before they wake up a waiter */
1446 set->controllable = controllable;
1455 * Restart any gst_poll_wait() that is in progress. This function is typically
1456 * used after adding or removing descriptors to @set.
1458 * If @set is not controllable, then this call will have no effect.
1461 gst_poll_restart (GstPoll * set)
1463 g_return_if_fail (set != NULL);
/* nothing to do unless the set is controllable and a wait is in progress */
1465 if (set->controllable && GET_WAITING (set) > 0) {
1466 /* we are controllable and waiting, wake up the waiter. The socket will be
1467 * cleared by the _wait() thread and the poll will be restarted */
1473 * gst_poll_set_flushing:
1475 * @flushing: new flushing state.
1477 * When @flushing is %TRUE, this function ensures that current and future calls
1478 * to gst_poll_wait() will return -1, with errno set to EBUSY.
1480 * Unsetting the flushing state will restore normal operation of @set.
1483 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1485 g_return_if_fail (set != NULL);
1487 GST_LOG ("%p: flushing: %d", set, flushing);
1489 /* update the new state first */
1490 SET_FLUSHING (set, flushing);
/* a waiter is only woken when flushing is being switched on, the set is
 * controllable and a wait is in progress */
1492 if (flushing && set->controllable && GET_WAITING (set) > 0) {
1493 /* we are flushing, controllable and waiting, wake up the waiter. When we
1494 * stop the flushing operation we don't clear the wakeup fd here, this will
1495 * happen in the _wait() thread. */
1501 * gst_poll_write_control:
1504 * Write a byte to the control socket of the controllable @set.
1505 * This function is mostly useful for timer #GstPoll objects created with
1506 * gst_poll_new_timer().
1508 * It will make any current and future gst_poll_wait() function return with
1509 * 1, meaning the control socket is set. After an equal amount of calls to
1510 * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1511 * block again until their timeout expired.
1513 * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1514 * byte could not be written.
1517 gst_poll_write_control (GstPoll * set)
1521 g_return_val_if_fail (set != NULL, FALSE);
/* only timer poll sets may be driven through this call */
1522 g_return_val_if_fail (set->timer, FALSE);
/* records one pending wake-up and signals the control channel if none was
 * pending before */
1524 res = raise_wakeup (set);
1530 * gst_poll_read_control:
1533 * Read a byte from the control socket of the controllable @set.
1534 * This function is mostly useful for timer #GstPoll objects created with
1535 * gst_poll_new_timer().
1537 * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1538 * was no byte to read.
1541 gst_poll_read_control (GstPoll * set)
1545 g_return_val_if_fail (set != NULL, FALSE);
1546 g_return_val_if_fail (set->timer, FALSE);
1548 res = release_wakeup (set);