Add INIT macro and _init method for initializing the GstPollFD.
[platform/upstream/gstreamer.git] / gst / gstpoll.c
1 /* GStreamer
2  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5  *
6  * gstpoll.c: File descriptor set
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23 /**
24  * SECTION:gstpoll
25  * @short_description: Keep track of file descriptors and make it possible
26  *                     to wait on them in a cancelable way
27  *
28  * A #GstPoll keeps track of file descriptors much like fd_set (used with
29  * select()) or a struct pollfd array (used with poll()). Once created with
30  * gst_poll_new(), the set can be used to wait for file descriptors to be
31  * readable and/or writeable. It is possible to make this wait be controlled
32  * by specifying %TRUE for the @controllable flag when creating the set (or
33  * later calling gst_poll_set_controllable()).
34  *
35  * New file descriptors are added to the set using gst_poll_add_fd(), and
36  * removed using gst_poll_remove_fd(). Controlling which file descriptors
37  * should be waited for to become readable and/or writeable are done using
38  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
39  *
40  * Use gst_poll_wait() to wait for the file descriptors to actually become
41  * readable and/or writeable, or to timeout if no file descriptor is available
42  * in time. The wait can be controlled by calling gst_poll_restart() and
43  * gst_poll_set_flushing().
44  *
45  * Once the file descriptor set has been waited for, one can use
46  * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
47  * gst_poll_fd_has_error() to see if it has generated an error,
48  * gst_poll_fd_can_read() to see if it is possible to read from the file
49  * descriptor, and gst_poll_fd_can_write() to see if it is possible to
50  * write to it.
51  *
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #define _GNU_SOURCE 1
59 #include <sys/poll.h>
60 #include <sys/time.h>
61 #include <sys/types.h>
62 #include <unistd.h>
63 #include <errno.h>
64 #include <fcntl.h>
65
66 #ifdef G_OS_WIN32
67 #include <winsock2.h>
68 #define EINPROGRESS WSAEINPROGRESS
69 #else
70 #include <sys/socket.h>
71 #endif
72
73 /* OS/X needs this because of bad headers */
74 #include <string.h>
75
76 #include "gst_private.h"
77
78 #include "gstpoll.h"
79
80 /* the poll/select call is also performed on a control socket, that way
81  * we can send special commands to control it
82  */
83 #define SEND_COMMAND(set, command)                   \
84 G_STMT_START {                                       \
85   unsigned char c = command;                         \
86   write (set->control_write_fd.fd, &c, 1);           \
87 } G_STMT_END
88
89 #define READ_COMMAND(set, command, res)              \
90 G_STMT_START {                                       \
91   res = read (set->control_read_fd.fd, &command, 1); \
92 } G_STMT_END
93
94 #define GST_POLL_CMD_WAKEUP  'W'        /* restart the poll/select call */
95
96 #ifdef G_OS_WIN32
97 #define CLOSE_SOCKET(sock) closesocket (sock)
98 #else
99 #define CLOSE_SOCKET(sock) close (sock)
100 #endif
101
102 struct _GstPoll
103 {
104   GstPollMode mode;
105
106   GMutex *lock;
107
108   GArray *fds;
109   GArray *active_fds;
110   gboolean controllable;
111   gboolean new_controllable;
112   gboolean waiting;
113   gboolean flushing;
114
115   GstPollFD control_read_fd;
116   GstPollFD control_write_fd;
117 };
118
119 static gint
120 find_index (GArray * array, GstPollFD * fd)
121 {
122   struct pollfd *pfd;
123   guint i;
124
125   /* start by assuming the index found in the fd is still valid */
126   if (fd->idx >= 0 && fd->idx < array->len) {
127     pfd = &g_array_index (array, struct pollfd, fd->idx);
128
129     if (pfd->fd == fd->fd) {
130       return fd->idx;
131     }
132   }
133
134   /* the pollfd array has changed and we need to lookup the fd again */
135   for (i = 0; i < array->len; i++) {
136     pfd = &g_array_index (array, struct pollfd, i);
137
138     if (pfd->fd == fd->fd) {
139       fd->idx = (gint) i;
140       return fd->idx;
141     }
142   }
143
144   fd->idx = -1;
145   return fd->idx;
146 }
147
148 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
149 /* check if all file descriptors will fit in an fd_set */
150 static gboolean
151 selectable_fds (const GstPoll * set)
152 {
153   guint i;
154
155   for (i = 0; i < set->fds->len; i++) {
156     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
157
158     if (pfd->fd >= FD_SETSIZE)
159       return FALSE;
160   }
161
162   return TRUE;
163 }
164
165 /* check if the timeout will convert to a timeout value used for poll()
166  * without a loss of precision
167  */
168 static gboolean
169 pollable_timeout (GstClockTime timeout)
170 {
171   if (timeout == GST_CLOCK_TIME_NONE)
172     return TRUE;
173
174   /* not a nice multiple of milliseconds */
175   if (timeout % 1000000)
176     return FALSE;
177
178   return TRUE;
179 }
180 #endif
181
182 static GstPollMode
183 choose_mode (const GstPoll * set, GstClockTime timeout)
184 {
185   GstPollMode mode;
186
187   if (set->mode == GST_POLL_MODE_AUTO) {
188 #ifdef HAVE_PPOLL
189     mode = GST_POLL_MODE_PPOLL;
190 #elif defined(HAVE_POLL)
191     if (!selectable_fds (set) || pollable_timeout (timeout)) {
192       mode = GST_POLL_MODE_POLL;
193     } else {
194 #ifdef HAVE_PSELECT
195       mode = GST_POLL_MODE_PSELECT;
196 #else
197       mode = GST_POLL_MODE_SELECT;
198 #endif
199     }
200 #elif defined(HAVE_PSELECT)
201     mode = GST_POLL_MODE_PSELECT;
202 #else
203     mode = GST_POLL_MODE_SELECT;
204 #endif
205   } else {
206     mode = set->mode;
207   }
208   return mode;
209 }
210
211 static gint
212 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds)
213 {
214   gint max_fd = -1;
215   guint i;
216
217   FD_ZERO (readfds);
218   FD_ZERO (writefds);
219
220   g_mutex_lock (set->lock);
221
222   for (i = 0; i < set->active_fds->len; i++) {
223     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
224
225     if (pfd->fd < FD_SETSIZE) {
226       if (pfd->events & POLLIN)
227         FD_SET (pfd->fd, readfds);
228       if (pfd->events & POLLOUT)
229         FD_SET (pfd->fd, writefds);
230       if (pfd->fd > max_fd)
231         max_fd = pfd->fd;
232     }
233   }
234
235   g_mutex_unlock (set->lock);
236
237   return max_fd;
238 }
239
240 static void
241 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds)
242 {
243   guint i;
244
245   g_mutex_lock (set->lock);
246
247   for (i = 0; i < set->active_fds->len; i++) {
248     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
249
250     if (pfd->fd < FD_SETSIZE) {
251       if (FD_ISSET (pfd->fd, readfds))
252         pfd->revents |= POLLIN;
253       if (FD_ISSET (pfd->fd, writefds))
254         pfd->revents |= POLLOUT;
255     }
256   }
257
258   g_mutex_unlock (set->lock);
259 }
260
261 /**
262  * gst_poll_new:
263  * @mode: the mode of the file descriptor set.
264  * @controllable: whether it should be possible to control a wait.
265  *
266  * Create a new file descriptor set with the given @mode. If @controllable, it
267  * is possible to restart or flush a call to gst_poll_wait() with
268  * gst_poll_restart() and gst_poll_set_flushing() respectively.
269  *
270  * Returns: a new #GstPoll, or %NULL in case of an error. Free with
271  * gst_poll_free().
272  *
273  * Since: 0.10.18
274  */
275 GstPoll *
276 gst_poll_new (GstPollMode mode, gboolean controllable)
277 {
278   GstPoll *nset;
279
280   nset = g_new0 (GstPoll, 1);
281   nset->mode = mode;
282   nset->lock = g_mutex_new ();
283   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
284   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
285   nset->control_read_fd.fd = -1;
286   nset->control_write_fd.fd = -1;
287
288   if (!gst_poll_set_controllable (nset, controllable))
289     goto not_controllable;
290
291   return nset;
292
293   /* ERRORS */
294 not_controllable:
295   {
296     gst_poll_free (nset);
297     return NULL;
298   }
299 }
300
301 /**
302  * gst_poll_free:
303  * @set: a file descriptor set.
304  *
305  * Free a file descriptor set.
306  *
307  * Since: 0.10.18
308  */
309 void
310 gst_poll_free (GstPoll * set)
311 {
312   g_return_if_fail (set != NULL);
313
314   if (set->control_write_fd.fd >= 0)
315     CLOSE_SOCKET (set->control_write_fd.fd);
316   if (set->control_read_fd.fd >= 0)
317     CLOSE_SOCKET (set->control_read_fd.fd);
318
319   g_array_free (set->active_fds, TRUE);
320   g_array_free (set->fds, TRUE);
321   g_mutex_free (set->lock);
322   g_free (set);
323 }
324
325 /**
326  * gst_poll_set_mode:
327  * @set: a file descriptor set.
328  * @mode: the mode of the file descriptor set.
329  *
330  * Set the mode to use to determine how to wait for the file descriptor set.
331  *
332  * Since: 0.10.18
333  */
334 void
335 gst_poll_set_mode (GstPoll * set, GstPollMode mode)
336 {
337   g_return_if_fail (set != NULL);
338
339   g_mutex_lock (set->lock);
340   set->mode = mode;
341   g_mutex_unlock (set->lock);
342 }
343
344 /**
345  * gst_poll_get_mode:
346  * @set: a file descriptor set.
347  *
348  * Get the mode used to determine how to wait for the file descriptor set.
349  *
350  * Returns: the currently used mode.
351  *
352  * Since: 0.10.18
353  */
354 GstPollMode
355 gst_poll_get_mode (const GstPoll * set)
356 {
357   GstPollMode mode;
358
359   g_return_val_if_fail (set != NULL, GST_POLL_MODE_AUTO);
360
361   g_mutex_lock (set->lock);
362   mode = set->mode;
363   g_mutex_unlock (set->lock);
364
365   return mode;
366 }
367
368 /**
369  * gst_poll_fd_init:
370  * @fd: a #GstPollFD
371  *
372  * Initializes @fd. Alternatively you can initialize it with
373  * #GST_POLL_FD_INIT.
374  */
375 void
376 gst_poll_fd_init (GstPollFD * fd)
377 {
378   g_return_if_fail (fd != NULL);
379
380   fd->fd = -1;
381   fd->idx = -1;
382 }
383
384 static gboolean
385 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
386 {
387   gint idx;
388
389   idx = find_index (set->fds, fd);
390   if (idx < 0) {
391     struct pollfd nfd;
392
393     nfd.fd = fd->fd;
394     nfd.events = POLLERR | POLLNVAL | POLLHUP;
395     nfd.revents = 0;
396
397     g_array_append_val (set->fds, nfd);
398     fd->idx = set->fds->len - 1;
399   }
400
401   return TRUE;
402 }
403
404 /**
405  * gst_poll_add_fd:
406  * @set: a file descriptor set.
407  * @fd: a file descriptor.
408  *
409  * Add a file descriptor to the file descriptor set.
410  *
411  * Returns: %TRUE if the file descriptor was successfully added to the set.
412  *
413  * Since: 0.10.18
414  */
415 gboolean
416 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
417 {
418   gboolean ret;
419
420   g_return_val_if_fail (set != NULL, FALSE);
421   g_return_val_if_fail (fd != NULL, FALSE);
422   g_return_val_if_fail (fd->fd >= 0, FALSE);
423
424   g_mutex_lock (set->lock);
425
426   ret = gst_poll_add_fd_unlocked (set, fd);
427
428   g_mutex_unlock (set->lock);
429
430   return ret;
431 }
432
433 /**
434  * gst_poll_remove_fd:
435  * @set: a file descriptor set.
436  * @fd: a file descriptor.
437  *
438  * Remove a file descriptor from the file descriptor set.
439  *
440  * Returns: %TRUE if the file descriptor was successfully removed from the set.
441  *
442  * Since: 0.10.18
443  */
444 gboolean
445 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
446 {
447   gint idx;
448
449   g_return_val_if_fail (set != NULL, FALSE);
450   g_return_val_if_fail (fd != NULL, FALSE);
451   g_return_val_if_fail (fd->fd >= 0, FALSE);
452
453   g_mutex_lock (set->lock);
454
455   /* get the index, -1 is an fd that is not added */
456   idx = find_index (set->fds, fd);
457   if (idx >= 0) {
458     /* remove the fd at index, we use _remove_index_fast, which copies the last
459      * element of the array to the freed index */
460     g_array_remove_index_fast (set->fds, idx);
461
462     /* mark fd as removed by setting the index to -1 */
463     fd->idx = -1;
464   }
465
466   g_mutex_unlock (set->lock);
467
468   return idx >= 0;
469 }
470
471 /**
472  * gst_poll_fd_ctl_write:
473  * @set: a file descriptor set.
474  * @fd: a file descriptor.
475  * @active: a new status.
476  *
477  * Control whether the descriptor @fd in @set will be monitored for
478  * writability.
479  *
480  * Returns: %TRUE if the descriptor was successfully updated.
481  *
482  * Since: 0.10.18
483  */
484 gboolean
485 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
486 {
487   gint idx;
488
489   g_return_val_if_fail (set != NULL, FALSE);
490   g_return_val_if_fail (fd != NULL, FALSE);
491   g_return_val_if_fail (fd->fd >= 0, FALSE);
492
493   g_mutex_lock (set->lock);
494
495   idx = find_index (set->fds, fd);
496   if (idx >= 0) {
497     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
498
499     if (active)
500       pfd->events |= POLLOUT;
501     else
502       pfd->events &= ~POLLOUT;
503   }
504
505   g_mutex_unlock (set->lock);
506
507   return idx >= 0;
508 }
509
510 static gboolean
511 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
512 {
513   gint idx;
514
515   idx = find_index (set->fds, fd);
516   if (idx >= 0) {
517     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
518
519     if (active)
520       pfd->events |= (POLLIN | POLLPRI);
521     else
522       pfd->events &= ~(POLLIN | POLLPRI);
523   }
524
525   return idx >= 0;
526 }
527
528 /**
529  * gst_poll_fd_ctl_read:
530  * @set: a file descriptor set.
531  * @fd: a file descriptor.
532  * @active: a new status.
533  *
534  * Control whether the descriptor @fd in @set will be monitored for
535  * readability.
536  *
537  * Returns: %TRUE if the descriptor was successfully updated.
538  *
539  * Since: 0.10.18
540  */
541 gboolean
542 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
543 {
544   gboolean ret;
545
546   g_return_val_if_fail (set != NULL, FALSE);
547   g_return_val_if_fail (fd != NULL, FALSE);
548   g_return_val_if_fail (fd->fd >= 0, FALSE);
549
550   g_mutex_lock (set->lock);
551
552   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
553
554   g_mutex_unlock (set->lock);
555
556   return ret;
557 }
558
559 /**
560  * gst_poll_fd_has_closed:
561  * @set: a file descriptor set.
562  * @fd: a file descriptor.
563  *
564  * Check if @fd in @set has closed the connection.
565  *
566  * Returns: %TRUE if the connection was closed.
567  *
568  * Since: 0.10.18
569  */
570 gboolean
571 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
572 {
573   gboolean res = FALSE;
574   gint idx;
575
576   g_return_val_if_fail (set != NULL, FALSE);
577   g_return_val_if_fail (fd != NULL, FALSE);
578   g_return_val_if_fail (fd->fd >= 0, FALSE);
579
580   g_mutex_lock (set->lock);
581
582   idx = find_index (set->active_fds, fd);
583   if (idx >= 0) {
584     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
585
586     res = (pfd->revents & POLLHUP) != 0;
587   }
588
589   g_mutex_unlock (set->lock);
590
591   return res;
592 }
593
594 /**
595  * gst_poll_fd_has_error:
596  * @set: a file descriptor set.
597  * @fd: a file descriptor.
598  *
599  * Check if @fd in @set has an error.
600  *
601  * Returns: %TRUE if the descriptor has an error.
602  *
603  * Since: 0.10.18
604  */
605 gboolean
606 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
607 {
608   gboolean res = FALSE;
609   gint idx;
610
611   g_return_val_if_fail (set != NULL, FALSE);
612   g_return_val_if_fail (fd != NULL, FALSE);
613   g_return_val_if_fail (fd->fd >= 0, FALSE);
614
615   g_mutex_lock (set->lock);
616
617   idx = find_index (set->active_fds, fd);
618   if (idx >= 0) {
619     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
620
621     res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
622   }
623
624   g_mutex_unlock (set->lock);
625
626   return res;
627 }
628
629 static gboolean
630 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
631 {
632   gboolean res = FALSE;
633   gint idx;
634
635   idx = find_index (set->active_fds, fd);
636   if (idx >= 0) {
637     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
638
639     res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
640   }
641
642   return res;
643 }
644
645 /**
646  * gst_poll_fd_can_read:
647  * @set: a file descriptor set.
648  * @fd: a file descriptor.
649  *
650  * Check if @fd in @set has data to be read.
651  *
652  * Returns: %TRUE if the descriptor has data to be read.
653  *
654  * Since: 0.10.18
655  */
656 gboolean
657 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
658 {
659   gboolean res = FALSE;
660
661   g_return_val_if_fail (set != NULL, FALSE);
662   g_return_val_if_fail (fd != NULL, FALSE);
663   g_return_val_if_fail (fd->fd >= 0, FALSE);
664
665   g_mutex_lock (set->lock);
666
667   res = gst_poll_fd_can_read_unlocked (set, fd);
668
669   g_mutex_unlock (set->lock);
670
671   return res;
672 }
673
674 /**
675  * gst_poll_fd_can_write:
676  * @set: a file descriptor set.
677  * @fd: a file descriptor.
678  *
679  * Check if @fd in @set can be used for writing.
680  *
681  * Returns: %TRUE if the descriptor can be used for writing.
682  *
683  * Since: 0.10.18
684  */
685 gboolean
686 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
687 {
688   gboolean res = FALSE;
689   gint idx;
690
691   g_return_val_if_fail (set != NULL, FALSE);
692   g_return_val_if_fail (fd != NULL, FALSE);
693   g_return_val_if_fail (fd->fd >= 0, FALSE);
694
695   g_mutex_lock (set->lock);
696
697   idx = find_index (set->active_fds, fd);
698   if (idx >= 0) {
699     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
700
701     res = (pfd->revents & POLLOUT) != 0;
702   }
703
704   g_mutex_unlock (set->lock);
705
706   return res;
707 }
708
709 /**
710  * gst_poll_wait:
711  * @set: a #GstPoll.
712  * @timeout: a timeout in nanoseconds.
713  *
714  * Wait for activity on the file descriptors in @set. This function waits up to
715  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
716  *
717  * When this function is called from multiple threads, -1 will be returned with
718  * errno set to EPERM.
719  *
720  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
721  * activity was detected after @timeout. If an error occurs, -1 is returned
722  * and errno is set.
723  *
724  * Since: 0.10.18
725  */
726 gint
727 gst_poll_wait (GstPoll * set, GstClockTime timeout)
728 {
729   gboolean restarting;
730   int res;
731
732   g_return_val_if_fail (set != NULL, -1);
733
734   g_mutex_lock (set->lock);
735
736   /* we cannot wait from multiple threads */
737   if (set->waiting)
738     goto already_waiting;
739
740   /* flushing, exit immediatly */
741   if (set->flushing)
742     goto flushing;
743
744   set->waiting = TRUE;
745
746   do {
747     GstPollMode mode;
748
749     res = -1;
750     restarting = FALSE;
751
752     mode = choose_mode (set, timeout);
753
754     g_array_set_size (set->active_fds, set->fds->len);
755     memcpy (set->active_fds->data, set->fds->data,
756         set->fds->len * sizeof (struct pollfd));
757     g_mutex_unlock (set->lock);
758
759     switch (mode) {
760       case GST_POLL_MODE_AUTO:
761         g_assert_not_reached ();
762         break;
763       case GST_POLL_MODE_PPOLL:
764       {
765 #ifdef HAVE_PPOLL
766         struct timespec ts;
767         struct timespec *tsptr;
768
769         if (timeout != GST_CLOCK_TIME_NONE) {
770           GST_TIME_TO_TIMESPEC (timeout, ts);
771           tsptr = &ts;
772         } else {
773           tsptr = NULL;
774         }
775
776         res =
777             ppoll ((struct pollfd *) set->active_fds->data,
778             set->active_fds->len, tsptr, NULL);
779 #else
780         g_assert_not_reached ();
781         errno = ENOSYS;
782 #endif
783         break;
784       }
785       case GST_POLL_MODE_POLL:
786       {
787 #ifdef HAVE_POLL
788         gint t;
789
790         if (timeout != GST_CLOCK_TIME_NONE) {
791           t = GST_TIME_AS_MSECONDS (timeout);
792         } else {
793           t = -1;
794         }
795
796         res =
797             poll ((struct pollfd *) set->active_fds->data,
798             set->active_fds->len, t);
799 #else
800         g_assert_not_reached ();
801         errno = ENOSYS;
802 #endif
803         break;
804       }
805       case GST_POLL_MODE_PSELECT:
806 #ifndef HAVE_PSELECT
807       {
808         g_assert_not_reached ();
809         errno = ENOSYS;
810         break;
811       }
812 #endif
813       case GST_POLL_MODE_SELECT:
814       {
815         fd_set readfds;
816         fd_set writefds;
817         gint max_fd;
818
819         max_fd = pollfd_to_fd_set (set, &readfds, &writefds);
820
821         if (mode == GST_POLL_MODE_SELECT) {
822           struct timeval tv;
823           struct timeval *tvptr;
824
825           if (timeout != GST_CLOCK_TIME_NONE) {
826             GST_TIME_TO_TIMEVAL (timeout, tv);
827             tvptr = &tv;
828           } else {
829             tvptr = NULL;
830           }
831
832           res = select (max_fd + 1, &readfds, &writefds, NULL, tvptr);
833         } else {
834 #ifdef HAVE_PSELECT
835           struct timespec ts;
836           struct timespec *tsptr;
837
838           if (timeout != GST_CLOCK_TIME_NONE) {
839             GST_TIME_TO_TIMESPEC (timeout, ts);
840             tsptr = &ts;
841           } else {
842             tsptr = NULL;
843           }
844
845           res = pselect (max_fd + 1, &readfds, &writefds, NULL, tsptr, NULL);
846 #endif
847         }
848
849         if (res > 0) {
850           fd_set_to_pollfd (set, &readfds, &writefds);
851         }
852
853         break;
854       }
855     }
856
857     g_mutex_lock (set->lock);
858
859     /* check if the poll/select was aborted due to a command */
860     if (res > 0 && set->controllable) {
861       while (TRUE) {
862         guchar cmd;
863         gint result;
864
865         /* we do not check the read status of the control socket here because
866          * there may have been a write to the socket between the time the
867          * poll/select finished and before we got the mutex back, and we need
868          * to clear out the control socket before leaving */
869         READ_COMMAND (set, cmd, result);
870         if (result <= 0) {
871           /* no more commands, quit the loop */
872           break;
873         }
874
875         /* if the control socket is the only socket with activity when we get
876          * here, we restart the _wait operation, else we allow the caller to
877          * process the other file descriptors */
878         if (res == 1 &&
879             gst_poll_fd_can_read_unlocked (set, &set->control_read_fd))
880           restarting = TRUE;
881       }
882     }
883
884     /* update the controllable state if needed */
885     set->controllable = set->new_controllable;
886
887     if (set->flushing) {
888       /* we got woken up and we are flushing, we need to stop */
889       errno = EBUSY;
890       res = -1;
891       break;
892     }
893   } while (restarting);
894
895   set->waiting = FALSE;
896
897   g_mutex_unlock (set->lock);
898
899   return res;
900
901   /* ERRORS */
902 already_waiting:
903   {
904     g_mutex_unlock (set->lock);
905     errno = EPERM;
906     return -1;
907   }
908 flushing:
909   {
910     g_mutex_unlock (set->lock);
911     errno = EBUSY;
912     return -1;
913   }
914 }
915
916 /**
917  * gst_poll_set_controllable:
918  * @set: a #GstPoll.
919  * @controllable: new controllable state.
920  *
921  * When @controllable is %TRUE, this function ensures that future calls to
922  * gst_poll_wait() will be affected by gst_poll_restart() and
923  * gst_poll_set_flushing().
924  *
925  * Returns: %TRUE if the controllability of @set could be updated.
926  *
927  * Since: 0.10.18
928  */
929 gboolean
930 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
931 {
932   g_return_val_if_fail (set != NULL, FALSE);
933
934   g_mutex_lock (set->lock);
935
936   if (controllable && set->control_read_fd.fd < 0) {
937     gint control_sock[2];
938
939 #ifdef G_OS_WIN32
940     gulong flags = 1;
941
942     if (_pipe (control_sock, 4096, _O_BINARY) < 0)
943       goto no_socket_pair;
944
945     ioctlsocket (control_sock[0], FIONBIO, &flags);
946     ioctlsocket (control_sock[1], FIONBIO, &flags);
947 #else
948     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
949       goto no_socket_pair;
950
951     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
952     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
953 #endif
954     set->control_read_fd.fd = control_sock[0];
955     set->control_write_fd.fd = control_sock[1];
956
957     gst_poll_add_fd_unlocked (set, &set->control_read_fd);
958   }
959
960   if (set->control_read_fd.fd >= 0)
961     gst_poll_fd_ctl_read_unlocked (set, &set->control_read_fd, controllable);
962
963   /* delay the change of the controllable state if we are waiting */
964   set->new_controllable = controllable;
965   if (!set->waiting)
966     set->controllable = controllable;
967
968   g_mutex_unlock (set->lock);
969
970   return TRUE;
971
972   /* ERRORS */
973 no_socket_pair:
974   {
975     g_mutex_unlock (set->lock);
976     return FALSE;
977   }
978 }
979
980 /**
981  * gst_poll_restart:
982  * @set: a #GstPoll.
983  *
984  * Restart any gst_poll_wait() that is in progress. This function is typically
985  * used after adding or removing descriptors to @set.
986  *
987  * If @set is not controllable, then this call will have no effect.
988  *
989  * Since: 0.10.18
990  */
991 void
992 gst_poll_restart (GstPoll * set)
993 {
994   g_return_if_fail (set != NULL);
995
996   g_mutex_lock (set->lock);
997
998   if (set->controllable && set->waiting) {
999     /* if we are waiting, we can send the command, else we do not have to
1000      * bother, future calls will automatically pick up the new fdset */
1001     SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
1002   }
1003
1004   g_mutex_unlock (set->lock);
1005 }
1006
1007 /**
1008  * gst_poll_set_flushing:
1009  * @set: a #GstPoll.
1010  * @flushing: new flushing state.
1011  *
1012  * When @flushing is %TRUE, this function ensures that current and future calls
1013  * to gst_poll_wait() will return -1, with errno set to EBUSY.
1014  *
1015  * Unsetting the flushing state will restore normal operation of @set.
1016  *
1017  * Since: 0.10.18
1018  */
1019 void
1020 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1021 {
1022   g_return_if_fail (set != NULL);
1023
1024   g_mutex_lock (set->lock);
1025
1026   /* update the new state first */
1027   set->flushing = flushing;
1028
1029   if (flushing && set->controllable && set->waiting) {
1030     /* we are flushing, controllable and waiting, wake up the waiter */
1031     SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
1032   }
1033
1034   g_mutex_unlock (set->lock);
1035 }