structure: Update doc error in ARRAY/LIST helpers
[platform/upstream/gstreamer.git] / gst / gstpoll.c
index b58e403..5e0a668 100644 (file)
@@ -2,7 +2,7 @@
  * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
  * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
  * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
- * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
+ * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
  *
  * gstpoll.c: File descriptor set
  *
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 /**
  * SECTION:gstpoll
+ * @title: GstPoll
  * @short_description: Keep track of file descriptors and make it possible
- *                     to wait on them in a cancelable way
+ *                     to wait on them in a cancellable way
  *
  * A #GstPoll keeps track of file descriptors much like fd_set (used with
  * select()) or a struct pollfd array (used with poll()). Once created with
  * gst_poll_new(), the set can be used to wait for file descriptors to be
- * readable and/or writeable. It is possible to make this wait be controlled
+ * readable and/or writable. It is possible to make this wait be controlled
  * by specifying %TRUE for the @controllable flag when creating the set (or
  * later calling gst_poll_set_controllable()).
  *
  * New file descriptors are added to the set using gst_poll_add_fd(), and
  * removed using gst_poll_remove_fd(). Controlling which file descriptors
- * should be waited for to become readable and/or writeable are done using
- * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
+ * should be waited for to become readable and/or writable are done using
+ * gst_poll_fd_ctl_read(), gst_poll_fd_ctl_write() and gst_poll_fd_ctl_pri().
  *
  * Use gst_poll_wait() to wait for the file descriptors to actually become
- * readable and/or writeable, or to timeout if no file descriptor is available
+ * readable and/or writable, or to timeout if no file descriptor is available
  * in time. The wait can be controlled by calling gst_poll_restart() and
  * gst_poll_set_flushing().
  *
 #include "config.h"
 #endif
 
-#ifndef _MSC_VER
-#define _GNU_SOURCE 1
-#include <sys/poll.h>
-#include <sys/time.h>
-#endif
+#include "gst_private.h"
+#include "glib-compat-private.h"
 
 #include <sys/types.h>
 
 
 #ifdef G_OS_WIN32
 #include <winsock2.h>
-#define EINPROGRESS WSAEINPROGRESS
 #else
+#define _GNU_SOURCE 1
+#ifdef HAVE_SYS_POLL_H
+#include <sys/poll.h>
+#endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#include <sys/time.h>
 #include <sys/socket.h>
 #endif
 
+#ifdef G_OS_WIN32
+#  ifndef EWOULDBLOCK
+#  define EWOULDBLOCK EAGAIN    /* This is just to placate gcc */
+#  endif
+#endif /* G_OS_WIN32 */
+
 /* OS/X needs this because of bad headers */
 #include <string.h>
 
-#include "gst_private.h"
-
-#include "gstpoll.h"
-
-#ifndef G_OS_WIN32
-/* the poll/select call is also performed on a control socket, that way
- * we can send special commands to control it
+/* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
+ * so we prefer our own poll emulation.
  */
-#define SEND_COMMAND(set, command)                   \
-G_STMT_START {                                       \
-  unsigned char c = command;                         \
-  write (set->control_write_fd.fd, &c, 1);           \
-} G_STMT_END
+#if defined(BROKEN_POLL)
+#undef HAVE_POLL
+#endif
 
-#define READ_COMMAND(set, command, res)              \
-G_STMT_START {                                       \
-  res = read (set->control_read_fd.fd, &command, 1); \
-} G_STMT_END
+#include "gstpoll.h"
 
-#define GST_POLL_CMD_WAKEUP  'W'        /* restart the poll/select call */
+#define GST_CAT_DEFAULT GST_CAT_POLL
 
-#else /* G_OS_WIN32 */
+#ifdef G_OS_WIN32
 typedef struct _WinsockFd WinsockFd;
 
 struct _WinsockFd
@@ -130,16 +131,19 @@ struct _GstPoll
 {
   GstPollMode mode;
 
-  GMutex *lock;
-
+  GMutex lock;
+  /* array of fds, always written to and read from with lock */
   GArray *fds;
+  /* array of active fds, only written to from the waiting thread with the
+   * lock and read from with the lock or without the lock from the waiting
+   * thread */
   GArray *active_fds;
+
 #ifndef G_OS_WIN32
   GstPollFD control_read_fd;
   GstPollFD control_write_fd;
 #else
   GArray *active_fds_ignored;
-
   GArray *events;
   GArray *active_events;
 
@@ -147,11 +151,206 @@ struct _GstPoll
 #endif
 
   gboolean controllable;
-  gboolean new_controllable;
-  gboolean waiting;
-  gboolean flushing;
+  volatile gint waiting;
+  volatile gint control_pending;
+  volatile gint flushing;
+  gboolean timer;
+  volatile gint rebuild;
 };
 
+static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
+    gboolean active);
+static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
+
+#define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
+#define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
+
+#define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
+#define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
+#define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
+
+#define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
+#define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))
+
+#ifndef G_OS_WIN32
+
+static gboolean
+wake_event (GstPoll * set)
+{
+  ssize_t num_written;
+  while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) {
+    if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
+      g_critical ("%p: failed to wake event: %s", set, strerror (errno));
+      return FALSE;
+    }
+  }
+  return TRUE;
+}
+
+static gboolean
+release_event (GstPoll * set)
+{
+  gchar buf[1] = { '\0' };
+  ssize_t num_read;
+  while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) {
+    if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
+      g_critical ("%p: failed to release event: %s", set, strerror (errno));
+      return FALSE;
+    }
+  }
+  return TRUE;
+}
+
+#else
+
+static void
+format_last_error (gchar * buf, size_t buf_len)
+{
+  DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM;
+  LPCVOID src = NULL;
+  DWORD lang = 0;
+  DWORD id;
+  id = GetLastError ();
+  FormatMessage (flags, src, id, lang, buf, (DWORD) buf_len, NULL);
+  SetLastError (id);
+}
+
+static gboolean
+wake_event (GstPoll * set)
+{
+  SetLastError (0);
+  errno = 0;
+  if (!SetEvent (set->wakeup_event)) {
+    gchar msg[1024] = "<unknown>";
+    format_last_error (msg, sizeof (msg));
+    g_critical ("%p: failed to set wakup_event: %s", set, msg);
+    errno = EBADF;
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+static gboolean
+release_event (GstPoll * set)
+{
+  DWORD status;
+  SetLastError (0);
+  errno = 0;
+
+  status = WaitForSingleObject (set->wakeup_event, INFINITE);
+  if (status) {
+    const gchar *reason = "unknown";
+    gchar msg[1024] = "<unknown>";
+    switch (status) {
+      case WAIT_ABANDONED:
+        reason = "WAIT_ABANDONED";
+        break;
+      case WAIT_TIMEOUT:
+        reason = "WAIT_TIMEOUT";
+        break;
+      case WAIT_FAILED:
+        format_last_error (msg, sizeof (msg));
+        reason = msg;
+        break;
+      default:
+        reason = "other";
+        break;
+    }
+    g_critical ("%p: failed to block on wakup_event: %s", set, reason);
+    errno = EBADF;
+    return FALSE;
+  }
+
+  if (!ResetEvent (set->wakeup_event)) {
+    gchar msg[1024] = "<unknown>";
+    format_last_error (msg, sizeof (msg));
+    g_critical ("%p: failed to reset wakup_event: %s", set, msg);
+    errno = EBADF;
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+#endif
+
+/* the poll/select call is also performed on a control socket, that way
+ * we can send special commands to control it */
+static inline gboolean
+raise_wakeup (GstPoll * set)
+{
+  gboolean result = TRUE;
+
+  /* makes testing control_pending and WAKE_EVENT() atomic. */
+  g_mutex_lock (&set->lock);
+
+  if (set->control_pending == 0) {
+    /* raise when nothing pending */
+    GST_LOG ("%p: raise", set);
+    result = wake_event (set);
+  }
+
+  if (result) {
+    set->control_pending++;
+  }
+
+  g_mutex_unlock (&set->lock);
+
+  return result;
+}
+
+static inline gboolean
+release_wakeup (GstPoll * set)
+{
+  gboolean result = FALSE;
+
+  /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
+  g_mutex_lock (&set->lock);
+
+  if (set->control_pending > 0) {
+    /* release, only if this was the last pending. */
+    if (set->control_pending == 1) {
+      GST_LOG ("%p: release", set);
+      result = release_event (set);
+    } else {
+      result = TRUE;
+    }
+
+    if (result) {
+      set->control_pending--;
+    }
+  } else {
+    errno = EWOULDBLOCK;
+  }
+
+  g_mutex_unlock (&set->lock);
+
+  return result;
+}
+
+static inline gint
+release_all_wakeup (GstPoll * set)
+{
+  gint old;
+
+  /* makes testing control_pending and RELEASE_EVENT() atomic. */
+  g_mutex_lock (&set->lock);
+
+  if ((old = set->control_pending) > 0) {
+    GST_LOG ("%p: releasing %d", set, old);
+    if (release_event (set)) {
+      set->control_pending = 0;
+    } else {
+      old = 0;
+    }
+  }
+
+  g_mutex_unlock (&set->lock);
+
+  return old;
+}
+
 static gint
 find_index (GArray * array, GstPollFD * fd)
 {
@@ -196,18 +395,26 @@ find_index (GArray * array, GstPollFD * fd)
 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
 /* check if all file descriptors will fit in an fd_set */
 static gboolean
-selectable_fds (const GstPoll * set)
+selectable_fds (GstPoll * set)
 {
   guint i;
 
+  g_mutex_lock (&set->lock);
   for (i = 0; i < set->fds->len; i++) {
     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
 
     if (pfd->fd >= FD_SETSIZE)
-      return FALSE;
+      goto too_many;
   }
+  g_mutex_unlock (&set->lock);
 
   return TRUE;
+
+too_many:
+  {
+    g_mutex_unlock (&set->lock);
+    return FALSE;
+  }
 }
 
 /* check if the timeout will convert to a timeout value used for poll()
@@ -228,7 +435,7 @@ pollable_timeout (GstClockTime timeout)
 #endif
 
 static GstPollMode
-choose_mode (const GstPoll * set, GstClockTime timeout)
+choose_mode (GstPoll * set, GstClockTime timeout)
 {
   GstPollMode mode;
 
@@ -258,15 +465,17 @@ choose_mode (const GstPoll * set, GstClockTime timeout)
 
 #ifndef G_OS_WIN32
 static gint
-pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds)
+pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
+    fd_set * errorfds)
 {
   gint max_fd = -1;
   guint i;
 
   FD_ZERO (readfds);
   FD_ZERO (writefds);
+  FD_ZERO (errorfds);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   for (i = 0; i < set->active_fds->len; i++) {
     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
@@ -276,35 +485,41 @@ pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds)
         FD_SET (pfd->fd, readfds);
       if (pfd->events & POLLOUT)
         FD_SET (pfd->fd, writefds);
-      if (pfd->fd > max_fd)
+      if (pfd->events)
+        FD_SET (pfd->fd, errorfds);
+      if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
         max_fd = pfd->fd;
     }
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return max_fd;
 }
 
 static void
-fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds)
+fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
+    fd_set * errorfds)
 {
   guint i;
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   for (i = 0; i < set->active_fds->len; i++) {
     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
 
     if (pfd->fd < FD_SETSIZE) {
+      pfd->revents = 0;
       if (FD_ISSET (pfd->fd, readfds))
         pfd->revents |= POLLIN;
       if (FD_ISSET (pfd->fd, writefds))
         pfd->revents |= POLLOUT;
+      if (FD_ISSET (pfd->fd, errorfds))
+        pfd->revents |= POLLERR;
     }
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 }
 #else /* G_OS_WIN32 */
 /*
@@ -445,31 +660,44 @@ gst_poll_collect_winsock_events (GstPoll * set)
 #endif
 
 /**
- * gst_poll_new:
+ * gst_poll_new: (skip)
  * @controllable: whether it should be possible to control a wait.
  *
  * Create a new file descriptor set. If @controllable, it
  * is possible to restart or flush a call to gst_poll_wait() with
  * gst_poll_restart() and gst_poll_set_flushing() respectively.
  *
- * Returns: a new #GstPoll, or %NULL in case of an error. Free with
- * gst_poll_free().
+ * Free-function: gst_poll_free
  *
- * Since: 0.10.18
+ * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
+ *     case of an error.  Free with gst_poll_free().
  */
 GstPoll *
 gst_poll_new (gboolean controllable)
 {
   GstPoll *nset;
 
-  nset = g_new0 (GstPoll, 1);
-  nset->lock = g_mutex_new ();
+  nset = g_slice_new0 (GstPoll);
+  GST_DEBUG ("%p: new controllable : %d", nset, controllable);
+  g_mutex_init (&nset->lock);
 #ifndef G_OS_WIN32
   nset->mode = GST_POLL_MODE_AUTO;
   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
   nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
   nset->control_read_fd.fd = -1;
   nset->control_write_fd.fd = -1;
+  {
+    gint control_sock[2];
+
+    if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
+      goto no_socket_pair;
+
+    nset->control_read_fd.fd = control_sock[0];
+    nset->control_write_fd.fd = control_sock[1];
+
+    gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
+    gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
+  }
 #else
   nset->mode = GST_POLL_MODE_WINDOWS;
   nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
@@ -481,32 +709,68 @@ gst_poll_new (gboolean controllable)
   nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
 #endif
 
-  if (!gst_poll_set_controllable (nset, controllable))
-    goto not_controllable;
+  /* ensure (re)build, though already sneakily set in non-windows case */
+  MARK_REBUILD (nset);
+
+  nset->controllable = controllable;
+  nset->control_pending = 0;
 
   return nset;
 
   /* ERRORS */
-not_controllable:
+#ifndef G_OS_WIN32
+no_socket_pair:
   {
+    GST_WARNING ("%p: can't create socket pair !", nset);
     gst_poll_free (nset);
     return NULL;
   }
+#endif
+}
+
+/**
+ * gst_poll_new_timer: (skip)
+ *
+ * Create a new poll object that can be used for scheduling cancellable
+ * timeouts.
+ *
+ * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
+ * performed from different threads.
+ *
+ * Free-function: gst_poll_free
+ *
+ * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
+ *     case of an error.  Free with gst_poll_free().
+ */
+GstPoll *
+gst_poll_new_timer (void)
+{
+  GstPoll *poll;
+
+  /* make a new controllable poll set */
+  if (!(poll = gst_poll_new (TRUE)))
+    goto done;
+
+  /* we are a timer */
+  poll->timer = TRUE;
+
+done:
+  return poll;
 }
 
 /**
  * gst_poll_free:
- * @set: a file descriptor set.
+ * @set: (transfer full): a file descriptor set.
  *
  * Free a file descriptor set.
- *
- * Since: 0.10.18
  */
 void
 gst_poll_free (GstPoll * set)
 {
   g_return_if_fail (set != NULL);
 
+  GST_DEBUG ("%p: freeing", set);
+
 #ifndef G_OS_WIN32
   if (set->control_write_fd.fd >= 0)
     close (set->control_write_fd.fd);
@@ -529,8 +793,35 @@ gst_poll_free (GstPoll * set)
 
   g_array_free (set->active_fds, TRUE);
   g_array_free (set->fds, TRUE);
-  g_mutex_free (set->lock);
-  g_free (set);
+  g_mutex_clear (&set->lock);
+  g_slice_free (GstPoll, set);
+}
+
+/**
+ * gst_poll_get_read_gpollfd:
+ * @set: a #GstPoll
+ * @fd: a #GPollFD
+ *
+ * Get a GPollFD for the reading part of the control socket. This is useful when
+ * integrating with a GSource and GMainLoop.
+ */
+void
+gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
+{
+  g_return_if_fail (set != NULL);
+  g_return_if_fail (fd != NULL);
+
+#ifndef G_OS_WIN32
+  fd->fd = set->control_read_fd.fd;
+#else
+#if GLIB_SIZEOF_VOID_P == 8
+  fd->fd = (gint64) set->wakeup_event;
+#else
+  fd->fd = (gint) set->wakeup_event;
+#endif
+#endif
+  fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+  fd->revents = 0;
 }
 
 /**
@@ -539,8 +830,6 @@ gst_poll_free (GstPoll * set)
  *
  * Initializes @fd. Alternatively you can initialize it with
  * #GST_POLL_FD_INIT.
- *
- * Since: 0.10.18
  */
 void
 gst_poll_fd_init (GstPollFD * fd)
@@ -556,6 +845,8 @@ gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
 {
   gint idx;
 
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
+
   idx = find_index (set->fds, fd);
   if (idx < 0) {
 #ifndef G_OS_WIN32
@@ -583,6 +874,9 @@ gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
 
     fd->idx = set->fds->len - 1;
 #endif
+    MARK_REBUILD (set);
+  } else {
+    GST_WARNING ("%p: fd already added !", set);
   }
 
   return TRUE;
@@ -596,8 +890,6 @@ gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
  * Add a file descriptor to the file descriptor set.
  *
  * Returns: %TRUE if the file descriptor was successfully added to the set.
- *
- * Since: 0.10.18
  */
 gboolean
 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
@@ -608,11 +900,11 @@ gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   ret = gst_poll_add_fd_unlocked (set, fd);
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return ret;
 }
@@ -625,8 +917,6 @@ gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
  * Remove a file descriptor from the file descriptor set.
  *
  * Returns: %TRUE if the file descriptor was successfully removed from the set.
- *
- * Since: 0.10.18
  */
 gboolean
 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
@@ -637,7 +927,10 @@ gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
+
+  g_mutex_lock (&set->lock);
 
   /* get the index, -1 is an fd that is not added */
   idx = find_index (set->fds, fd);
@@ -653,9 +946,12 @@ gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
 
     /* mark fd as removed by setting the index to -1 */
     fd->idx = -1;
+    MARK_REBUILD (set);
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return idx >= 0;
 }
@@ -670,8 +966,6 @@ gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
  * writability.
  *
  * Returns: %TRUE if the descriptor was successfully updated.
- *
- * Since: 0.10.18
  */
 gboolean
 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
@@ -682,7 +976,10 @@ gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
+      fd->fd, fd->idx, active);
+
+  g_mutex_lock (&set->lock);
 
   idx = find_index (set->fds, fd);
   if (idx >= 0) {
@@ -693,12 +990,18 @@ gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
       pfd->events |= POLLOUT;
     else
       pfd->events &= ~POLLOUT;
+
+    GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
 #else
-    gst_poll_update_winsock_event_mask (set, idx, FD_WRITE, active);
+    gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
+        active);
 #endif
+    MARK_REBUILD (set);
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return idx >= 0;
 }
@@ -708,6 +1011,9 @@ gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
 {
   gint idx;
 
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
+      fd->fd, fd->idx, active);
+
   idx = find_index (set->fds, fd);
 
   if (idx >= 0) {
@@ -715,12 +1021,15 @@ gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
 
     if (active)
-      pfd->events |= (POLLIN | POLLPRI);
+      pfd->events |= POLLIN;
     else
-      pfd->events &= ~(POLLIN | POLLPRI);
+      pfd->events &= ~POLLIN;
 #else
     gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
 #endif
+    MARK_REBUILD (set);
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
   }
 
   return idx >= 0;
@@ -736,8 +1045,6 @@ gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
  * readability.
  *
  * Returns: %TRUE if the descriptor was successfully updated.
- *
- * Since: 0.10.18
  */
 gboolean
 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
@@ -748,16 +1055,69 @@ gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return ret;
 }
 
 /**
+ * gst_poll_fd_ctl_pri:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ * @active: a new status.
+ *
+ * Control whether the descriptor @fd in @set will be monitored for
+ * exceptional conditions (POLLPRI).
+ *
+ * Not implemented on Windows (will just return %FALSE there).
+ *
+ * Returns: %TRUE if the descriptor was successfully updated.
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_poll_fd_ctl_pri (GstPoll * set, GstPollFD * fd, gboolean active)
+{
+#ifdef G_OS_WIN32
+  return FALSE;
+#else
+  gint idx;
+
+  g_return_val_if_fail (set != NULL, FALSE);
+  g_return_val_if_fail (fd != NULL, FALSE);
+  g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
+      fd->fd, fd->idx, active);
+
+  g_mutex_lock (&set->lock);
+
+  idx = find_index (set->fds, fd);
+  if (idx >= 0) {
+    struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
+
+    if (active)
+      pfd->events |= POLLPRI;
+    else
+      pfd->events &= ~POLLPRI;
+
+    GST_LOG ("%p: pfd->events now %d (POLLPRI:%d)", set, pfd->events, POLLOUT);
+    MARK_REBUILD (set);
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
+  }
+
+  g_mutex_unlock (&set->lock);
+
+  return idx >= 0;
+#endif
+}
+
+/**
  * gst_poll_fd_ignored:
  * @set: a file descriptor set.
  * @fd: a file descriptor.
@@ -770,8 +1130,6 @@ gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
  * The reason why this is needed is because the underlying implementation
  * might not allow querying the fd more than once between calls to one of
  * the re-enabling operations.
- *
- * Since: 0.10.18
  */
 void
 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
@@ -783,16 +1141,17 @@ gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
   g_return_if_fail (fd != NULL);
   g_return_if_fail (fd->fd >= 0);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   idx = find_index (set->fds, fd);
   if (idx >= 0) {
     WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
 
     wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
+    MARK_REBUILD (set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 #endif
 }
 
@@ -804,8 +1163,6 @@ gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
  * Check if @fd in @set has closed the connection.
  *
  * Returns: %TRUE if the connection was closed.
- *
- * Since: 0.10.18
  */
 gboolean
 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
@@ -817,7 +1174,7 @@ gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&((GstPoll *) set)->lock);
 
   idx = find_index (set->active_fds, fd);
   if (idx >= 0) {
@@ -830,9 +1187,12 @@ gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
 
     res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
 #endif
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
   }
+  g_mutex_unlock (&((GstPoll *) set)->lock);
 
-  g_mutex_unlock (set->lock);
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
 
   return res;
 }
@@ -845,8 +1205,6 @@ gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
  * Check if @fd in @set has an error.
  *
  * Returns: %TRUE if the descriptor has an error.
- *
- * Since: 0.10.18
  */
 gboolean
 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
@@ -858,7 +1216,7 @@ gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&((GstPoll *) set)->lock);
 
   idx = find_index (set->active_fds, fd);
   if (idx >= 0) {
@@ -872,11 +1230,15 @@ gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
-        (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0);
+        (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
+        (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
 #endif
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
   }
+  g_mutex_unlock (&((GstPoll *) set)->lock);
 
-  g_mutex_unlock (set->lock);
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
 
   return res;
 }
@@ -892,13 +1254,16 @@ gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
 #ifndef G_OS_WIN32
     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
 
-    res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
+    res = (pfd->revents & POLLIN) != 0;
 #else
     WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
 
     res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
 #endif
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
   }
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
 
   return res;
 }
@@ -911,8 +1276,6 @@ gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
  * Check if @fd in @set has data to be read.
  *
  * Returns: %TRUE if the descriptor has data to be read.
- *
- * Since: 0.10.18
  */
 gboolean
 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
@@ -923,11 +1286,11 @@ gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&((GstPoll *) set)->lock);
 
   res = gst_poll_fd_can_read_unlocked (set, fd);
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&((GstPoll *) set)->lock);
 
   return res;
 }
@@ -940,8 +1303,6 @@ gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
  * Check if @fd in @set can be used for writing.
  *
  * Returns: %TRUE if the descriptor can be used for writing.
- *
- * Since: 0.10.18
  */
 gboolean
 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
@@ -953,7 +1314,7 @@ gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&((GstPoll *) set)->lock);
 
   idx = find_index (set->active_fds, fd);
   if (idx >= 0) {
@@ -966,47 +1327,58 @@ gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
 
     res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
 #endif
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
   }
+  g_mutex_unlock (&((GstPoll *) set)->lock);
 
-  g_mutex_unlock (set->lock);
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
 
   return res;
 }
 
-static void
-gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting)
+/**
+ * gst_poll_fd_has_pri:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ *
+ * Check if @fd in @set has an exceptional condition (POLLPRI).
+ *
+ * Not implemented on Windows (will just return %FALSE there).
+ *
+ * Returns: %TRUE if the descriptor has an exceptional condition.
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_poll_fd_has_pri (const GstPoll * set, GstPollFD * fd)
 {
-  /* check if the poll/select was aborted due to a command */
-  if (set->controllable) {
-#ifndef G_OS_WIN32
-    while (TRUE) {
-      guchar cmd;
-      gint result;
-
-      /* we do not check the read status of the control socket here because
-       * there may have been a write to the socket between the time the
-       * poll/select finished and before we got the mutex back, and we need
-       * to clear out the control socket before leaving */
-      READ_COMMAND (set, cmd, result);
-      if (result <= 0) {
-        /* no more commands, quit the loop */
-        break;
-      }
-
-      /* if the control socket is the only socket with activity when we get
-       * here, we restart the _wait operation, else we allow the caller to
-       * process the other file descriptors */
-      if (res == 1 &&
-          gst_poll_fd_can_read_unlocked (set, &set->control_read_fd))
-        *restarting = TRUE;
-    }
+#ifdef G_OS_WIN32
+  return FALSE;
 #else
-    if (WaitForSingleObject (set->wakeup_event, 0) == WAIT_OBJECT_0) {
-      ResetEvent (set->wakeup_event);
-      *restarting = TRUE;
-    }
-#endif
+  gboolean res = FALSE;
+  gint idx;
+
+  g_return_val_if_fail (set != NULL, FALSE);
+  g_return_val_if_fail (fd != NULL, FALSE);
+  g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+  g_mutex_lock (&((GstPoll *) set)->lock);
+
+  idx = find_index (set->active_fds, fd);
+  if (idx >= 0) {
+    struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
+
+    res = (pfd->revents & POLLPRI) != 0;
+  } else {
+    GST_WARNING ("%p: couldn't find fd !", set);
   }
+  g_mutex_unlock (&((GstPoll *) set)->lock);
+
+  GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
+
+  return res;
+#endif
 }
 
 /**
@@ -1017,35 +1389,43 @@ gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting)
  * Wait for activity on the file descriptors in @set. This function waits up to
  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
  *
- * When this function is called from multiple threads, -1 will be returned with
- * errno set to EPERM.
+ * For #GstPoll objects created with gst_poll_new(), this function can only be
+ * called from a single thread at a time.  If called from multiple threads,
+ * -1 will be returned with errno set to EPERM.
+ *
+ * This is not true for timer #GstPoll objects created with
+ * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
+ * simultaneously.
  *
  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
  * activity was detected after @timeout. If an error occurs, -1 is returned
  * and errno is set.
- *
- * Since: 0.10.18
  */
 gint
 gst_poll_wait (GstPoll * set, GstClockTime timeout)
 {
   gboolean restarting;
+  gboolean is_timer;
   int res;
+  gint old_waiting;
 
   g_return_val_if_fail (set != NULL, -1);
 
-  g_mutex_lock (set->lock);
+  GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
+
+  is_timer = set->timer;
 
-  /* we cannot wait from multiple threads */
-  if (set->waiting)
+  /* add one more waiter */
+  old_waiting = INC_WAITING (set);
+
+  /* we cannot wait from multiple threads unless we are a timer */
+  if (G_UNLIKELY (old_waiting > 0 && !is_timer))
     goto already_waiting;
 
-  /* flushing, exit immediatly */
-  if (set->flushing)
+  /* flushing, exit immediately */
+  if (G_UNLIKELY (IS_FLUSHING (set)))
     goto flushing;
 
-  set->waiting = TRUE;
-
   do {
     GstPollMode mode;
 
@@ -1054,16 +1434,18 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
 
     mode = choose_mode (set, timeout);
 
+    if (TEST_REBUILD (set)) {
+      g_mutex_lock (&set->lock);
 #ifndef G_OS_WIN32
-    g_array_set_size (set->active_fds, set->fds->len);
-    memcpy (set->active_fds->data, set->fds->data,
-        set->fds->len * sizeof (struct pollfd));
+      g_array_set_size (set->active_fds, set->fds->len);
+      memcpy (set->active_fds->data, set->fds->data,
+          set->fds->len * sizeof (struct pollfd));
 #else
-    if (!gst_poll_prepare_winsock_active_sets (set))
-      goto winsock_error;
+      if (!gst_poll_prepare_winsock_active_sets (set))
+        goto winsock_error;
 #endif
-
-    g_mutex_unlock (set->lock);
+      g_mutex_unlock (&set->lock);
+    }
 
     switch (mode) {
       case GST_POLL_MODE_AUTO:
@@ -1124,9 +1506,10 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
 #ifndef G_OS_WIN32
         fd_set readfds;
         fd_set writefds;
+        fd_set errorfds;
         gint max_fd;
 
-        max_fd = pollfd_to_fd_set (set, &readfds, &writefds);
+        max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
 
         if (mode == GST_POLL_MODE_SELECT) {
           struct timeval tv;
@@ -1139,7 +1522,9 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
             tvptr = NULL;
           }
 
-          res = select (max_fd + 1, &readfds, &writefds, NULL, tvptr);
+          GST_DEBUG ("%p: Calling select", set);
+          res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
+          GST_DEBUG ("%p: After select, res:%d", set, res);
         } else {
 #ifdef HAVE_PSELECT
           struct timespec ts;
@@ -1152,12 +1537,15 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
             tsptr = NULL;
           }
 
-          res = pselect (max_fd + 1, &readfds, &writefds, NULL, tsptr, NULL);
+          GST_DEBUG ("%p: Calling pselect", set);
+          res =
+              pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
+          GST_DEBUG ("%p: After pselect, res:%d", set, res);
 #endif
         }
 
-        if (res > 0) {
-          fd_set_to_pollfd (set, &readfds, &writefds);
+        if (res >= 0) {
+          fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
         }
 #else /* G_OS_WIN32 */
         g_assert_not_reached ();
@@ -1181,8 +1569,13 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
           t = 0;
         }
 
-        wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
-            (HANDLE *) set->active_events->data, FALSE, t, FALSE);
+        if (set->active_events->len != 0) {
+          wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
+              (HANDLE *) set->active_events->data, FALSE, t, FALSE);
+        } else {
+          wait_ret = WSA_WAIT_FAILED;
+          WSASetLastError (WSA_INVALID_PARAMETER);
+        }
 
         if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
           res = 0;
@@ -1205,44 +1598,47 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
       }
     }
 
-    g_mutex_lock (set->lock);
-
-    gst_poll_check_ctrl_commands (set, res, &restarting);
-
-    /* update the controllable state if needed */
-    set->controllable = set->new_controllable;
-
-    if (set->flushing) {
-      /* we got woken up and we are flushing, we need to stop */
-      errno = EBUSY;
-      res = -1;
-      break;
+    if (!is_timer) {
+      /* Applications needs to clear the control socket themselves for timer
+       * polls.
+       * For other polls, we need to clear the control socket. If there was only
+       * one socket with activity and it was the control socket, we need to
+       * restart */
+      if (release_all_wakeup (set) > 0 && res == 1)
+        restarting = TRUE;
     }
-  } while (restarting);
 
-  set->waiting = FALSE;
+    /* we got woken up and we are flushing, we need to stop */
+    if (G_UNLIKELY (IS_FLUSHING (set)))
+      goto flushing;
+
+  } while (G_UNLIKELY (restarting));
 
-  g_mutex_unlock (set->lock);
+  DEC_WAITING (set);
 
   return res;
 
   /* ERRORS */
 already_waiting:
   {
-    g_mutex_unlock (set->lock);
+    GST_LOG ("%p: we are already waiting", set);
+    DEC_WAITING (set);
     errno = EPERM;
     return -1;
   }
 flushing:
   {
-    g_mutex_unlock (set->lock);
+    GST_LOG ("%p: we are flushing", set);
+    DEC_WAITING (set);
     errno = EBUSY;
     return -1;
   }
 #ifdef G_OS_WIN32
 winsock_error:
   {
-    g_mutex_unlock (set->lock);
+    GST_LOG ("%p: winsock error", set);
+    g_mutex_unlock (&set->lock);
+    DEC_WAITING (set);
     return -1;
   }
 #endif
@@ -1257,54 +1653,22 @@ winsock_error:
  * gst_poll_wait() will be affected by gst_poll_restart() and
  * gst_poll_set_flushing().
  *
- * Returns: %TRUE if the controllability of @set could be updated.
+ * This function only works for non-timer #GstPoll objects created with
+ * gst_poll_new().
  *
- * Since: 0.10.18
+ * Returns: %TRUE if the controllability of @set could be updated.
  */
 gboolean
 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
 {
   g_return_val_if_fail (set != NULL, FALSE);
+  g_return_val_if_fail (!set->timer, FALSE);
 
-  g_mutex_lock (set->lock);
-
-#ifndef G_OS_WIN32
-  if (controllable && set->control_read_fd.fd < 0) {
-    gint control_sock[2];
+  GST_LOG ("%p: controllable : %d", set, controllable);
 
-    if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
-      goto no_socket_pair;
-
-    fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
-    fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
-
-    set->control_read_fd.fd = control_sock[0];
-    set->control_write_fd.fd = control_sock[1];
-
-    gst_poll_add_fd_unlocked (set, &set->control_read_fd);
-  }
-
-  if (set->control_read_fd.fd >= 0)
-    gst_poll_fd_ctl_read_unlocked (set, &set->control_read_fd, controllable);
-#endif
-
-  /* delay the change of the controllable state if we are waiting */
-  set->new_controllable = controllable;
-  if (!set->waiting)
-    set->controllable = controllable;
-
-  g_mutex_unlock (set->lock);
+  set->controllable = controllable;
 
   return TRUE;
-
-  /* ERRORS */
-#ifndef G_OS_WIN32
-no_socket_pair:
-  {
-    g_mutex_unlock (set->lock);
-    return FALSE;
-  }
-#endif
 }
 
 /**
@@ -1316,26 +1680,20 @@ no_socket_pair:
  *
  * If @set is not controllable, then this call will have no effect.
  *
- * Since: 0.10.18
+ * This function only works for non-timer #GstPoll objects created with
+ * gst_poll_new().
  */
 void
 gst_poll_restart (GstPoll * set)
 {
   g_return_if_fail (set != NULL);
+  g_return_if_fail (!set->timer);
 
-  g_mutex_lock (set->lock);
-
-  if (set->controllable && set->waiting) {
-#ifndef G_OS_WIN32
-    /* if we are waiting, we can send the command, else we do not have to
-     * bother, future calls will automatically pick up the new fdset */
-    SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
-#else
-    SetEvent (set->wakeup_event);
-#endif
+  if (set->controllable && GET_WAITING (set) > 0) {
+    /* we are controllable and waiting, wake up the waiter. The socket will be
+     * cleared by the _wait() thread and the poll will be restarted */
+    raise_wakeup (set);
   }
-
-  g_mutex_unlock (set->lock);
 }
 
 /**
@@ -1348,28 +1706,84 @@ gst_poll_restart (GstPoll * set)
  *
  * Unsetting the flushing state will restore normal operation of @set.
  *
- * Since: 0.10.18
+ * This function only works for non-timer #GstPoll objects created with
+ * gst_poll_new().
  */
 void
 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
 {
   g_return_if_fail (set != NULL);
+  g_return_if_fail (!set->timer);
 
-  g_mutex_lock (set->lock);
+  GST_LOG ("%p: flushing: %d", set, flushing);
 
   /* update the new state first */
-  set->flushing = flushing;
+  SET_FLUSHING (set, flushing);
 
-  if (flushing && set->controllable && set->waiting) {
+  if (flushing && set->controllable && GET_WAITING (set) > 0) {
     /* we are flushing, controllable and waiting, wake up the waiter. When we
      * stop the flushing operation we don't clear the wakeup fd here, this will
      * happen in the _wait() thread. */
-#ifndef G_OS_WIN32
-    SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
-#else
-    SetEvent (set->wakeup_event);
-#endif
+    raise_wakeup (set);
   }
+}
 
-  g_mutex_unlock (set->lock);
+/**
+ * gst_poll_write_control:
+ * @set: a #GstPoll.
+ *
+ * Write a byte to the control socket of the controllable @set.
+ * This function is mostly useful for timer #GstPoll objects created with
+ * gst_poll_new_timer().
+ *
+ * It will make any current and future gst_poll_wait() function return with
+ * 1, meaning the control socket is set. After an equal amount of calls to
+ * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
+ * block again until their timeout expired.
+ *
+ * This function only works for timer #GstPoll objects created with
+ * gst_poll_new_timer().
+ *
+ * Returns: %TRUE on success. %FALSE when when the byte could not be written.
+ * errno contains the detailed error code but will never be EAGAIN, EINTR or
+ * EWOULDBLOCK. %FALSE always signals a critical error.
+ */
+gboolean
+gst_poll_write_control (GstPoll * set)
+{
+  gboolean res;
+
+  g_return_val_if_fail (set != NULL, FALSE);
+  g_return_val_if_fail (set->timer, FALSE);
+
+  res = raise_wakeup (set);
+
+  return res;
+}
+
+/**
+ * gst_poll_read_control:
+ * @set: a #GstPoll.
+ *
+ * Read a byte from the control socket of the controllable @set.
+ *
+ * This function only works for timer #GstPoll objects created with
+ * gst_poll_new_timer().
+ *
+ * Returns: %TRUE on success. %FALSE when when there was no byte to read or
+ * reading the byte failed. If there was no byte to read, and only then, errno
+ * will contain EWOULDBLOCK or EAGAIN. For all other values of errno this always signals a
+ * critical error.
+ */
+gboolean
+gst_poll_read_control (GstPoll * set)
+{
+  gboolean res;
+
+  g_return_val_if_fail (set != NULL, FALSE);
+  g_return_val_if_fail (set->timer, FALSE);
+
+  res = release_wakeup (set);
+
+  return res;
 }