#define GST_SYSTEM_CLOCK_UNLOCK(clock) g_mutex_unlock(GST_SYSTEM_CLOCK_GET_LOCK(clock))
#define GST_SYSTEM_CLOCK_GET_COND(clock) (&GST_SYSTEM_CLOCK_CAST(clock)->priv->entries_changed)
#define GST_SYSTEM_CLOCK_WAIT(clock) g_cond_wait(GST_SYSTEM_CLOCK_GET_COND(clock),GST_SYSTEM_CLOCK_GET_LOCK(clock))
-#define GST_SYSTEM_CLOCK_WAIT_UNTIL(clock,us) g_cond_wait_until(GST_SYSTEM_CLOCK_GET_COND(clock),GST_SYSTEM_CLOCK_GET_LOCK(clock),us)
#define GST_SYSTEM_CLOCK_BROADCAST(clock) g_cond_broadcast(GST_SYSTEM_CLOCK_GET_COND(clock))
+#if defined(HAVE_FUTEX)
+#include <unistd.h>
+#include <linux/futex.h>
+#include <sys/syscall.h>
+
+#ifndef FUTEX_WAIT_BITSET_PRIVATE
+#define FUTEX_WAIT_BITSET_PRIVATE FUTEX_WAIT_BITSET
+#endif
+#ifndef FUTEX_WAKE_PRIVATE
+#define FUTEX_WAKE_PRIVATE FUTEX_WAKE
+#endif
+
+#define GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry) (&(entry)->lock)
+#define GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry) (&(entry)->cond_val)
+#define GST_SYSTEM_CLOCK_ENTRY_LOCK(entry) (g_mutex_lock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
+#define GST_SYSTEM_CLOCK_ENTRY_UNLOCK(entry) (g_mutex_unlock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
+#define GST_SYSTEM_CLOCK_ENTRY_WAIT_UNTIL(entry,ns) gst_futex_cond_wait_until(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry),GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry),ns)
+#define GST_SYSTEM_CLOCK_ENTRY_BROADCAST(entry) gst_futex_cond_broadcast(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry))
+
+typedef struct _GstClockEntryFutex GstClockEntryImpl;
+struct _GstClockEntryFutex
+{
+ GstClockEntry entry;
+ GWeakRef clock;
+ GDestroyNotify destroy_entry;
+
+ gboolean initialized;
+
+ GMutex lock;
+ guint cond_val;
+};
+
+static void
+clear_entry (GstClockEntryImpl * entry)
+{
+ g_mutex_clear (&entry->lock);
+}
+
+static void
+init_entry (GstClockEntryImpl * entry)
+{
+ g_mutex_init (&entry->lock);
+
+ entry->destroy_entry = (GDestroyNotify) clear_entry;
+}
+
+static void
+gst_futex_cond_broadcast (guint * cond_val)
+{
+ g_atomic_int_inc (cond_val);
+
+ syscall (__NR_futex, cond_val, (gsize) FUTEX_WAKE_PRIVATE, (gsize) INT_MAX,
+ NULL);
+}
+
+static gboolean
+gst_futex_cond_wait_until (guint * cond_val, GMutex * mutex, gint64 end_time)
+{
+ struct timespec end;
+ guint sampled;
+ int res;
+ gboolean success;
+
+ if (end_time < 0)
+ return FALSE;
+
+ end.tv_sec = end_time / 1000000000;
+ end.tv_nsec = end_time % 1000000000;
+
+ sampled = *cond_val;
+ g_mutex_unlock (mutex);
+ /* we use FUTEX_WAIT_BITSET_PRIVATE rather than FUTEX_WAIT_PRIVATE to be
+ * able to use absolute time */
+ res =
+ syscall (__NR_futex, cond_val, (gsize) FUTEX_WAIT_BITSET_PRIVATE,
+ (gsize) sampled, &end, NULL, FUTEX_BITSET_MATCH_ANY);
+ success = (res < 0 && errno == ETIMEDOUT) ? FALSE : TRUE;
+ g_mutex_lock (mutex);
+
+ return success;
+}
+
+#elif defined (G_OS_UNIX)
+#define GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry) (&(entry)->lock)
+#define GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry) (&(entry)->cond)
+#define GST_SYSTEM_CLOCK_ENTRY_LOCK(entry) (pthread_mutex_lock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
+#define GST_SYSTEM_CLOCK_ENTRY_UNLOCK(entry) (pthread_mutex_unlock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
+#define GST_SYSTEM_CLOCK_ENTRY_WAIT_UNTIL(entry,ns) gst_pthread_cond_wait_until(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry),GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry),(ns))
+#define GST_SYSTEM_CLOCK_ENTRY_BROADCAST(entry) pthread_cond_broadcast(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry))
+
+typedef struct _GstClockEntryPThread GstClockEntryImpl;
+struct _GstClockEntryPThread
+{
+ GstClockEntry entry;
+ GWeakRef clock;
+ GDestroyNotify destroy_entry;
+
+ gboolean initialized;
+
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+};
+
+static gboolean
+gst_pthread_cond_wait_until (pthread_cond_t * cond, pthread_mutex_t * lock,
+ guint64 end_time)
+{
+ struct timespec ts;
+ gint status;
+
+#if defined (HAVE_PTHREAD_CONDATTR_SETCLOCK) && defined (CLOCK_MONOTONIC)
+ /* This is the exact check we used during init to set the clock to
+ * monotonic, so if we're in this branch, timedwait() will already be
+ * expecting a monotonic clock.
+ */
+ {
+ ts.tv_sec = end_time / 1000000000;
+ ts.tv_nsec = end_time % 1000000000;
+
+ if ((status = pthread_cond_timedwait (cond, lock, &ts)) == 0)
+ return TRUE;
+ }
+#elif defined (HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
+ /* end_time is given relative to the monotonic clock as returned by
+ * g_get_monotonic_time().
+ *
+ * Since this pthreads wants the relative time, convert it back again.
+ */
+ {
+ gint64 now = g_get_monotonic_time () * 1000;
+ gint64 relative;
+
+ if (end_time <= now)
+ return FALSE;
+
+ relative = end_time - now;
+
+ ts.tv_sec = relative / 1000000000;
+ ts.tv_nsec = relative % 1000000000;
+
+ if ((status = pthread_cond_timedwait_relative_np (cond, lock, &ts)) == 0)
+ return TRUE;
+ }
+#else
+#error Cannot use pthread condition variables on your platform.
+#endif
+
+ if (G_UNLIKELY (status != ETIMEDOUT)) {
+ g_error ("pthread_cond_timedwait returned %d", status);
+ }
+
+ return FALSE;
+}
+
+static void
+clear_entry (GstClockEntryImpl * entry)
+{
+ pthread_cond_destroy (&entry->cond);
+ pthread_mutex_destroy (&entry->lock);
+}
+
+static void
+init_entry (GstClockEntryImpl * entry)
+{
+ pthread_mutexattr_t *m_pattr = NULL;
+#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
+ pthread_mutexattr_t m_attr;
+#endif
+ pthread_condattr_t c_attr;
+ gint status;
+
+ pthread_condattr_init (&c_attr);
+
+#if defined (HAVE_PTHREAD_CONDATTR_SETCLOCK) && defined (CLOCK_MONOTONIC)
+ status = pthread_condattr_setclock (&c_attr, CLOCK_MONOTONIC);
+ if (G_UNLIKELY (status != 0)) {
+ g_error ("pthread_condattr_setclock returned %d", status);
+ }
+#elif defined (HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
+#else
+#error Cannot use pthread condition variables on your platform.
+#endif
+
+ status = pthread_cond_init (&entry->cond, &c_attr);
+ if (G_UNLIKELY (status != 0)) {
+ g_error ("pthread_cond_init returned %d", status);
+ }
+
+ pthread_condattr_destroy (&c_attr);
+
+#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
+ pthread_mutexattr_init (&m_attr);
+ pthread_mutexattr_settype (&m_attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+ m_pattr = &m_attr;
+#endif
+
+ status = pthread_mutex_init (&entry->lock, m_pattr);
+ if (G_UNLIKELY (status != 0)) {
+ g_error ("pthread_mutex_init returned %d", status);
+ }
+#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
+ pthread_mutexattr_destroy (&m_attr);
+#endif
+
+ entry->destroy_entry = (GDestroyNotify) clear_entry;
+}
+#else
+#define GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry) (&(entry)->lock)
+#define GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry) (&(entry)->cond)
+#define GST_SYSTEM_CLOCK_ENTRY_LOCK(entry) (g_mutex_lock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
+#define GST_SYSTEM_CLOCK_ENTRY_UNLOCK(entry) (g_mutex_unlock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
+#define GST_SYSTEM_CLOCK_ENTRY_WAIT_UNTIL(entry,ns) g_cond_wait_until(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry),GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry),(ns / 1000))
+#define GST_SYSTEM_CLOCK_ENTRY_BROADCAST(entry) g_cond_broadcast(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry))
+
+typedef struct _GstClockEntryGLib GstClockEntryImpl;
+struct _GstClockEntryGLib
+{
+ GstClockEntry entry;
+ GWeakRef clock;
+ GDestroyNotify destroy_entry;
+
+ gboolean initialized;
+
+ GMutex lock;
+ GCond cond;
+};
+
+static void
+clear_entry (GstClockEntryImpl * entry)
+{
+ g_cond_clear (&entry->cond);
+ g_mutex_clear (&entry->lock);
+}
+
+static void
+init_entry (GstClockEntryImpl * entry)
+{
+ g_cond_init (&entry->cond);
+ g_mutex_init (&entry->lock);
+
+ entry->destroy_entry = (GDestroyNotify) clear_entry;
+}
+#endif
+
+/* check that our impl is smaller than what will be allocated by gstclock.c */
+G_STATIC_ASSERT (sizeof (GstClockEntryImpl) <=
+ sizeof (struct _GstClockEntryImpl));
+
+static inline void
+ensure_entry_initialized (GstClockEntryImpl * entry_impl)
+{
+ if (!entry_impl->initialized) {
+ init_entry (entry_impl);
+ entry_impl->initialized = TRUE;
+ }
+}
+
struct _GstSystemClockPrivate
{
GThread *thread; /* thread for async notify */
SET_ENTRY_STATUS (entry, GST_CLOCK_UNSCHEDULED);
}
GST_SYSTEM_CLOCK_BROADCAST (clock);
+ if (priv->entries) {
+ GstClockEntryImpl *entry = (GstClockEntryImpl *) priv->entries->data;
+ ensure_entry_initialized (entry);
+ GST_SYSTEM_CLOCK_ENTRY_BROADCAST (entry);
+ }
GST_SYSTEM_CLOCK_UNLOCK (clock);
if (priv->thread)
requested = entry->time;
+ GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "waiting on entry %p", entry);
+
+ GST_SYSTEM_CLOCK_UNLOCK (clock);
+
/* now wait for the entry */
res =
gst_system_clock_id_wait_jitter_unlocked (clock, (GstClockID) entry,
NULL, FALSE);
+ GST_SYSTEM_CLOCK_LOCK (clock);
+
switch (res) {
case GST_CLOCK_UNSCHEDULED:
/* entry was unscheduled, move to the next */
/* synchronously wait on the given GstClockEntry.
*
- * We do this by blocking on the global GstPoll timer with
- * the requested timeout. This allows us to unblock the
- * entry by writing on the control fd.
- *
- * Note that writing the global GstPoll unlocks all waiting entries. So
- * we need to check if an unlocked entry has changed when it unlocks.
+ * We do this by blocking on the entry specifically rather than a global
+ * condition variable so that each possible thread may be woken up
+ * individually. This ensures that we don't wake up possibly multiple threads
+ * when unscheduling an entry.
*
* Entries that arrive too late are simply not waited on and a
* GST_CLOCK_EARLY result is returned.
/* now wait on the entry, it either times out or the cond is signalled.
* The status of the entry is BUSY only around the wait. */
- waitret = GST_SYSTEM_CLOCK_WAIT_UNTIL (clock, mono_ts + (diff / 1000));
+
+ GST_SYSTEM_CLOCK_ENTRY_LOCK ((GstClockEntryImpl *) entry);
+ waitret =
+ GST_SYSTEM_CLOCK_ENTRY_WAIT_UNTIL ((GstClockEntryImpl *) entry,
+ mono_ts * 1000 + diff);
+ GST_SYSTEM_CLOCK_ENTRY_UNLOCK ((GstClockEntryImpl *) entry);
/* get the new status, mark as DONE. We do this so that the unschedule
* function knows when we left the poll and doesn't need to wakeup the
GstClockTimeDiff * jitter)
{
GstClockReturn status;
+ GstClockEntryImpl *entry_impl = (GstClockEntryImpl *) entry;
- GST_SYSTEM_CLOCK_LOCK (clock);
do {
status = GET_ENTRY_STATUS (entry);
/* stop when we are unscheduled */
if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
- GST_SYSTEM_CLOCK_UNLOCK (clock);
return status;
}
* statuses */
} while (G_UNLIKELY (!CAS_ENTRY_STATUS (entry, status, GST_CLOCK_BUSY)));
+ GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "waiting on entry %p", entry);
+
+ if (!entry_impl->initialized) {
+ GST_SYSTEM_CLOCK_LOCK (clock);
+ ensure_entry_initialized (entry_impl);
+ GST_SYSTEM_CLOCK_UNLOCK (clock);
+ }
+
status =
gst_system_clock_id_wait_jitter_unlocked (clock, entry, jitter, TRUE);
- GST_SYSTEM_CLOCK_UNLOCK (clock);
return status;
}
/* need to take a ref */
gst_clock_id_ref ((GstClockID) entry);
+
+ ensure_entry_initialized ((GstClockEntryImpl *) entry);
+
/* insert the entry in sorted order */
priv->entries = g_list_insert_sorted (priv->entries, entry,
gst_clock_id_compare_func);
* looks at the new head entry instead, we only need to do this once */
GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock,
"head entry was busy. Wakeup async thread");
- GST_SYSTEM_CLOCK_BROADCAST (clock);
+ GST_SYSTEM_CLOCK_ENTRY_BROADCAST ((GstClockEntryImpl *) head);
}
}
}
{
GstClockReturn status;
- GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "unscheduling entry %p", entry);
-
GST_SYSTEM_CLOCK_LOCK (clock);
+
+ GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "unscheduling entry %p time %"
+ GST_TIME_FORMAT, entry, GST_TIME_ARGS (GST_CLOCK_ENTRY_TIME (entry)));
+
/* change the entry status to unscheduled */
do {
status = GET_ENTRY_STATUS (entry);
GST_CLOCK_UNSCHEDULED)));
if (G_LIKELY (status == GST_CLOCK_BUSY)) {
- /* the entry was being busy, wake up all entries so that they recheck their
- * status. We cannot wake up just one entry because allocating such a
- * datastructure for each entry would be too heavy and unlocking an entry
- * is usually done when shutting down or some other exceptional case. */
+ /* the entry was being busy, wake up the entry */
GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "entry was BUSY, doing wakeup");
if (!entry->unscheduled) {
- GST_SYSTEM_CLOCK_BROADCAST (clock);
+ ensure_entry_initialized ((GstClockEntryImpl *) entry);
+ GST_SYSTEM_CLOCK_ENTRY_BROADCAST ((GstClockEntryImpl *) entry);
}
}
GST_SYSTEM_CLOCK_UNLOCK (clock);