/* GStreamer
 * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
 * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
 * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
 *
 * gstpoll.c: File descriptor set
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */
/**
 * SECTION:gstpoll
 * @short_description: Keep track of file descriptors and make it possible
 *                     to wait on them in a cancelable way
 *
 * A #GstPoll keeps track of file descriptors much like fd_set (used with
 * select()) or a struct pollfd array (used with poll()). Once created with
 * gst_poll_new(), the set can be used to wait for file descriptors to be
 * readable and/or writeable. It is possible to make this wait be controlled
 * by specifying %TRUE for the @controllable flag when creating the set (or
 * later calling gst_poll_set_controllable()).
 *
 * New file descriptors are added to the set using gst_poll_add_fd(), and
 * removed using gst_poll_remove_fd(). Controlling which file descriptors
 * should be waited for to become readable and/or writeable are done using
 * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
 *
 * Use gst_poll_wait() to wait for the file descriptors to actually become
 * readable and/or writeable, or to timeout if no file descriptor is available
 * in time. The wait can be controlled by calling gst_poll_restart() and
 * gst_poll_set_flushing().
 *
 * Once the file descriptor set has been waited for, one can use
 * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
 * gst_poll_fd_has_error() to see if it has generated an error,
 * gst_poll_fd_can_read() to see if it is possible to read from the file
 * descriptor, and gst_poll_fd_can_write() to see if it is possible to
 * write to it.
 */
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <sys/types.h>
#include <errno.h>

#ifdef G_OS_WIN32
#include <winsock2.h>
#include <io.h>
#define EINPROGRESS WSAEINPROGRESS
#else
#include <sys/socket.h>
#include <fcntl.h>
#include <unistd.h>
#endif

/* OS/X needs this because of bad headers */
#include <string.h>

#include "gst_private.h"

#include "gstpoll.h"
80 /* the poll/select call is also performed on a control socket, that way
81 * we can send special commands to control it
83 #define SEND_COMMAND(set, command) \
85 unsigned char c = command; \
86 write (set->control_write_fd.fd, &c, 1); \
89 #define READ_COMMAND(set, command, res) \
91 res = read (set->control_read_fd.fd, &command, 1); \
94 #define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */
97 #define CLOSE_SOCKET(sock) closesocket (sock)
99 #define CLOSE_SOCKET(sock) close (sock)
110 gboolean controllable;
111 gboolean new_controllable;
115 GstPollFD control_read_fd;
116 GstPollFD control_write_fd;
120 find_index (GArray * array, GstPollFD * fd)
125 /* start by assuming the index found in the fd is still valid */
126 if (fd->idx >= 0 && fd->idx < array->len) {
127 pfd = &g_array_index (array, struct pollfd, fd->idx);
129 if (pfd->fd == fd->fd) {
134 /* the pollfd array has changed and we need to lookup the fd again */
135 for (i = 0; i < array->len; i++) {
136 pfd = &g_array_index (array, struct pollfd, i);
138 if (pfd->fd == fd->fd) {
148 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
149 /* check if all file descriptors will fit in an fd_set */
151 selectable_fds (const GstPoll * set)
155 for (i = 0; i < set->fds->len; i++) {
156 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
158 if (pfd->fd >= FD_SETSIZE)
165 /* check if the timeout will convert to a timeout value used for poll()
166 * without a loss of precision
169 pollable_timeout (GstClockTime timeout)
171 if (timeout == GST_CLOCK_TIME_NONE)
174 /* not a nice multiple of milliseconds */
175 if (timeout % 1000000)
183 choose_mode (const GstPoll * set, GstClockTime timeout)
187 if (set->mode == GST_POLL_MODE_AUTO) {
189 mode = GST_POLL_MODE_PPOLL;
190 #elif defined(HAVE_POLL)
191 if (!selectable_fds (set) || pollable_timeout (timeout)) {
192 mode = GST_POLL_MODE_POLL;
195 mode = GST_POLL_MODE_PSELECT;
197 mode = GST_POLL_MODE_SELECT;
200 #elif defined(HAVE_PSELECT)
201 mode = GST_POLL_MODE_PSELECT;
203 mode = GST_POLL_MODE_SELECT;
212 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds)
220 g_mutex_lock (set->lock);
222 for (i = 0; i < set->active_fds->len; i++) {
223 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
225 if (pfd->fd < FD_SETSIZE) {
226 if (pfd->events & POLLIN)
227 FD_SET (pfd->fd, readfds);
228 if (pfd->events & POLLOUT)
229 FD_SET (pfd->fd, writefds);
230 if (pfd->fd > max_fd)
235 g_mutex_unlock (set->lock);
241 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds)
245 g_mutex_lock (set->lock);
247 for (i = 0; i < set->active_fds->len; i++) {
248 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
250 if (pfd->fd < FD_SETSIZE) {
251 if (FD_ISSET (pfd->fd, readfds))
252 pfd->revents |= POLLIN;
253 if (FD_ISSET (pfd->fd, writefds))
254 pfd->revents |= POLLOUT;
258 g_mutex_unlock (set->lock);
263 * @mode: the mode of the file descriptor set.
264 * @controllable: whether it should be possible to control a wait.
266 * Create a new file descriptor set with the given @mode. If @controllable, it
267 * is possible to restart or flush a call to gst_poll_wait() with
268 * gst_poll_restart() and gst_poll_set_flushing() respectively.
270 * Returns: a new #GstPoll, or %NULL in case of an error. Free with
276 gst_poll_new (GstPollMode mode, gboolean controllable)
280 nset = g_new0 (GstPoll, 1);
282 nset->lock = g_mutex_new ();
283 nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
284 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
285 nset->control_read_fd.fd = -1;
286 nset->control_write_fd.fd = -1;
288 if (!gst_poll_set_controllable (nset, controllable))
289 goto not_controllable;
296 gst_poll_free (nset);
303 * @set: a file descriptor set.
305 * Free a file descriptor set.
310 gst_poll_free (GstPoll * set)
312 g_return_if_fail (set != NULL);
314 if (set->control_write_fd.fd >= 0)
315 CLOSE_SOCKET (set->control_write_fd.fd);
316 if (set->control_read_fd.fd >= 0)
317 CLOSE_SOCKET (set->control_read_fd.fd);
319 g_array_free (set->active_fds, TRUE);
320 g_array_free (set->fds, TRUE);
321 g_mutex_free (set->lock);
327 * @set: a file descriptor set.
328 * @mode: the mode of the file descriptor set.
330 * Set the mode to use to determine how to wait for the file descriptor set.
335 gst_poll_set_mode (GstPoll * set, GstPollMode mode)
337 g_return_if_fail (set != NULL);
339 g_mutex_lock (set->lock);
341 g_mutex_unlock (set->lock);
346 * @set: a file descriptor set.
348 * Get the mode used to determine how to wait for the file descriptor set.
350 * Returns: the currently used mode.
355 gst_poll_get_mode (const GstPoll * set)
359 g_return_val_if_fail (set != NULL, GST_POLL_MODE_AUTO);
361 g_mutex_lock (set->lock);
363 g_mutex_unlock (set->lock);
372 * Initializes @fd. Alternatively you can initialize it with
376 gst_poll_fd_init (GstPollFD * fd)
378 g_return_if_fail (fd != NULL);
385 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
389 idx = find_index (set->fds, fd);
394 nfd.events = POLLERR | POLLNVAL | POLLHUP;
397 g_array_append_val (set->fds, nfd);
398 fd->idx = set->fds->len - 1;
406 * @set: a file descriptor set.
407 * @fd: a file descriptor.
409 * Add a file descriptor to the file descriptor set.
411 * Returns: %TRUE if the file descriptor was successfully added to the set.
416 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
420 g_return_val_if_fail (set != NULL, FALSE);
421 g_return_val_if_fail (fd != NULL, FALSE);
422 g_return_val_if_fail (fd->fd >= 0, FALSE);
424 g_mutex_lock (set->lock);
426 ret = gst_poll_add_fd_unlocked (set, fd);
428 g_mutex_unlock (set->lock);
434 * gst_poll_remove_fd:
435 * @set: a file descriptor set.
436 * @fd: a file descriptor.
438 * Remove a file descriptor from the file descriptor set.
440 * Returns: %TRUE if the file descriptor was successfully removed from the set.
445 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
449 g_return_val_if_fail (set != NULL, FALSE);
450 g_return_val_if_fail (fd != NULL, FALSE);
451 g_return_val_if_fail (fd->fd >= 0, FALSE);
453 g_mutex_lock (set->lock);
455 /* get the index, -1 is an fd that is not added */
456 idx = find_index (set->fds, fd);
458 /* remove the fd at index, we use _remove_index_fast, which copies the last
459 * element of the array to the freed index */
460 g_array_remove_index_fast (set->fds, idx);
462 /* mark fd as removed by setting the index to -1 */
466 g_mutex_unlock (set->lock);
472 * gst_poll_fd_ctl_write:
473 * @set: a file descriptor set.
474 * @fd: a file descriptor.
475 * @active: a new status.
477 * Control whether the descriptor @fd in @set will be monitored for
480 * Returns: %TRUE if the descriptor was successfully updated.
485 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
489 g_return_val_if_fail (set != NULL, FALSE);
490 g_return_val_if_fail (fd != NULL, FALSE);
491 g_return_val_if_fail (fd->fd >= 0, FALSE);
493 g_mutex_lock (set->lock);
495 idx = find_index (set->fds, fd);
497 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
500 pfd->events |= POLLOUT;
502 pfd->events &= ~POLLOUT;
505 g_mutex_unlock (set->lock);
511 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
515 idx = find_index (set->fds, fd);
517 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
520 pfd->events |= (POLLIN | POLLPRI);
522 pfd->events &= ~(POLLIN | POLLPRI);
529 * gst_poll_fd_ctl_read:
530 * @set: a file descriptor set.
531 * @fd: a file descriptor.
532 * @active: a new status.
534 * Control whether the descriptor @fd in @set will be monitored for
537 * Returns: %TRUE if the descriptor was successfully updated.
542 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
546 g_return_val_if_fail (set != NULL, FALSE);
547 g_return_val_if_fail (fd != NULL, FALSE);
548 g_return_val_if_fail (fd->fd >= 0, FALSE);
550 g_mutex_lock (set->lock);
552 ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
554 g_mutex_unlock (set->lock);
560 * gst_poll_fd_has_closed:
561 * @set: a file descriptor set.
562 * @fd: a file descriptor.
564 * Check if @fd in @set has closed the connection.
566 * Returns: %TRUE if the connection was closed.
571 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
573 gboolean res = FALSE;
576 g_return_val_if_fail (set != NULL, FALSE);
577 g_return_val_if_fail (fd != NULL, FALSE);
578 g_return_val_if_fail (fd->fd >= 0, FALSE);
580 g_mutex_lock (set->lock);
582 idx = find_index (set->active_fds, fd);
584 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
586 res = (pfd->revents & POLLHUP) != 0;
589 g_mutex_unlock (set->lock);
595 * gst_poll_fd_has_error:
596 * @set: a file descriptor set.
597 * @fd: a file descriptor.
599 * Check if @fd in @set has an error.
601 * Returns: %TRUE if the descriptor has an error.
606 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
608 gboolean res = FALSE;
611 g_return_val_if_fail (set != NULL, FALSE);
612 g_return_val_if_fail (fd != NULL, FALSE);
613 g_return_val_if_fail (fd->fd >= 0, FALSE);
615 g_mutex_lock (set->lock);
617 idx = find_index (set->active_fds, fd);
619 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
621 res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
624 g_mutex_unlock (set->lock);
630 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
632 gboolean res = FALSE;
635 idx = find_index (set->active_fds, fd);
637 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
639 res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
646 * gst_poll_fd_can_read:
647 * @set: a file descriptor set.
648 * @fd: a file descriptor.
650 * Check if @fd in @set has data to be read.
652 * Returns: %TRUE if the descriptor has data to be read.
657 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
659 gboolean res = FALSE;
661 g_return_val_if_fail (set != NULL, FALSE);
662 g_return_val_if_fail (fd != NULL, FALSE);
663 g_return_val_if_fail (fd->fd >= 0, FALSE);
665 g_mutex_lock (set->lock);
667 res = gst_poll_fd_can_read_unlocked (set, fd);
669 g_mutex_unlock (set->lock);
675 * gst_poll_fd_can_write:
676 * @set: a file descriptor set.
677 * @fd: a file descriptor.
679 * Check if @fd in @set can be used for writing.
681 * Returns: %TRUE if the descriptor can be used for writing.
686 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
688 gboolean res = FALSE;
691 g_return_val_if_fail (set != NULL, FALSE);
692 g_return_val_if_fail (fd != NULL, FALSE);
693 g_return_val_if_fail (fd->fd >= 0, FALSE);
695 g_mutex_lock (set->lock);
697 idx = find_index (set->active_fds, fd);
699 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
701 res = (pfd->revents & POLLOUT) != 0;
704 g_mutex_unlock (set->lock);
712 * @timeout: a timeout in nanoseconds.
714 * Wait for activity on the file descriptors in @set. This function waits up to
715 * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
717 * When this function is called from multiple threads, -1 will be returned with
718 * errno set to EPERM.
720 * Returns: The number of #GstPollFD in @set that have activity or 0 when no
721 * activity was detected after @timeout. If an error occurs, -1 is returned
727 gst_poll_wait (GstPoll * set, GstClockTime timeout)
732 g_return_val_if_fail (set != NULL, -1);
734 g_mutex_lock (set->lock);
736 /* we cannot wait from multiple threads */
738 goto already_waiting;
740 /* flushing, exit immediatly */
752 mode = choose_mode (set, timeout);
754 g_array_set_size (set->active_fds, set->fds->len);
755 memcpy (set->active_fds->data, set->fds->data,
756 set->fds->len * sizeof (struct pollfd));
757 g_mutex_unlock (set->lock);
760 case GST_POLL_MODE_AUTO:
761 g_assert_not_reached ();
763 case GST_POLL_MODE_PPOLL:
767 struct timespec *tsptr;
769 if (timeout != GST_CLOCK_TIME_NONE) {
770 GST_TIME_TO_TIMESPEC (timeout, ts);
777 ppoll ((struct pollfd *) set->active_fds->data,
778 set->active_fds->len, tsptr, NULL);
780 g_assert_not_reached ();
785 case GST_POLL_MODE_POLL:
790 if (timeout != GST_CLOCK_TIME_NONE) {
791 t = GST_TIME_AS_MSECONDS (timeout);
797 poll ((struct pollfd *) set->active_fds->data,
798 set->active_fds->len, t);
800 g_assert_not_reached ();
805 case GST_POLL_MODE_PSELECT:
808 g_assert_not_reached ();
813 case GST_POLL_MODE_SELECT:
819 max_fd = pollfd_to_fd_set (set, &readfds, &writefds);
821 if (mode == GST_POLL_MODE_SELECT) {
823 struct timeval *tvptr;
825 if (timeout != GST_CLOCK_TIME_NONE) {
826 GST_TIME_TO_TIMEVAL (timeout, tv);
832 res = select (max_fd + 1, &readfds, &writefds, NULL, tvptr);
836 struct timespec *tsptr;
838 if (timeout != GST_CLOCK_TIME_NONE) {
839 GST_TIME_TO_TIMESPEC (timeout, ts);
845 res = pselect (max_fd + 1, &readfds, &writefds, NULL, tsptr, NULL);
850 fd_set_to_pollfd (set, &readfds, &writefds);
857 g_mutex_lock (set->lock);
859 /* check if the poll/select was aborted due to a command */
860 if (res > 0 && set->controllable) {
865 /* we do not check the read status of the control socket here because
866 * there may have been a write to the socket between the time the
867 * poll/select finished and before we got the mutex back, and we need
868 * to clear out the control socket before leaving */
869 READ_COMMAND (set, cmd, result);
871 /* no more commands, quit the loop */
875 /* if the control socket is the only socket with activity when we get
876 * here, we restart the _wait operation, else we allow the caller to
877 * process the other file descriptors */
879 gst_poll_fd_can_read_unlocked (set, &set->control_read_fd))
884 /* update the controllable state if needed */
885 set->controllable = set->new_controllable;
888 /* we got woken up and we are flushing, we need to stop */
893 } while (restarting);
895 set->waiting = FALSE;
897 g_mutex_unlock (set->lock);
904 g_mutex_unlock (set->lock);
910 g_mutex_unlock (set->lock);
917 * gst_poll_set_controllable:
919 * @controllable: new controllable state.
921 * When @controllable is %TRUE, this function ensures that future calls to
922 * gst_poll_wait() will be affected by gst_poll_restart() and
923 * gst_poll_set_flushing().
925 * Returns: %TRUE if the controllability of @set could be updated.
930 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
932 g_return_val_if_fail (set != NULL, FALSE);
934 g_mutex_lock (set->lock);
936 if (controllable && set->control_read_fd.fd < 0) {
937 gint control_sock[2];
942 if (_pipe (control_sock, 4096, _O_BINARY) < 0)
945 ioctlsocket (control_sock[0], FIONBIO, &flags);
946 ioctlsocket (control_sock[1], FIONBIO, &flags);
948 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
951 fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
952 fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
954 set->control_read_fd.fd = control_sock[0];
955 set->control_write_fd.fd = control_sock[1];
957 gst_poll_add_fd_unlocked (set, &set->control_read_fd);
960 if (set->control_read_fd.fd >= 0)
961 gst_poll_fd_ctl_read_unlocked (set, &set->control_read_fd, controllable);
963 /* delay the change of the controllable state if we are waiting */
964 set->new_controllable = controllable;
966 set->controllable = controllable;
968 g_mutex_unlock (set->lock);
975 g_mutex_unlock (set->lock);
984 * Restart any gst_poll_wait() that is in progress. This function is typically
985 * used after adding or removing descriptors to @set.
987 * If @set is not controllable, then this call will have no effect.
992 gst_poll_restart (GstPoll * set)
994 g_return_if_fail (set != NULL);
996 g_mutex_lock (set->lock);
998 if (set->controllable && set->waiting) {
999 /* if we are waiting, we can send the command, else we do not have to
1000 * bother, future calls will automatically pick up the new fdset */
1001 SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
1004 g_mutex_unlock (set->lock);
1008 * gst_poll_set_flushing:
1010 * @flushing: new flushing state.
1012 * When @flushing is %TRUE, this function ensures that current and future calls
1013 * to gst_poll_wait() will return -1, with errno set to EBUSY.
1015 * Unsetting the flushing state will restore normal operation of @set.
1020 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1022 g_return_if_fail (set != NULL);
1024 g_mutex_lock (set->lock);
1026 /* update the new state first */
1027 set->flushing = flushing;
1029 if (flushing && set->controllable && set->waiting) {
1030 /* we are flushing, controllable and waiting, wake up the waiter */
1031 SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
1034 g_mutex_unlock (set->lock);