* A #GstPoll keeps track of file descriptors much like fd_set (used with
* select()) or a struct pollfd array (used with poll()). Once created with
* gst_poll_new(), the set can be used to wait for file descriptors to be
- * readable and/or writeable. It is possible to make this wait be controlled
+ * readable and/or writable. It is possible to make this wait be controlled
* by specifying %TRUE for the @controllable flag when creating the set (or
* later calling gst_poll_set_controllable()).
*
* New file descriptors are added to the set using gst_poll_add_fd(), and
* removed using gst_poll_remove_fd(). Controlling which file descriptors
- * should be waited for to become readable and/or writeable are done using
+ * should be waited for to become readable and/or writable are done using
* gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
*
* Use gst_poll_wait() to wait for the file descriptors to actually become
- * readable and/or writeable, or to timeout if no file descriptor is available
+ * readable and/or writable, or to timeout if no file descriptor is available
* in time. The wait can be controlled by calling gst_poll_restart() and
* gst_poll_set_flushing().
*
#endif
#include "gst_private.h"
+#include "glib-compat-private.h"
#include <sys/types.h>
#define EINPROGRESS WSAEINPROGRESS
#else
#define _GNU_SOURCE 1
+#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
+#endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
#include <sys/time.h>
#include <sys/socket.h>
#endif
#define GST_CAT_DEFAULT GST_CAT_POLL
-#ifndef G_OS_WIN32
-/* the poll/select call is also performed on a control socket, that way
- * we can send special commands to control it
- */
-/* FIXME: Shouldn't we check or return the return value
- * of write()?
- */
-#define SEND_COMMAND(set, command, result) \
-G_STMT_START { \
- unsigned char c = command; \
- result = write (set->control_write_fd.fd, &c, 1); \
- if (result > 0) \
- set->control_pending++; \
-} G_STMT_END
-
-#define READ_COMMAND(set, command, res) \
-G_STMT_START { \
- if (set->control_pending > 0) { \
- res = read (set->control_read_fd.fd, &command, 1); \
- if (res == 1) \
- set->control_pending--; \
- } else \
- res = 0; \
-} G_STMT_END
-
-#define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */
-
-#else /* G_OS_WIN32 */
+#ifdef G_OS_WIN32
typedef struct _WinsockFd WinsockFd;
struct _WinsockFd
{
GstPollMode mode;
- GMutex *lock;
-
+ GMutex lock;
+ /* array of fds, always written to and read from with lock */
GArray *fds;
+ /* array of active fds, only written to from the waiting thread with the
+ * lock and read from with the lock or without the lock from the waiting
+ * thread */
GArray *active_fds;
+
#ifndef G_OS_WIN32
+ gchar buf[1];
GstPollFD control_read_fd;
GstPollFD control_write_fd;
#else
#endif
gboolean controllable;
- gboolean new_controllable;
- guint waiting;
- guint control_pending;
- gboolean flushing;
+ volatile gint waiting;
+ volatile gint control_pending;
+ volatile gint flushing;
gboolean timer;
+ volatile gint rebuild;
};
+static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
+ gboolean active);
+static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
+
+#define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing))
+#define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
+
+#define INC_WAITING(s) (g_atomic_int_add(&(s)->waiting, 1))
+#define DEC_WAITING(s) (g_atomic_int_add(&(s)->waiting, -1))
+#define GET_WAITING(s) (g_atomic_int_get(&(s)->waiting))
+
+#define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
+#define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
+
+#ifndef G_OS_WIN32
+#define WAKE_EVENT(s) (write ((s)->control_write_fd.fd, "W", 1) == 1)
+#define RELEASE_EVENT(s) (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
+#else
+#define WAKE_EVENT(s) (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
+#define RELEASE_EVENT(s) (ResetEvent ((s)->wakeup_event))
+#endif
+
+/* the poll/select call is also performed on a control socket, that way
+ * we can send special commands to control it */
+static inline gboolean
+raise_wakeup (GstPoll * set)
+{
+ gboolean result = TRUE;
+
+ if (g_atomic_int_add (&set->control_pending, 1) == 0) {
+ /* raise when nothing pending */
+ GST_LOG ("%p: raise", set);
+ result = WAKE_EVENT (set);
+ }
+ return result;
+}
+
+/* note how bad things can happen when the 2 threads both raise and release the
+ * wakeup. This is however not a problem because you must always pair a raise
+ * with a release */
+static inline gboolean
+release_wakeup (GstPoll * set)
+{
+ gboolean result = TRUE;
+
+ if (g_atomic_int_dec_and_test (&set->control_pending)) {
+ GST_LOG ("%p: release", set);
+ result = RELEASE_EVENT (set);
+ }
+ return result;
+}
+
+static inline gint
+release_all_wakeup (GstPoll * set)
+{
+ gint old;
+
+ while (TRUE) {
+ if (!(old = g_atomic_int_get (&set->control_pending)))
+ /* nothing pending, just exit */
+ break;
+
+ /* try to remove all pending control messages */
+ if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
+ /* we managed to remove all messages, read the control socket */
+ if (RELEASE_EVENT (set))
+ break;
+ else
+ /* retry again until we read it successfully */
+ g_atomic_int_add (&set->control_pending, 1);
+ }
+ }
+ return old;
+}
+
static gint
find_index (GArray * array, GstPollFD * fd)
{
{
guint i;
+ g_mutex_lock (&set->lock);
for (i = 0; i < set->fds->len; i++) {
struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
if (pfd->fd >= FD_SETSIZE)
- return FALSE;
+ goto too_many;
}
+ g_mutex_unlock (&set->lock);
return TRUE;
+
+too_many:
+ {
+ g_mutex_unlock (&set->lock);
+ return FALSE;
+ }
}
/* check if the timeout will convert to a timeout value used for poll()
FD_ZERO (writefds);
FD_ZERO (errorfds);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&set->lock);
for (i = 0; i < set->active_fds->len; i++) {
struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
FD_SET (pfd->fd, writefds);
if (pfd->events)
FD_SET (pfd->fd, errorfds);
- if (pfd->fd > max_fd)
+ if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
max_fd = pfd->fd;
}
}
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&set->lock);
return max_fd;
}
{
guint i;
- g_mutex_lock (set->lock);
+ g_mutex_lock (&set->lock);
for (i = 0; i < set->active_fds->len; i++) {
struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
if (pfd->fd < FD_SETSIZE) {
+ pfd->revents = 0;
if (FD_ISSET (pfd->fd, readfds))
pfd->revents |= POLLIN;
if (FD_ISSET (pfd->fd, writefds))
}
}
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&set->lock);
}
#else /* G_OS_WIN32 */
/*
#endif
/**
- * gst_poll_new:
+ * gst_poll_new: (skip)
* @controllable: whether it should be possible to control a wait.
*
* Create a new file descriptor set. If @controllable, it
* is possible to restart or flush a call to gst_poll_wait() with
* gst_poll_restart() and gst_poll_set_flushing() respectively.
*
- * Returns: a new #GstPoll, or %NULL in case of an error. Free with
- * gst_poll_free().
+ * Free-function: gst_poll_free
+ *
+ * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
+ * Free with gst_poll_free().
*
* Since: 0.10.18
*/
GST_DEBUG ("controllable : %d", controllable);
nset = g_slice_new0 (GstPoll);
- nset->lock = g_mutex_new ();
+ g_mutex_init (&nset->lock);
#ifndef G_OS_WIN32
nset->mode = GST_POLL_MODE_AUTO;
nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
nset->control_read_fd.fd = -1;
nset->control_write_fd.fd = -1;
+ {
+ gint control_sock[2];
+
+ if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
+ goto no_socket_pair;
+
+ fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
+ fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
+
+ nset->control_read_fd.fd = control_sock[0];
+ nset->control_write_fd.fd = control_sock[1];
+
+ gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
+ gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
+ }
#else
nset->mode = GST_POLL_MODE_WINDOWS;
nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
#endif
- if (!gst_poll_set_controllable (nset, controllable))
- goto not_controllable;
+ /* ensure (re)build, though already sneakily set in non-windows case */
+ MARK_REBUILD (nset);
+
+ nset->controllable = controllable;
return nset;
/* ERRORS */
-not_controllable:
+#ifndef G_OS_WIN32
+no_socket_pair:
{
+ GST_WARNING ("%p: can't create socket pair !", nset);
gst_poll_free (nset);
return NULL;
}
+#endif
}
/**
- * gst_poll_new_timer:
+ * gst_poll_new_timer: (skip)
*
* Create a new poll object that can be used for scheduling cancellable
* timeouts.
* A timeout is performed with gst_poll_wait(). Multiple timeouts can be
* performed from different threads.
*
- * Returns: a new #GstPoll, or %NULL in case of an error. Free with
- * gst_poll_free().
+ * Free-function: gst_poll_free
+ *
+ * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
+ * Free with gst_poll_free().
*
* Since: 0.10.23
*/
/**
* gst_poll_free:
- * @set: a file descriptor set.
+ * @set: (transfer full): a file descriptor set.
*
* Free a file descriptor set.
*
{
g_return_if_fail (set != NULL);
- GST_DEBUG_OBJECT (set, "Freeing");
+ GST_DEBUG ("%p: freeing", set);
#ifndef G_OS_WIN32
if (set->control_write_fd.fd >= 0)
g_array_free (set->active_fds, TRUE);
g_array_free (set->fds, TRUE);
- g_mutex_free (set->lock);
+ g_mutex_clear (&set->lock);
g_slice_free (GstPoll, set);
}
/**
+ * gst_poll_get_read_gpollfd:
+ * @set: a #GstPoll
+ * @fd: a #GPollFD
+ *
+ * Get a GPollFD for the reading part of the control socket. This is useful when
+ * integrating with a GSource and GMainLoop.
+ *
+ * Since: 0.10.32
+ */
+void
+gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
+{
+ g_return_if_fail (set != NULL);
+ g_return_if_fail (fd != NULL);
+
+#ifndef G_OS_WIN32
+ fd->fd = set->control_read_fd.fd;
+#else
+#if GLIB_SIZEOF_VOID_P == 8
+ fd->fd = (gint64) set->wakeup_event;
+#else
+ fd->fd = (gint) set->wakeup_event;
+#endif
+#endif
+ fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+ fd->revents = 0;
+}
+
+/**
* gst_poll_fd_init:
* @fd: a #GstPollFD
*
{
gint idx;
- GST_DEBUG_OBJECT (set, "fd (fd:%d, idx:%d)", fd->fd, fd->idx);
+ GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
idx = find_index (set->fds, fd);
if (idx < 0) {
fd->idx = set->fds->len - 1;
#endif
+ MARK_REBUILD (set);
} else {
- GST_WARNING_OBJECT (set, "Couldn't find fd !");
+ GST_WARNING ("%p: couldn't find fd !", set);
}
return TRUE;
g_return_val_if_fail (fd != NULL, FALSE);
g_return_val_if_fail (fd->fd >= 0, FALSE);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&set->lock);
ret = gst_poll_add_fd_unlocked (set, fd);
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&set->lock);
return ret;
}
g_return_val_if_fail (fd->fd >= 0, FALSE);
- GST_DEBUG_OBJECT (set, "fd (fd:%d, idx:%d)", fd->fd, fd->idx);
+ GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&set->lock);
/* get the index, -1 is an fd that is not added */
idx = find_index (set->fds, fd);
/* mark fd as removed by setting the index to -1 */
fd->idx = -1;
+ MARK_REBUILD (set);
} else {
- GST_WARNING_OBJECT (set, "Couldn't find fd !");
+ GST_WARNING ("%p: couldn't find fd !", set);
}
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&set->lock);
return idx >= 0;
}
g_return_val_if_fail (fd != NULL, FALSE);
g_return_val_if_fail (fd->fd >= 0, FALSE);
- GST_DEBUG_OBJECT (set, "fd (fd:%d, idx:%d), active : %d",
+ GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
fd->fd, fd->idx, active);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&set->lock);
idx = find_index (set->fds, fd);
if (idx >= 0) {
gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
active);
#endif
+ MARK_REBUILD (set);
} else {
- GST_WARNING_OBJECT (set, "Couldn't find fd !");
+ GST_WARNING ("%p: couldn't find fd !", set);
}
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&set->lock);
return idx >= 0;
}
{
gint idx;
- GST_DEBUG_OBJECT (set, "fd (fd:%d, idx:%d), active : %d",
+ GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
fd->fd, fd->idx, active);
idx = find_index (set->fds, fd);
#else
gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
#endif
+ MARK_REBUILD (set);
} else {
- GST_WARNING_OBJECT (set, "Couldn't find fd !");
+ GST_WARNING ("%p: couldn't find fd !", set);
}
return idx >= 0;
g_return_val_if_fail (fd != NULL, FALSE);
g_return_val_if_fail (fd->fd >= 0, FALSE);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&set->lock);
ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&set->lock);
return ret;
}
g_return_if_fail (fd != NULL);
g_return_if_fail (fd->fd >= 0);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&set->lock);
idx = find_index (set->fds, fd);
if (idx >= 0) {
WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
+ MARK_REBUILD (set);
}
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&set->lock);
#endif
}
g_return_val_if_fail (fd != NULL, FALSE);
g_return_val_if_fail (fd->fd >= 0, FALSE);
- GST_DEBUG_OBJECT (set, "fd (fd:%d, idx:%d)", fd->fd, fd->idx);
+ GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&((GstPoll *) set)->lock);
idx = find_index (set->active_fds, fd);
if (idx >= 0) {
res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
#endif
} else {
- GST_WARNING_OBJECT (set, "Couldn't find fd !");
+ GST_WARNING ("%p: couldn't find fd !", set);
}
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&((GstPoll *) set)->lock);
return res;
}
g_return_val_if_fail (fd != NULL, FALSE);
g_return_val_if_fail (fd->fd >= 0, FALSE);
- GST_DEBUG_OBJECT (set, "fd (fd:%d, idx:%d)", fd->fd, fd->idx);
+ GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&((GstPoll *) set)->lock);
idx = find_index (set->active_fds, fd);
if (idx >= 0) {
(wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
#endif
} else {
- GST_WARNING_OBJECT (set, "Couldn't find fd !");
+ GST_WARNING ("%p: couldn't find fd !", set);
}
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&((GstPoll *) set)->lock);
return res;
}
gboolean res = FALSE;
gint idx;
- GST_DEBUG_OBJECT (set, "fd (fd:%d, idx:%d)", fd->fd, fd->idx);
+ GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
idx = find_index (set->active_fds, fd);
if (idx >= 0) {
res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
#endif
} else {
- GST_WARNING_OBJECT (set, "Couldn't find fd !");
+ GST_WARNING ("%p: couldn't find fd !", set);
}
return res;
g_return_val_if_fail (fd != NULL, FALSE);
g_return_val_if_fail (fd->fd >= 0, FALSE);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&((GstPoll *) set)->lock);
res = gst_poll_fd_can_read_unlocked (set, fd);
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&((GstPoll *) set)->lock);
return res;
}
g_return_val_if_fail (fd != NULL, FALSE);
g_return_val_if_fail (fd->fd >= 0, FALSE);
- GST_DEBUG_OBJECT (set, "fd (fd:%d, idx:%d)", fd->fd, fd->idx);
+ GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
- g_mutex_lock (set->lock);
+ g_mutex_lock (&((GstPoll *) set)->lock);
idx = find_index (set->active_fds, fd);
if (idx >= 0) {
res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
#endif
} else {
- GST_WARNING_OBJECT (set, "Couldn't find fd !");
+ GST_WARNING ("%p: couldn't find fd !", set);
}
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&((GstPoll *) set)->lock);
return res;
}
-static void
-gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting)
-{
- /* check if the poll/select was aborted due to a command */
- if (set->controllable) {
-#ifndef G_OS_WIN32
- while (TRUE) {
- guchar cmd;
- gint result;
-
- /* we do not check the read status of the control socket here because
- * there may have been a write to the socket between the time the
- * poll/select finished and before we got the mutex back, and we need
- * to clear out the control socket before leaving */
- READ_COMMAND (set, cmd, result);
- if (result <= 0) {
- /* no more commands, quit the loop */
- break;
- }
-
- /* if the control socket is the only socket with activity when we get
- * here, we restart the _wait operation, else we allow the caller to
- * process the other file descriptors */
- if (res == 1 &&
- gst_poll_fd_can_read_unlocked (set, &set->control_read_fd))
- *restarting = TRUE;
- }
-#else
- if (WaitForSingleObject (set->wakeup_event, 0) == WAIT_OBJECT_0) {
- ResetEvent (set->wakeup_event);
- *restarting = TRUE;
- }
-#endif
- }
-}
-
/**
* gst_poll_wait:
* @set: a #GstPoll.
gst_poll_wait (GstPoll * set, GstClockTime timeout)
{
gboolean restarting;
+ gboolean is_timer;
int res;
+ gint old_waiting;
g_return_val_if_fail (set != NULL, -1);
- g_mutex_lock (set->lock);
-
GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
+ is_timer = set->timer;
+
+ /* add one more waiter */
+ old_waiting = INC_WAITING (set);
+
/* we cannot wait from multiple threads unless we are a timer */
- if (G_UNLIKELY (set->waiting > 0 && !set->timer))
+ if (G_UNLIKELY (old_waiting > 0 && !is_timer))
goto already_waiting;
- /* flushing, exit immediatly */
- if (G_UNLIKELY (set->flushing))
+ /* flushing, exit immediately */
+ if (G_UNLIKELY (IS_FLUSHING (set)))
goto flushing;
- /* add one more waiter */
- set->waiting++;
-
do {
GstPollMode mode;
mode = choose_mode (set, timeout);
+ if (TEST_REBUILD (set)) {
+ g_mutex_lock (&set->lock);
#ifndef G_OS_WIN32
- g_array_set_size (set->active_fds, set->fds->len);
- memcpy (set->active_fds->data, set->fds->data,
- set->fds->len * sizeof (struct pollfd));
+ g_array_set_size (set->active_fds, set->fds->len);
+ memcpy (set->active_fds->data, set->fds->data,
+ set->fds->len * sizeof (struct pollfd));
#else
- if (!gst_poll_prepare_winsock_active_sets (set))
- goto winsock_error;
+ if (!gst_poll_prepare_winsock_active_sets (set))
+ goto winsock_error;
#endif
-
- g_mutex_unlock (set->lock);
+ g_mutex_unlock (&set->lock);
+ }
switch (mode) {
case GST_POLL_MODE_AUTO:
t = 0;
}
- wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
- (HANDLE *) set->active_events->data, FALSE, t, FALSE);
+ if (set->active_events->len != 0) {
+ wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
+ (HANDLE *) set->active_events->data, FALSE, t, FALSE);
+ } else {
+ wait_ret = WSA_WAIT_FAILED;
+ WSASetLastError (WSA_INVALID_PARAMETER);
+ }
if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
res = 0;
}
}
- g_mutex_lock (set->lock);
-
- if (!set->timer)
- gst_poll_check_ctrl_commands (set, res, &restarting);
+ if (!is_timer) {
+ /* Applications needs to clear the control socket themselves for timer
+ * polls.
+ * For other polls, we need to clear the control socket. If there was only
+ * one socket with activity and it was the control socket, we need to
+ * restart */
+ if (release_all_wakeup (set) > 0 && res == 1)
+ restarting = TRUE;
+ }
- /* update the controllable state if needed */
- set->controllable = set->new_controllable;
+ /* we got woken up and we are flushing, we need to stop */
+ if (G_UNLIKELY (IS_FLUSHING (set)))
+ goto flushing;
- if (G_UNLIKELY (set->flushing)) {
- /* we got woken up and we are flushing, we need to stop */
- errno = EBUSY;
- res = -1;
- break;
- }
} while (G_UNLIKELY (restarting));
- set->waiting--;
-
- g_mutex_unlock (set->lock);
+ DEC_WAITING (set);
return res;
/* ERRORS */
already_waiting:
{
- g_mutex_unlock (set->lock);
+ GST_LOG ("%p: we are already waiting", set);
+ DEC_WAITING (set);
errno = EPERM;
return -1;
}
flushing:
{
- g_mutex_unlock (set->lock);
+ GST_LOG ("%p: we are flushing", set);
+ DEC_WAITING (set);
errno = EBUSY;
return -1;
}
#ifdef G_OS_WIN32
winsock_error:
{
- set->waiting--;
- g_mutex_unlock (set->lock);
+ GST_LOG ("%p: winsock error", set);
+ g_mutex_unlock (&set->lock);
+ DEC_WAITING (set);
return -1;
}
#endif
{
g_return_val_if_fail (set != NULL, FALSE);
- GST_LOG_OBJECT (set, "controllable : %d", controllable);
-
- g_mutex_lock (set->lock);
-
-#ifndef G_OS_WIN32
- if (controllable && set->control_read_fd.fd < 0) {
- gint control_sock[2];
-
- if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
- goto no_socket_pair;
-
- fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
- fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
-
- set->control_read_fd.fd = control_sock[0];
- set->control_write_fd.fd = control_sock[1];
-
- gst_poll_add_fd_unlocked (set, &set->control_read_fd);
- }
-
- if (set->control_read_fd.fd >= 0)
- gst_poll_fd_ctl_read_unlocked (set, &set->control_read_fd, controllable);
-#endif
-
- /* delay the change of the controllable state if we are waiting */
- set->new_controllable = controllable;
- if (set->waiting == 0)
- set->controllable = controllable;
+ GST_LOG ("%p: controllable : %d", set, controllable);
- g_mutex_unlock (set->lock);
+ set->controllable = controllable;
return TRUE;
-
- /* ERRORS */
-#ifndef G_OS_WIN32
-no_socket_pair:
- {
- GST_WARNING_OBJECT (set, "Can't create socket pair !");
- g_mutex_unlock (set->lock);
- return FALSE;
- }
-#endif
}
/**
{
g_return_if_fail (set != NULL);
- g_mutex_lock (set->lock);
-
- if (set->controllable && set->waiting > 0) {
-#ifndef G_OS_WIN32
- gint result;
-
- /* if we are waiting, we can send the command, else we do not have to
- * bother, future calls will automatically pick up the new fdset */
- SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
-#else
- SetEvent (set->wakeup_event);
-#endif
+ if (set->controllable && GET_WAITING (set) > 0) {
+ /* we are controllable and waiting, wake up the waiter. The socket will be
+ * cleared by the _wait() thread and the poll will be restarted */
+ raise_wakeup (set);
}
-
- g_mutex_unlock (set->lock);
}
/**
{
g_return_if_fail (set != NULL);
- g_mutex_lock (set->lock);
+ GST_LOG ("%p: flushing: %d", set, flushing);
/* update the new state first */
- set->flushing = flushing;
+ SET_FLUSHING (set, flushing);
- if (flushing && set->controllable && set->waiting > 0) {
+ if (flushing && set->controllable && GET_WAITING (set) > 0) {
/* we are flushing, controllable and waiting, wake up the waiter. When we
* stop the flushing operation we don't clear the wakeup fd here, this will
* happen in the _wait() thread. */
-#ifndef G_OS_WIN32
- gint result;
-
- SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
-#else
- SetEvent (set->wakeup_event);
-#endif
+ raise_wakeup (set);
}
-
- g_mutex_unlock (set->lock);
}
/**
gboolean
gst_poll_write_control (GstPoll * set)
{
- gboolean res = FALSE;
+ gboolean res;
g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (set->timer, FALSE);
- g_mutex_lock (set->lock);
- if (set->controllable) {
-#ifndef G_OS_WIN32
- gint result;
-
- SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
- res = (result > 0);
-#else
- res = SetEvent (set->wakeup_event);
-#endif
- }
- g_mutex_unlock (set->lock);
+ res = raise_wakeup (set);
return res;
}
gboolean
gst_poll_read_control (GstPoll * set)
{
- gboolean res = FALSE;
+ gboolean res;
g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (set->timer, FALSE);
- g_mutex_lock (set->lock);
- if (set->controllable) {
-#ifndef G_OS_WIN32
- guchar cmd;
- gint result;
- READ_COMMAND (set, cmd, result);
- res = (result > 0);
-#else
- res = ResetEvent (set->wakeup_event);
-#endif
- }
- g_mutex_unlock (set->lock);
+ res = release_wakeup (set);
return res;
}