2 * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4 * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5 * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
7 * gstpoll.c: File descriptor set
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
26 * @short_description: Keep track of file descriptors and make it possible
27 * to wait on them in a cancellable way
29 * A #GstPoll keeps track of file descriptors much like fd_set (used with
30 * select()) or a struct pollfd array (used with poll()). Once created with
31 * gst_poll_new(), the set can be used to wait for file descriptors to be
32 * readable and/or writable. It is possible to make this wait be controlled
33 * by specifying %TRUE for the @controllable flag when creating the set (or
34 * later calling gst_poll_set_controllable()).
36 * New file descriptors are added to the set using gst_poll_add_fd(), and
37 * removed using gst_poll_remove_fd(). Controlling which file descriptors
38 * should be waited for to become readable and/or writable are done using
39 * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
41 * Use gst_poll_wait() to wait for the file descriptors to actually become
42 * readable and/or writable, or to timeout if no file descriptor is available
43 * in time. The wait can be controlled by calling gst_poll_restart() and
44 * gst_poll_set_flushing().
46 * Once the file descriptor set has been waited for, one can use
47 * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48 * gst_poll_fd_has_error() to see if it has generated an error,
49 * gst_poll_fd_can_read() to see if it is possible to read from the file
50 * descriptor, and gst_poll_fd_can_write() to see if it is possible to
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
62 #include <sys/types.h>
77 #ifdef HAVE_SYS_POLL_H
84 #include <sys/socket.h>
89 # define EWOULDBLOCK EAGAIN /* This is just to placate gcc */
91 #endif /* G_OS_WIN32 */
93 /* OS/X needs this because of bad headers */
96 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
97 * so we prefer our own poll emulation.
99 #if defined(BROKEN_POLL)
105 #define GST_CAT_DEFAULT GST_CAT_POLL
/* Windows-only per-fd bookkeeping: the WSA event record plus the mask of
 * events currently being ignored (see gst_poll_fd_ignored()). */
108 typedef struct _WinsockFd WinsockFd;
114 WSANETWORKEVENTS events;
115 glong ignored_event_mask;
/* Poll back-end selection; WINDOWS is forced on win32, the others are
 * chosen at wait time by choose_mode(). */
122 GST_POLL_MODE_SELECT,
123 GST_POLL_MODE_PSELECT,
126 GST_POLL_MODE_WINDOWS
134 /* array of fds, always written to and read from with lock */
136 /* array of active fds, only written to from the waiting thread with the
137 * lock and read from with the lock or without the lock from the waiting
/* Socketpair used to wake up a blocking gst_poll_wait() from another
 * thread (UNIX variant; win32 uses wakeup_event instead). */
142 GstPollFD control_read_fd;
143 GstPollFD control_write_fd;
145 GArray *active_fds_ignored;
147 GArray *active_events;
/* Counters below are accessed with g_atomic_* via the macros further down. */
152 gboolean controllable;
153 volatile gint waiting;
154 volatile gint control_pending;
155 volatile gint flushing;
157 volatile gint rebuild;
/* Forward declarations for the _unlocked variants used by gst_poll_new(). */
160 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
162 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
/* Atomic accessors for the flushing/waiting/rebuild state shared between
 * the waiting thread and controlling threads. */
164 #define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing))
165 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
167 #define INC_WAITING(s) (g_atomic_int_add(&(s)->waiting, 1))
168 #define DEC_WAITING(s) (g_atomic_int_add(&(s)->waiting, -1))
169 #define GET_WAITING(s) (g_atomic_int_get(&(s)->waiting))
/* TEST_REBUILD atomically consumes a pending rebuild request;
 * MARK_REBUILD requests that the active fd set be rebuilt before waiting. */
171 #define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
172 #define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
/* UNIX variant: wake a blocked gst_poll_wait() by writing one byte ("W")
 * to the control socketpair; retries on EAGAIN/EINTR, g_critical()s on any
 * other error. */
177 wake_event (GstPoll * set)
180 while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) {
181 if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
182 g_critical ("%p: failed to wake event: %s", set, strerror (errno));
/* UNIX variant: consume one wakeup byte from the read end of the control
 * socketpair; mirrors wake_event()'s retry/critical behavior. */
190 release_event (GstPoll * set)
192 gchar buf[1] = { '\0' };
194 while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) {
195 if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
196 g_critical ("%p: failed to release event: %s", set, strerror (errno));
/* Win32 helper: format GetLastError() into a caller-supplied buffer for
 * the g_critical() messages below. */
206 format_last_error (gchar * buf, size_t buf_len)
208 DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM;
212 id = GetLastError ();
213 FormatMessage (flags, src, id, lang, buf, (DWORD) buf_len, NULL);
/* Win32 variant of wake_event(): signal the manual-reset wakeup_event. */
218 wake_event (GstPoll * set)
222 if (!SetEvent (set->wakeup_event)) {
223 gchar msg[1024] = "<unknown>";
224 format_last_error (msg, sizeof (msg));
225 g_critical ("%p: failed to set wakup_event: %s", set, msg);
/* Win32 variant of release_event(): wait for the wakeup event, then reset
 * it so the next wait blocks again. */
234 release_event (GstPoll * set)
/* NOTE(review): assignment inside the condition — relies on
 * WAIT_OBJECT_0 == 0 so any non-zero status is the failure path.
 * Intentional but compiler-warning-prone; verify against upstream. */
239 if (status = WaitForSingleObject (set->wakeup_event, INFINITE)) {
240 const gchar *reason = "unknown";
241 gchar msg[1024] = "<unknown>";
244 reason = "WAIT_ABANDONED";
247 reason = "WAIT_TIMEOUT";
250 format_last_error (msg, sizeof (msg));
257 g_critical ("%p: failed to block on wakup_event: %s", set, reason);
262 if (!ResetEvent (set->wakeup_event)) {
263 gchar msg[1024] = "<unknown>";
264 format_last_error (msg, sizeof (msg));
265 g_critical ("%p: failed to reset wakup_event: %s", set, msg);
275 /* the poll/select call is also performed on a control socket, that way
276 * we can send special commands to control it */
/* Raise a wakeup: only the first pending raise actually signals the event;
 * subsequent raises just bump control_pending. Returns FALSE if the
 * underlying wake failed. */
277 static inline gboolean
278 raise_wakeup (GstPoll * set)
280 gboolean result = TRUE;
282 /* makes testing control_pending and WAKE_EVENT() atomic. */
283 g_mutex_lock (&set->lock);
285 if (set->control_pending == 0) {
286 /* raise when nothing pending */
287 GST_LOG ("%p: raise", set);
288 result = wake_event (set);
292 set->control_pending++;
295 g_mutex_unlock (&set->lock);
/* Release one pending wakeup; the event itself is only consumed when the
 * last pending raise is released (control_pending drops 1 -> 0).
 * Returns FALSE when nothing was pending. */
300 static inline gboolean
301 release_wakeup (GstPoll * set)
303 gboolean result = FALSE;
305 /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
306 g_mutex_lock (&set->lock);
308 if (set->control_pending > 0) {
309 /* release, only if this was the last pending. */
310 if (set->control_pending == 1) {
311 GST_LOG ("%p: release", set);
312 result = release_event (set);
318 set->control_pending--;
324 g_mutex_unlock (&set->lock);
/* Drain all pending wakeups at once; used by gst_poll_wait() after the
 * poll returns. Returns the number of wakeups that were pending. */
330 release_all_wakeup (GstPoll * set)
334 /* makes testing control_pending and RELEASE_EVENT() atomic. */
335 g_mutex_lock (&set->lock);
337 if ((old = set->control_pending) > 0) {
338 GST_LOG ("%p: releasing %d", set, old);
339 if (release_event (set)) {
340 set->control_pending = 0;
346 g_mutex_unlock (&set->lock);
/* Locate @fd in @array (of struct pollfd on UNIX, WinsockFd on win32).
 * Fast path: trust the index cached in fd->idx if it still points at the
 * same fd; otherwise fall back to a linear scan. Caller holds the lock. */
352 find_index (GArray * array, GstPollFD * fd)
361 /* start by assuming the index found in the fd is still valid */
362 if (fd->idx >= 0 && fd->idx < array->len) {
364 ifd = &g_array_index (array, struct pollfd, fd->idx);
366 ifd = &g_array_index (array, WinsockFd, fd->idx);
369 if (ifd->fd == fd->fd) {
374 /* the pollfd array has changed and we need to lookup the fd again */
375 for (i = 0; i < array->len; i++) {
377 ifd = &g_array_index (array, struct pollfd, i);
379 ifd = &g_array_index (array, WinsockFd, i);
382 if (ifd->fd == fd->fd) {
392 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
393 /* check if all file descriptors will fit in an fd_set */
/* Returns FALSE as soon as any fd in the set is >= FD_SETSIZE, in which
 * case select()/pselect() cannot be used and poll() must be chosen. */
395 selectable_fds (GstPoll * set)
399 g_mutex_lock (&set->lock);
400 for (i = 0; i < set->fds->len; i++) {
401 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
403 if (pfd->fd >= FD_SETSIZE)
/* early-out unlock for the "not selectable" path */
406 g_mutex_unlock (&set->lock);
412 g_mutex_unlock (&set->lock);
417 /* check if the timeout will convert to a timeout value used for poll()
418 * without a loss of precision
/* GST_CLOCK_TIME_NONE (infinite) and whole-millisecond timeouts are
 * representable by poll()'s int-milliseconds argument. */
421 pollable_timeout (GstClockTime timeout)
423 if (timeout == GST_CLOCK_TIME_NONE)
426 /* not a nice multiple of milliseconds */
427 if (timeout % 1000000)
/* Pick the concrete wait primitive for this call: ppoll > poll (when the
 * fds/timeout allow it) > pselect > select, depending on what configure
 * detected. GST_POLL_MODE_AUTO is resolved here. */
435 choose_mode (GstPoll * set, GstClockTime timeout)
439 if (set->mode == GST_POLL_MODE_AUTO) {
441 mode = GST_POLL_MODE_PPOLL;
442 #elif defined(HAVE_POLL)
443 if (!selectable_fds (set) || pollable_timeout (timeout)) {
444 mode = GST_POLL_MODE_POLL;
447 mode = GST_POLL_MODE_PSELECT;
449 mode = GST_POLL_MODE_SELECT;
452 #elif defined(HAVE_PSELECT)
453 mode = GST_POLL_MODE_PSELECT;
455 mode = GST_POLL_MODE_SELECT;
/* Translate the active pollfd array into fd_sets for (p)select; returns
 * the highest fd that is being waited on (for select's nfds argument). */
465 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
475 g_mutex_lock (&set->lock);
/* NOTE(review): loop bound is active_fds->len but the index reads
 * set->fds — presumably safe because active_fds is a memcpy of fds made
 * at rebuild time (same length), but verify against upstream. */
477 for (i = 0; i < set->active_fds->len; i++) {
478 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
480 if (pfd->fd < FD_SETSIZE) {
481 if (pfd->events & POLLIN)
482 FD_SET (pfd->fd, readfds);
483 if (pfd->events & POLLOUT)
484 FD_SET (pfd->fd, writefds);
/* every fd is also watched for errors */
486 FD_SET (pfd->fd, errorfds);
487 if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
492 g_mutex_unlock (&set->lock);
/* Inverse of pollfd_to_fd_set(): copy (p)select results back into the
 * revents fields of the active pollfd array. */
498 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
503 g_mutex_lock (&set->lock);
505 for (i = 0; i < set->active_fds->len; i++) {
506 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
508 if (pfd->fd < FD_SETSIZE) {
510 if (FD_ISSET (pfd->fd, readfds))
511 pfd->revents |= POLLIN;
512 if (FD_ISSET (pfd->fd, writefds))
513 pfd->revents |= POLLOUT;
514 if (FD_ISSET (pfd->fd, errorfds))
515 pfd->revents |= POLLERR;
519 g_mutex_unlock (&set->lock);
521 #else /* G_OS_WIN32 */
523 * Translate errors thrown by the Winsock API used by GstPoll:
524 * WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
/* Map WSA error codes onto errno values so callers can use the same
 * errno-based error handling as on UNIX. */
527 gst_poll_winsock_error_to_errno (DWORD last_error)
529 switch (last_error) {
530 case WSA_INVALID_HANDLE:
535 case WSA_NOT_ENOUGH_MEMORY:
539 * Anything else, including:
540 * WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
/* Detach the WSA event from its socket before freeing the slot at @idx. */
549 gst_poll_free_winsock_event (GstPoll * set, gint idx)
551 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
552 HANDLE event = g_array_index (set->events, HANDLE, idx);
/* a zero event mask cancels the association */
554 WSAEventSelect (wfd->fd, event, 0);
/* Set or clear @flags in the per-fd event mask (FD_READ, FD_WRITE, ...);
 * the new mask takes effect at the next prepare/rebuild. */
559 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
564 wfd = &g_array_index (set->fds, WinsockFd, idx);
567 wfd->event_mask |= flags;
569 wfd->event_mask &= ~flags;
571 /* reset ignored state if the new mask doesn't overlap at all */
572 if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
573 wfd->ignored_event_mask = 0;
/* Rebuild the active_* arrays for the next WSAWaitForMultipleEvents call.
 * Slot 0 of active_events is always the wakeup event; fds with a pending
 * ignored mask are parked in active_fds_ignored instead of being waited
 * on. Returns FALSE (with errno set) if WSAEventSelect fails. */
577 gst_poll_prepare_winsock_active_sets (GstPoll * set)
581 g_array_set_size (set->active_fds, 0);
582 g_array_set_size (set->active_fds_ignored, 0);
583 g_array_set_size (set->active_events, 0);
584 g_array_append_val (set->active_events, set->wakeup_event);
586 for (i = 0; i < set->fds->len; i++) {
587 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
588 HANDLE event = g_array_index (set->events, HANDLE, i);
590 if (wfd->ignored_event_mask == 0) {
593 g_array_append_val (set->active_fds, *wfd);
594 g_array_append_val (set->active_events, event);
596 ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
597 if (G_UNLIKELY (ret != 0)) {
598 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
602 g_array_append_val (set->active_fds_ignored, wfd);
/* After a successful wait, enumerate which sockets actually fired and
 * store the network-event records in each WinsockFd; also re-arms the
 * ignored fds. Returns the number of active fds (or -1 on error). */
610 gst_poll_collect_winsock_events (GstPoll * set)
615 * We need to check which events are signaled, and call
616 * WSAEnumNetworkEvents for those that are, which resets
617 * the event and clears the internal network event records.
620 for (i = 0; i < set->active_fds->len; i++) {
621 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
622 HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
625 wait_ret = WaitForSingleObject (event, 0);
626 if (wait_ret == WAIT_OBJECT_0) {
627 gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
629 if (G_UNLIKELY (enum_ret != 0)) {
631 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
637 /* clear any previously stored result */
638 memset (&wfd->events, 0, sizeof (wfd->events));
642 /* If all went well we also need to reset the ignored fds. */
644 res += set->active_fds_ignored->len;
646 for (i = 0; i < set->active_fds_ignored->len; i++) {
647 WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
649 wfd->ignored_event_mask = 0;
652 g_array_set_size (set->active_fds_ignored, 0);
660 * gst_poll_new: (skip)
661 * @controllable: whether it should be possible to control a wait.
663 * Create a new file descriptor set. If @controllable, it
664 * is possible to restart or flush a call to gst_poll_wait() with
665 * gst_poll_restart() and gst_poll_set_flushing() respectively.
667 * Free-function: gst_poll_free
669 * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
670 * case of an error. Free with gst_poll_free().
673 gst_poll_new (gboolean controllable)
677 nset = g_slice_new0 (GstPoll);
678 GST_DEBUG ("%p: new controllable : %d", nset, controllable);
679 g_mutex_init (&nset->lock);
/* UNIX: pollfd-backed arrays plus a socketpair used as the control/wakeup
 * channel; the read end is added to the set so a write wakes the poll. */
681 nset->mode = GST_POLL_MODE_AUTO;
682 nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
683 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
684 nset->control_read_fd.fd = -1;
685 nset->control_write_fd.fd = -1;
687 gint control_sock[2];
689 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
692 nset->control_read_fd.fd = control_sock[0];
693 nset->control_write_fd.fd = control_sock[1];
695 gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
696 gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
/* win32: WinsockFd-backed arrays plus a manual-reset wakeup event. */
699 nset->mode = GST_POLL_MODE_WINDOWS;
700 nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
701 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
702 nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
703 nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
704 nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
706 nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
709 /* ensure (re)build, though already sneakily set in non-windows case */
712 nset->controllable = controllable;
713 nset->control_pending = 0;
/* error path: socketpair() failed */
721 GST_WARNING ("%p: can't create socket pair !", nset);
722 gst_poll_free (nset);
729 * gst_poll_new_timer: (skip)
731 * Create a new poll object that can be used for scheduling cancellable
734 * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
735 * performed from different threads.
737 * Free-function: gst_poll_free
739 * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
740 * case of an error. Free with gst_poll_free().
743 gst_poll_new_timer (void)
747 /* make a new controllable poll set */
748 if (!(poll = gst_poll_new (TRUE)))
760 * @set: (transfer full): a file descriptor set.
762 * Free a file descriptor set.
/* Releases everything gst_poll_new() allocated: control fds / wakeup
 * event, WSA events (win32), the fd arrays, the mutex, and the GstPoll
 * slice itself. */
765 gst_poll_free (GstPoll * set)
767 g_return_if_fail (set != NULL);
769 GST_DEBUG ("%p: freeing", set);
772 if (set->control_write_fd.fd >= 0)
773 close (set->control_write_fd.fd);
774 if (set->control_read_fd.fd >= 0)
775 close (set->control_read_fd.fd);
777 CloseHandle (set->wakeup_event);
/* win32: detach and free every per-fd WSA event before the arrays go */
782 for (i = 0; i < set->events->len; i++)
783 gst_poll_free_winsock_event (set, i);
786 g_array_free (set->active_events, TRUE);
787 g_array_free (set->events, TRUE);
788 g_array_free (set->active_fds_ignored, TRUE);
791 g_array_free (set->active_fds, TRUE);
792 g_array_free (set->fds, TRUE);
793 g_mutex_clear (&set->lock);
794 g_slice_free (GstPoll, set);
798 * gst_poll_get_read_gpollfd:
802 * Get a GPollFD for the reading part of the control socket. This is useful when
803 * integrating with a GSource and GMainLoop.
806 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
808 g_return_if_fail (set != NULL);
809 g_return_if_fail (fd != NULL);
/* UNIX: expose the control read fd; win32: expose the wakeup event handle
 * (cast width depends on pointer size, hence the GLIB_SIZEOF_VOID_P split). */
812 fd->fd = set->control_read_fd.fd;
814 #if GLIB_SIZEOF_VOID_P == 8
815 fd->fd = (gint64) set->wakeup_event;
817 fd->fd = (gint) set->wakeup_event;
820 fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
828 * Initializes @fd. Alternatively you can initialize it with
/* Reset a GstPollFD to the "not added" state. */
832 gst_poll_fd_init (GstPollFD * fd)
834 g_return_if_fail (fd != NULL);
/* Append @fd to the set (lock already held). A duplicate add is detected
 * via find_index() and only warns. On UNIX the new pollfd starts watching
 * error conditions only; on win32 a fresh WSA event is created and FD_CLOSE
 * is always monitored. fd->idx caches the array position. */
841 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
845 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
847 idx = find_index (set->fds, fd);
853 nfd.events = POLLERR | POLLNVAL | POLLHUP;
856 g_array_append_val (set->fds, nfd);
858 fd->idx = set->fds->len - 1;
864 wfd.event_mask = FD_CLOSE;
865 memset (&wfd.events, 0, sizeof (wfd.events));
866 wfd.ignored_event_mask = 0;
867 event = WSACreateEvent ();
869 g_array_append_val (set->fds, wfd);
870 g_array_append_val (set->events, event);
872 fd->idx = set->fds->len - 1;
876 GST_WARNING ("%p: fd already added !", set);
884 * @set: a file descriptor set.
885 * @fd: a file descriptor.
887 * Add a file descriptor to the file descriptor set.
889 * Returns: %TRUE if the file descriptor was successfully added to the set.
/* Public locked wrapper around gst_poll_add_fd_unlocked(). */
892 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
896 g_return_val_if_fail (set != NULL, FALSE);
897 g_return_val_if_fail (fd != NULL, FALSE);
898 g_return_val_if_fail (fd->fd >= 0, FALSE);
900 g_mutex_lock (&set->lock);
902 ret = gst_poll_add_fd_unlocked (set, fd);
904 g_mutex_unlock (&set->lock);
910 * gst_poll_remove_fd:
911 * @set: a file descriptor set.
912 * @fd: a file descriptor.
914 * Remove a file descriptor from the file descriptor set.
916 * Returns: %TRUE if the file descriptor was successfully removed from the set.
919 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
923 g_return_val_if_fail (set != NULL, FALSE);
924 g_return_val_if_fail (fd != NULL, FALSE);
925 g_return_val_if_fail (fd->fd >= 0, FALSE);
928 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
930 g_mutex_lock (&set->lock);
932 /* get the index, -1 is an fd that is not added */
933 idx = find_index (set->fds, fd);
/* win32 also has to free the WSA event paired with this slot */
936 gst_poll_free_winsock_event (set, idx);
937 g_array_remove_index_fast (set->events, idx);
940 /* remove the fd at index, we use _remove_index_fast, which copies the last
941 * element of the array to the freed index */
942 g_array_remove_index_fast (set->fds, idx);
944 /* mark fd as removed by setting the index to -1 */
948 GST_WARNING ("%p: couldn't find fd !", set);
951 g_mutex_unlock (&set->lock);
957 * gst_poll_fd_ctl_write:
958 * @set: a file descriptor set.
959 * @fd: a file descriptor.
960 * @active: a new status.
962 * Control whether the descriptor @fd in @set will be monitored for
965 * Returns: %TRUE if the descriptor was successfully updated.
/* Toggle write monitoring: POLLOUT on UNIX, FD_WRITE|FD_CONNECT on win32. */
968 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
972 g_return_val_if_fail (set != NULL, FALSE);
973 g_return_val_if_fail (fd != NULL, FALSE);
974 g_return_val_if_fail (fd->fd >= 0, FALSE);
976 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
977 fd->fd, fd->idx, active);
979 g_mutex_lock (&set->lock);
981 idx = find_index (set->fds, fd);
984 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
987 pfd->events |= POLLOUT;
989 pfd->events &= ~POLLOUT;
991 GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
993 gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
998 GST_WARNING ("%p: couldn't find fd !", set);
1001 g_mutex_unlock (&set->lock);
/* Toggle read monitoring (lock held by caller): POLLIN|POLLPRI on UNIX,
 * FD_READ|FD_ACCEPT on win32. */
1007 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
1011 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
1012 fd->fd, fd->idx, active);
1014 idx = find_index (set->fds, fd);
1018 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
1021 pfd->events |= (POLLIN | POLLPRI);
1023 pfd->events &= ~(POLLIN | POLLPRI);
1025 gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
1029 GST_WARNING ("%p: couldn't find fd !", set);
1036 * gst_poll_fd_ctl_read:
1037 * @set: a file descriptor set.
1038 * @fd: a file descriptor.
1039 * @active: a new status.
1041 * Control whether the descriptor @fd in @set will be monitored for
1044 * Returns: %TRUE if the descriptor was successfully updated.
/* Public locked wrapper around gst_poll_fd_ctl_read_unlocked(). */
1047 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
1051 g_return_val_if_fail (set != NULL, FALSE);
1052 g_return_val_if_fail (fd != NULL, FALSE);
1053 g_return_val_if_fail (fd->fd >= 0, FALSE);
1055 g_mutex_lock (&set->lock);
1057 ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
1059 g_mutex_unlock (&set->lock);
1065 * gst_poll_fd_ignored:
1066 * @set: a file descriptor set.
1067 * @fd: a file descriptor.
1069 * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
1070 * the same result for @fd as last time. This function must be called if no
1071 * operation (read/write/recv/send/etc.) will be performed on @fd before
1072 * the next call to gst_poll_wait().
1074 * The reason why this is needed is because the underlying implementation
1075 * might not allow querying the fd more than once between calls to one of
1076 * the re-enabling operations.
/* win32-only in effect: records the currently monitored FD_READ/FD_WRITE
 * bits as "ignored" so the next prepare pass parks this fd. */
1079 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
1084 g_return_if_fail (set != NULL);
1085 g_return_if_fail (fd != NULL);
1086 g_return_if_fail (fd->fd >= 0);
1088 g_mutex_lock (&set->lock);
1090 idx = find_index (set->fds, fd);
1092 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
1094 wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
1098 g_mutex_unlock (&set->lock);
1103 * gst_poll_fd_has_closed:
1104 * @set: a file descriptor set.
1105 * @fd: a file descriptor.
1107 * Check if @fd in @set has closed the connection.
1109 * Returns: %TRUE if the connection was closed.
/* All of the _has_/_can_ queries below read the *active* fd array (the
 * snapshot last waited on), hence the const-cast lock around find_index. */
1112 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1114 gboolean res = FALSE;
1117 g_return_val_if_fail (set != NULL, FALSE);
1118 g_return_val_if_fail (fd != NULL, FALSE);
1119 g_return_val_if_fail (fd->fd >= 0, FALSE);
1121 g_mutex_lock (&((GstPoll *) set)->lock);
1123 idx = find_index (set->active_fds, fd);
1126 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1128 res = (pfd->revents & POLLHUP) != 0;
1130 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1132 res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1135 GST_WARNING ("%p: couldn't find fd !", set);
1137 g_mutex_unlock (&((GstPoll *) set)->lock);
1139 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1145 * gst_poll_fd_has_error:
1146 * @set: a file descriptor set.
1147 * @fd: a file descriptor.
1149 * Check if @fd in @set has an error.
1151 * Returns: %TRUE if the descriptor has an error.
1154 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1156 gboolean res = FALSE;
1159 g_return_val_if_fail (set != NULL, FALSE);
1160 g_return_val_if_fail (fd != NULL, FALSE);
1161 g_return_val_if_fail (fd->fd >= 0, FALSE);
1163 g_mutex_lock (&((GstPoll *) set)->lock);
1165 idx = find_index (set->active_fds, fd);
1168 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1170 res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
/* win32: any per-operation error code counts as "has error" */
1172 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1174 res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1175 (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1176 (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1177 (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1178 (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1181 GST_WARNING ("%p: couldn't find fd !", set);
1183 g_mutex_unlock (&((GstPoll *) set)->lock);
1185 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
/* Lock-free core of gst_poll_fd_can_read(); caller must hold the lock. */
1191 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1193 gboolean res = FALSE;
1196 idx = find_index (set->active_fds, fd);
1199 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1201 res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1203 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1205 res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1208 GST_WARNING ("%p: couldn't find fd !", set);
1210 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1216 * gst_poll_fd_can_read:
1217 * @set: a file descriptor set.
1218 * @fd: a file descriptor.
1220 * Check if @fd in @set has data to be read.
1222 * Returns: %TRUE if the descriptor has data to be read.
1225 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1227 gboolean res = FALSE;
1229 g_return_val_if_fail (set != NULL, FALSE);
1230 g_return_val_if_fail (fd != NULL, FALSE);
1231 g_return_val_if_fail (fd->fd >= 0, FALSE);
1233 g_mutex_lock (&((GstPoll *) set)->lock);
1235 res = gst_poll_fd_can_read_unlocked (set, fd);
1237 g_mutex_unlock (&((GstPoll *) set)->lock);
1243 * gst_poll_fd_can_write:
1244 * @set: a file descriptor set.
1245 * @fd: a file descriptor.
1247 * Check if @fd in @set can be used for writing.
1249 * Returns: %TRUE if the descriptor can be used for writing.
1252 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1254 gboolean res = FALSE;
1257 g_return_val_if_fail (set != NULL, FALSE);
1258 g_return_val_if_fail (fd != NULL, FALSE);
1259 g_return_val_if_fail (fd->fd >= 0, FALSE);
1261 g_mutex_lock (&((GstPoll *) set)->lock);
1263 idx = find_index (set->active_fds, fd);
1266 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1268 res = (pfd->revents & POLLOUT) != 0;
1270 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1272 res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1275 GST_WARNING ("%p: couldn't find fd !", set);
1277 g_mutex_unlock (&((GstPoll *) set)->lock);
1279 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1287 * @timeout: a timeout in nanoseconds.
1289 * Wait for activity on the file descriptors in @set. This function waits up to
1290 * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
1292 * For #GstPoll objects created with gst_poll_new(), this function can only be
1293 * called from a single thread at a time. If called from multiple threads,
1294 * -1 will be returned with errno set to EPERM.
1296 * This is not true for timer #GstPoll objects created with
1297 * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1300 * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1301 * activity was detected after @timeout. If an error occurs, -1 is returned
/* Core wait loop: pick a mode, (re)build the active snapshot if requested,
 * run the platform primitive, then drain wakeups and possibly restart. */
1305 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1307 gboolean restarting;
1312 g_return_val_if_fail (set != NULL, -1);
1314 GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
1316 is_timer = set->timer;
1318 /* add one more waiter */
1319 old_waiting = INC_WAITING (set);
1321 /* we cannot wait from multiple threads unless we are a timer */
1322 if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1323 goto already_waiting;
1325 /* flushing, exit immediately */
1326 if (G_UNLIKELY (IS_FLUSHING (set)))
1335 mode = choose_mode (set, timeout);
/* consume a pending rebuild: snapshot fds into active_fds (UNIX memcpy,
 * win32 prepare pass) under the lock */
1337 if (TEST_REBUILD (set)) {
1338 g_mutex_lock (&set->lock);
1340 g_array_set_size (set->active_fds, set->fds->len);
1341 memcpy (set->active_fds->data, set->fds->data,
1342 set->fds->len * sizeof (struct pollfd));
1344 if (!gst_poll_prepare_winsock_active_sets (set))
1347 g_mutex_unlock (&set->lock);
1351 case GST_POLL_MODE_AUTO:
1352 g_assert_not_reached ();
1354 case GST_POLL_MODE_PPOLL:
1358 struct timespec *tsptr;
/* NULL tsptr means wait forever (GST_CLOCK_TIME_NONE) */
1360 if (timeout != GST_CLOCK_TIME_NONE) {
1361 GST_TIME_TO_TIMESPEC (timeout, ts);
1368 ppoll ((struct pollfd *) set->active_fds->data,
1369 set->active_fds->len, tsptr, NULL);
1371 g_assert_not_reached ();
1376 case GST_POLL_MODE_POLL:
/* poll() takes milliseconds; -1 (default) means infinite */
1381 if (timeout != GST_CLOCK_TIME_NONE) {
1382 t = GST_TIME_AS_MSECONDS (timeout);
1388 poll ((struct pollfd *) set->active_fds->data,
1389 set->active_fds->len, t);
1391 g_assert_not_reached ();
1396 case GST_POLL_MODE_PSELECT:
1397 #ifndef HAVE_PSELECT
1399 g_assert_not_reached ();
1404 case GST_POLL_MODE_SELECT:
1412 max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1414 if (mode == GST_POLL_MODE_SELECT) {
1416 struct timeval *tvptr;
1418 if (timeout != GST_CLOCK_TIME_NONE) {
1419 GST_TIME_TO_TIMEVAL (timeout, tv);
1425 GST_DEBUG ("%p: Calling select", set);
1426 res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1427 GST_DEBUG ("%p: After select, res:%d", set, res);
1431 struct timespec *tsptr;
1433 if (timeout != GST_CLOCK_TIME_NONE) {
1434 GST_TIME_TO_TIMESPEC (timeout, ts);
1440 GST_DEBUG ("%p: Calling pselect", set);
1442 pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1443 GST_DEBUG ("%p: After pselect, res:%d", set, res);
/* copy the fd_set results back into active_fds revents */
1448 fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1450 #else /* G_OS_WIN32 */
1451 g_assert_not_reached ();
1456 case GST_POLL_MODE_WINDOWS:
1459 gint ignore_count = set->active_fds_ignored->len;
1462 if (G_LIKELY (ignore_count == 0)) {
1463 if (timeout != GST_CLOCK_TIME_NONE)
1464 t = GST_TIME_AS_MSECONDS (timeout);
1468 /* already one or more ignored fds, so we quickly sweep the others */
1472 if (set->active_events->len != 0) {
1473 wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1474 (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1476 wait_ret = WSA_WAIT_FAILED;
1477 WSASetLastError (WSA_INVALID_PARAMETER);
1480 if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1482 } else if (wait_ret == WSA_WAIT_FAILED) {
1484 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1486 /* the first entry is the wakeup event */
1487 if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1488 res = gst_poll_collect_winsock_events (set);
1490 res = 1; /* wakeup event */
1494 g_assert_not_reached ();
1502 /* Applications needs to clear the control socket themselves for timer
1504 * For other polls, we need to clear the control socket. If there was only
1505 * one socket with activity and it was the control socket, we need to
1507 if (release_all_wakeup (set) > 0 && res == 1)
1511 /* we got woken up and we are flushing, we need to stop */
1512 if (G_UNLIKELY (IS_FLUSHING (set)))
1515 } while (G_UNLIKELY (restarting));
/* error exits below set errno (EPERM / EBUSY) and undo the waiter count */
1524 GST_LOG ("%p: we are already waiting", set);
1531 GST_LOG ("%p: we are flushing", set);
1539 GST_LOG ("%p: winsock error", set);
1540 g_mutex_unlock (&set->lock);
1548 * gst_poll_set_controllable:
1550 * @controllable: new controllable state.
1552 * When @controllable is %TRUE, this function ensures that future calls to
1553 * gst_poll_wait() will be affected by gst_poll_restart() and
1554 * gst_poll_set_flushing().
1556 * This function only works for non-timer #GstPoll objects created with
1559 * Returns: %TRUE if the controllability of @set could be updated.
1562 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1564 g_return_val_if_fail (set != NULL, FALSE);
1565 g_return_val_if_fail (!set->timer, FALSE);
1567 GST_LOG ("%p: controllable : %d", set, controllable);
1569 set->controllable = controllable;
1578 * Restart any gst_poll_wait() that is in progress. This function is typically
1579 * used after adding or removing descriptors to @set.
1581 * If @set is not controllable, then this call will have no effect.
1583 * This function only works for non-timer #GstPoll objects created with
1587 gst_poll_restart (GstPoll * set)
1589 g_return_if_fail (set != NULL);
1590 g_return_if_fail (!set->timer);
1592 if (set->controllable && GET_WAITING (set) > 0) {
1593 /* we are controllable and waiting, wake up the waiter. The socket will be
1594 * cleared by the _wait() thread and the poll will be restarted */
1600 * gst_poll_set_flushing:
1602 * @flushing: new flushing state.
1604 * When @flushing is %TRUE, this function ensures that current and future calls
1605 * to gst_poll_wait() will return -1, with errno set to EBUSY.
1607 * Unsetting the flushing state will restore normal operation of @set.
1609 * This function only works for non-timer #GstPoll objects created with
1613 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1615 g_return_if_fail (set != NULL);
1616 g_return_if_fail (!set->timer);
1618 GST_LOG ("%p: flushing: %d", set, flushing);
1620 /* update the new state first */
1621 SET_FLUSHING (set, flushing);
1623 if (flushing && set->controllable && GET_WAITING (set) > 0) {
1624 /* we are flushing, controllable and waiting, wake up the waiter. When we
1625 * stop the flushing operation we don't clear the wakeup fd here, this will
1626 * happen in the _wait() thread. */
1632 * gst_poll_write_control:
1635 * Write a byte to the control socket of the controllable @set.
1636 * This function is mostly useful for timer #GstPoll objects created with
1637 * gst_poll_new_timer().
1639 * It will make any current and future gst_poll_wait() function return with
1640 * 1, meaning the control socket is set. After an equal amount of calls to
1641 * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1642 * block again until their timeout expired.
1644 * This function only works for timer #GstPoll objects created with
1645 * gst_poll_new_timer().
1647 * Returns: %TRUE on success. %FALSE when when the byte could not be written.
1648 * errno contains the detailed error code but will never be EAGAIN, EINTR or
1649 * EWOULDBLOCK. %FALSE always signals a critical error.
/* Timer-only public wrapper over raise_wakeup(). */
1652 gst_poll_write_control (GstPoll * set)
1656 g_return_val_if_fail (set != NULL, FALSE);
1657 g_return_val_if_fail (set->timer, FALSE);
1659 res = raise_wakeup (set);
1665 * gst_poll_read_control:
1668 * Read a byte from the control socket of the controllable @set.
1670 * This function only works for timer #GstPoll objects created with
1671 * gst_poll_new_timer().
1673 * Returns: %TRUE on success. %FALSE when when there was no byte to read or
1674 * reading the byte failed. If there was no byte to read, and only then, errno
1675 * will contain EWOULDBLOCK or EAGAIN. For all other values of errno this always signals a
/* Timer-only public wrapper over release_wakeup(). */
1679 gst_poll_read_control (GstPoll * set)
1683 g_return_val_if_fail (set != NULL, FALSE);
1684 g_return_val_if_fail (set->timer, FALSE);
1686 res = release_wakeup (set);