caps: _do_simplify() -> _simplify()
[platform/upstream/gstreamer.git] / gst / gstpoll.c
index 8bb282a..7aab1d1 100644 (file)
  * A #GstPoll keeps track of file descriptors much like fd_set (used with
  * select()) or a struct pollfd array (used with poll()). Once created with
  * gst_poll_new(), the set can be used to wait for file descriptors to be
- * readable and/or writeable. It is possible to make this wait be controlled
+ * readable and/or writable. It is possible to make this wait be controlled
  * by specifying %TRUE for the @controllable flag when creating the set (or
  * later calling gst_poll_set_controllable()).
  *
  * New file descriptors are added to the set using gst_poll_add_fd(), and
  * removed using gst_poll_remove_fd(). Controlling which file descriptors
- * should be waited for to become readable and/or writeable are done using
+ * should be waited for to become readable and/or writable are done using
  * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
  *
  * Use gst_poll_wait() to wait for the file descriptors to actually become
- * readable and/or writeable, or to timeout if no file descriptor is available
+ * readable and/or writable, or to timeout if no file descriptor is available
  * in time. The wait can be controlled by calling gst_poll_restart() and
  * gst_poll_set_flushing().
  *
@@ -57,6 +57,7 @@
 #endif
 
 #include "gst_private.h"
+#include "glib-compat-private.h"
 
 #include <sys/types.h>
 
 #define EINPROGRESS WSAEINPROGRESS
 #else
 #define _GNU_SOURCE 1
+#ifdef HAVE_SYS_POLL_H
 #include <sys/poll.h>
+#endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 #include <sys/time.h>
 #include <sys/socket.h>
 #endif
@@ -119,7 +125,7 @@ struct _GstPoll
 {
   GstPollMode mode;
 
-  GMutex *lock;
+  GMutex lock;
   /* array of fds, always written to and read from with lock */
   GArray *fds;
   /* array of active fds, only written to from the waiting thread with the
@@ -154,8 +160,8 @@ static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
 #define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
 
-#define INC_WAITING(s)      (g_atomic_int_exchange_and_add(&(s)->waiting, 1))
-#define DEC_WAITING(s)      (g_atomic_int_exchange_and_add(&(s)->waiting, -1))
+#define INC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, 1))
+#define DEC_WAITING(s)      (g_atomic_int_add(&(s)->waiting, -1))
 #define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))
 
 #define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
@@ -176,8 +182,9 @@ raise_wakeup (GstPoll * set)
 {
   gboolean result = TRUE;
 
-  if (g_atomic_int_exchange_and_add (&set->control_pending, 1) == 0) {
+  if (g_atomic_int_add (&set->control_pending, 1) == 0) {
     /* raise when nothing pending */
+    GST_LOG ("%p: raise", set);
     result = WAKE_EVENT (set);
   }
   return result;
@@ -192,6 +199,7 @@ release_wakeup (GstPoll * set)
   gboolean result = TRUE;
 
   if (g_atomic_int_dec_and_test (&set->control_pending)) {
+    GST_LOG ("%p: release", set);
     result = RELEASE_EVENT (set);
   }
   return result;
@@ -214,7 +222,7 @@ release_all_wakeup (GstPoll * set)
         break;
       else
         /* retry again until we read it successfully */
-        g_atomic_int_exchange_and_add (&set->control_pending, 1);
+        g_atomic_int_add (&set->control_pending, 1);
     }
   }
   return old;
@@ -268,20 +276,20 @@ selectable_fds (const GstPoll * set)
 {
   guint i;
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
   for (i = 0; i < set->fds->len; i++) {
     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
 
     if (pfd->fd >= FD_SETSIZE)
       goto too_many;
   }
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return TRUE;
 
 too_many:
   {
-    g_mutex_unlock (set->lock);
+    g_mutex_unlock (&set->lock);
     return FALSE;
   }
 }
@@ -344,7 +352,7 @@ pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
   FD_ZERO (writefds);
   FD_ZERO (errorfds);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   for (i = 0; i < set->active_fds->len; i++) {
     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
@@ -361,7 +369,7 @@ pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
     }
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return max_fd;
 }
@@ -372,7 +380,7 @@ fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
 {
   guint i;
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   for (i = 0; i < set->active_fds->len; i++) {
     struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
@@ -388,7 +396,7 @@ fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
     }
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 }
 #else /* G_OS_WIN32 */
 /*
@@ -529,7 +537,7 @@ gst_poll_collect_winsock_events (GstPoll * set)
 #endif
 
 /**
- * gst_poll_new:
+ * gst_poll_new: (skip)
  * @controllable: whether it should be possible to control a wait.
  *
  * Create a new file descriptor set. If @controllable, it
@@ -551,7 +559,7 @@ gst_poll_new (gboolean controllable)
   GST_DEBUG ("controllable : %d", controllable);
 
   nset = g_slice_new0 (GstPoll);
-  nset->lock = g_mutex_new ();
+  g_mutex_init (&nset->lock);
 #ifndef G_OS_WIN32
   nset->mode = GST_POLL_MODE_AUTO;
   nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
@@ -603,7 +611,7 @@ no_socket_pair:
 }
 
 /**
- * gst_poll_new_timer:
+ * gst_poll_new_timer: (skip)
  *
  * Create a new poll object that can be used for scheduling cancellable
  * timeouts.
@@ -671,7 +679,7 @@ gst_poll_free (GstPoll * set)
 
   g_array_free (set->active_fds, TRUE);
   g_array_free (set->fds, TRUE);
-  g_mutex_free (set->lock);
+  g_mutex_clear (&set->lock);
   g_slice_free (GstPoll, set);
 }
 
@@ -784,11 +792,11 @@ gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   ret = gst_poll_add_fd_unlocked (set, fd);
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return ret;
 }
@@ -816,7 +824,7 @@ gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
 
   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   /* get the index, -1 is an fd that is not added */
   idx = find_index (set->fds, fd);
@@ -837,7 +845,7 @@ gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
     GST_WARNING ("%p: couldn't find fd !", set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return idx >= 0;
 }
@@ -867,7 +875,7 @@ gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
   GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
       fd->fd, fd->idx, active);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   idx = find_index (set->fds, fd);
   if (idx >= 0) {
@@ -889,7 +897,7 @@ gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
     GST_WARNING ("%p: couldn't find fd !", set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return idx >= 0;
 }
@@ -945,11 +953,11 @@ gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 
   return ret;
 }
@@ -980,7 +988,7 @@ gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
   g_return_if_fail (fd != NULL);
   g_return_if_fail (fd->fd >= 0);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&set->lock);
 
   idx = find_index (set->fds, fd);
   if (idx >= 0) {
@@ -990,7 +998,7 @@ gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
     MARK_REBUILD (set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&set->lock);
 #endif
 }
 
@@ -1017,7 +1025,7 @@ gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
 
   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&((GstPoll *) set)->lock);
 
   idx = find_index (set->active_fds, fd);
   if (idx >= 0) {
@@ -1034,7 +1042,7 @@ gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
     GST_WARNING ("%p: couldn't find fd !", set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&((GstPoll *) set)->lock);
 
   return res;
 }
@@ -1062,7 +1070,7 @@ gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
 
   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&((GstPoll *) set)->lock);
 
   idx = find_index (set->active_fds, fd);
   if (idx >= 0) {
@@ -1083,7 +1091,7 @@ gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
     GST_WARNING ("%p: couldn't find fd !", set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&((GstPoll *) set)->lock);
 
   return res;
 }
@@ -1134,11 +1142,11 @@ gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
   g_return_val_if_fail (fd != NULL, FALSE);
   g_return_val_if_fail (fd->fd >= 0, FALSE);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&((GstPoll *) set)->lock);
 
   res = gst_poll_fd_can_read_unlocked (set, fd);
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&((GstPoll *) set)->lock);
 
   return res;
 }
@@ -1166,7 +1174,7 @@ gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
 
   GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
 
-  g_mutex_lock (set->lock);
+  g_mutex_lock (&((GstPoll *) set)->lock);
 
   idx = find_index (set->active_fds, fd);
   if (idx >= 0) {
@@ -1183,7 +1191,7 @@ gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
     GST_WARNING ("%p: couldn't find fd !", set);
   }
 
-  g_mutex_unlock (set->lock);
+  g_mutex_unlock (&((GstPoll *) set)->lock);
 
   return res;
 }
@@ -1231,7 +1239,7 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
   if (G_UNLIKELY (old_waiting > 0 && !is_timer))
     goto already_waiting;
 
-  /* flushing, exit immediatly */
+  /* flushing, exit immediately */
   if (G_UNLIKELY (IS_FLUSHING (set)))
     goto flushing;
 
@@ -1244,7 +1252,7 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
     mode = choose_mode (set, timeout);
 
     if (TEST_REBUILD (set)) {
-      g_mutex_lock (set->lock);
+      g_mutex_lock (&set->lock);
 #ifndef G_OS_WIN32
       g_array_set_size (set->active_fds, set->fds->len);
       memcpy (set->active_fds->data, set->fds->data,
@@ -1253,7 +1261,7 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
       if (!gst_poll_prepare_winsock_active_sets (set))
         goto winsock_error;
 #endif
-      g_mutex_unlock (set->lock);
+      g_mutex_unlock (&set->lock);
     }
 
     switch (mode) {
@@ -1417,12 +1425,10 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
         restarting = TRUE;
     }
 
-    if (G_UNLIKELY (IS_FLUSHING (set))) {
-      /* we got woken up and we are flushing, we need to stop */
-      errno = EBUSY;
-      res = -1;
-      break;
-    }
+    /* we got woken up and we are flushing, we need to stop */
+    if (G_UNLIKELY (IS_FLUSHING (set)))
+      goto flushing;
+
   } while (G_UNLIKELY (restarting));
 
   DEC_WAITING (set);
@@ -1432,12 +1438,14 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
   /* ERRORS */
 already_waiting:
   {
+    GST_LOG ("%p: we are already waiting", set);
     DEC_WAITING (set);
     errno = EPERM;
     return -1;
   }
 flushing:
   {
+    GST_LOG ("%p: we are flushing", set);
     DEC_WAITING (set);
     errno = EBUSY;
     return -1;
@@ -1445,7 +1453,8 @@ flushing:
 #ifdef G_OS_WIN32
 winsock_error:
   {
-    g_mutex_unlock (set->lock);
+    GST_LOG ("%p: winsock error", set);
+    g_mutex_unlock (&set->lock);
     DEC_WAITING (set);
     return -1;
   }
@@ -1517,6 +1526,8 @@ gst_poll_set_flushing (GstPoll * set, gboolean flushing)
 {
   g_return_if_fail (set != NULL);
 
+  GST_LOG ("%p: flushing: %d", set, flushing);
+
   /* update the new state first */
   SET_FLUSHING (set, flushing);