From abffdb2ac384740831352eb5957ecfad15428cd7 Mon Sep 17 00:00:00 2001 From: Jonas Holmberg Date: Tue, 3 Feb 2009 18:04:46 +0100 Subject: [PATCH] Implement the systemclock with gstpoll Add a property to select the clock type, currently REALTIME and MONOTONIC when posix timers are available. Implement the systemclock with GstPoll instead of GCond. This allows us to schedule timeouts with nanosecond precission on newer kernels and with ppoll support. It's also resilient to changes to the systemclock because of NTP or similar. --- docs/gst/gstreamer-sections.txt | 7 +- gst/gstclock.c | 2 +- gst/gstsystemclock.c | 343 ++++++++++++++++++++++++++++++++-------- gst/gstsystemclock.h | 18 ++- 4 files changed, 301 insertions(+), 69 deletions(-) diff --git a/docs/gst/gstreamer-sections.txt b/docs/gst/gstreamer-sections.txt index 0e79ca0..f15ea08 100644 --- a/docs/gst/gstreamer-sections.txt +++ b/docs/gst/gstreamer-sections.txt @@ -1967,14 +1967,15 @@ GST_TYPE_STRUCTURE gst_structure_get_type -
gstsystemclock GstSystemClock +GstClockType GstSystemClock gst_system_clock_obtain GstSystemClockClass +GstSystemClockPrivate GST_SYSTEM_CLOCK GST_IS_SYSTEM_CLOCK gst_system_clock_get_type @@ -1982,9 +1983,11 @@ GST_SYSTEM_CLOCK_CLASS GST_IS_SYSTEM_CLOCK_CLASS GST_SYSTEM_CLOCK_GET_CLASS GST_TYPE_SYSTEM_CLOCK +GST_SYSTEM_CLOCK_CAST +GST_TYPE_CLOCK_TYPE +gst_clock_type_get_type
-
gsttaglist GstTagList diff --git a/gst/gstclock.c b/gst/gstclock.c index a795c06..e3b0c8e 100644 --- a/gst/gstclock.c +++ b/gst/gstclock.c @@ -166,7 +166,7 @@ gst_clock_entry_new (GstClock * clock, GstClockTime time, entry->type = type; entry->time = time; entry->interval = interval; - entry->status = GST_CLOCK_BUSY; + entry->status = GST_CLOCK_OK; entry->func = NULL; entry->user_data = NULL; diff --git a/gst/gstsystemclock.c b/gst/gstsystemclock.c index ee7b889..e5eaa81 100644 --- a/gst/gstsystemclock.c +++ b/gst/gstsystemclock.c @@ -40,18 +40,42 @@ #include "gst_private.h" #include "gstinfo.h" - #include "gstsystemclock.h" +#include "gstpoll.h" /* Define this to get some extra debug about jitter from each clock_wait */ #undef WAIT_DEBUGGING +#define GST_TYPE_CLOCK_TYPE (gst_clock_type_get_type()) + +struct _GstSystemClockPrivate +{ + GstClockType clock_type; + GstPoll *timer; + gint async_wakeup_count; +}; + +#define GST_SYSTEM_CLOCK_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SYSTEM_CLOCK, \ + GstSystemClockPrivate)) + +enum +{ + PROP_0, + PROP_CLOCK_TYPE, + /* FILL ME */ +}; + /* the one instance of the systemclock */ static GstClock *_the_system_clock = NULL; static void gst_system_clock_class_init (GstSystemClockClass * klass); static void gst_system_clock_init (GstSystemClock * clock); static void gst_system_clock_dispose (GObject * object); +static void gst_system_clock_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_system_clock_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); static GstClockTime gst_system_clock_get_internal_time (GstClock * clock); static guint64 gst_system_clock_get_resolution (GstClock * clock); @@ -98,6 +122,22 @@ gst_system_clock_get_type (void) return clock_type; } +static GType +gst_clock_type_get_type (void) +{ + static GType clock_type_type = 0; + static const GEnumValue clock_types[] = { + {GST_CLOCK_TYPE_REALTIME, "GST_CLOCK_TYPE_REALTIME", "realtime"}, + {GST_CLOCK_TYPE_MONOTONIC, "GST_CLOCK_TYPE_MONOTONIC", "monotonic"}, + {0, NULL, NULL}, + }; + + if (G_UNLIKELY (!clock_type_type)) { + clock_type_type = g_enum_register_static ("GstClockType", clock_types); + } + return clock_type_type; +} + static void gst_system_clock_class_init (GstSystemClockClass * klass) { @@ -111,7 +151,16 @@ gst_system_clock_class_init (GstSystemClockClass * klass) parent_class = g_type_class_peek_parent (klass); + g_type_class_add_private (klass, sizeof (GstSystemClockPrivate)); + gobject_class->dispose = gst_system_clock_dispose; + gobject_class->set_property = gst_system_clock_set_property; + gobject_class->get_property = gst_system_clock_get_property; + + g_object_class_install_property (gobject_class, PROP_CLOCK_TYPE, + g_param_spec_enum ("clock-type", "Clock type", + "The type of underlying clock implementation used", + GST_TYPE_CLOCK_TYPE, GST_CLOCK_TYPE_REALTIME, G_PARAM_READWRITE)); gstclock_class->get_internal_time = gst_system_clock_get_internal_time; gstclock_class->get_resolution = gst_system_clock_get_resolution; @@ -129,11 +178,16 @@ gst_system_clock_init (GstSystemClock * clock) GST_CLOCK_FLAG_CAN_DO_PERIODIC_SYNC | GST_CLOCK_FLAG_CAN_DO_PERIODIC_ASYNC); + clock->priv = GST_SYSTEM_CLOCK_GET_PRIVATE (clock); + + clock->priv->clock_type = GST_CLOCK_TYPE_REALTIME; + clock->priv->timer = gst_poll_new_timer (); + #if 0 /* Uncomment this to start the async clock thread straight away */ - GST_CLOCK_LOCK (clock); + GST_OBJECT_LOCK (clock); gst_system_clock_start_async (clock); - GST_CLOCK_UNLOCK (clock); + GST_OBJECT_UNLOCK (clock); #endif } @@ -141,8 +195,7 @@ static void gst_system_clock_dispose (GObject * object) { GstClock *clock = (GstClock *) object; - - GstSystemClock *sysclock = GST_SYSTEM_CLOCK (clock); + GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock); GList *entries; /* else we have to stop the thread */ @@ -165,6 +218,8 @@ gst_system_clock_dispose (GObject * object) sysclock->thread = NULL; GST_CAT_DEBUG (GST_CAT_CLOCK, "joined thread"); + gst_poll_free (sysclock->priv->timer); + G_OBJECT_CLASS (parent_class)->dispose (object); if (_the_system_clock == clock) { @@ -173,6 +228,40 @@ gst_system_clock_dispose (GObject * object) } } +static void +gst_system_clock_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstSystemClock *sysclock = GST_SYSTEM_CLOCK (object); + + switch (prop_id) { + case PROP_CLOCK_TYPE: + sysclock->priv->clock_type = g_value_get_enum (value); + GST_CAT_DEBUG (GST_CAT_CLOCK, "clock-type set to %d", + sysclock->priv->clock_type); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_system_clock_get_property (GObject * object, guint prop_id, GValue * value, + GParamSpec * pspec) +{ + GstSystemClock *sysclock = GST_SYSTEM_CLOCK (object); + + switch (prop_id) { + case PROP_CLOCK_TYPE: + g_value_set_enum (value, sysclock->priv->clock_type); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + /** * gst_system_clock_obtain: * @@ -214,6 +303,30 @@ gst_system_clock_obtain (void) return clock; } +static void +gst_system_clock_clear_async_wakeups_unlocked (GstSystemClock * sysclock) +{ + while (sysclock->priv->async_wakeup_count > 0) { + GST_CAT_DEBUG (GST_CAT_CLOCK, "reading control"); + while (!gst_poll_read_control (sysclock->priv->timer)) { + g_warning ("gstsystemclock: read control failed, trying again\n"); + } + sysclock->priv->async_wakeup_count--; + } + GST_CLOCK_BROADCAST (sysclock); +} + +static void +gst_system_clock_wakeup_async_unlocked (GstSystemClock * sysclock) +{ + GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control"); + + while (!gst_poll_write_control (sysclock->priv->timer)) { + g_warning ("gstsystemclock: write control failed, trying again\n"); + } + sysclock->priv->async_wakeup_count++; +} + /* this thread reads the sorted clock entries from the queue. * * It waits on each of them and fires the callback when the timeout occurs. @@ -229,7 +342,7 @@ gst_system_clock_obtain (void) static void gst_system_clock_async_thread (GstClock * clock) { - GstSystemClock *sysclock = GST_SYSTEM_CLOCK (clock); + GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock); GST_CAT_DEBUG (GST_CAT_CLOCK, "enter system clock thread"); GST_OBJECT_LOCK (clock); @@ -285,6 +398,10 @@ gst_system_clock_async_thread (GstClock * clock) entry->user_data); GST_OBJECT_LOCK (clock); } + if (clock->entries->data != entry) { + /* new entries have been added, clear async wakeups */ + gst_system_clock_clear_async_wakeups_unlocked (sysclock); + } if (entry->type == GST_CLOCK_ENTRY_PERIODIC) { /* adjust time now */ entry->time = requested + entry->interval; @@ -303,6 +420,8 @@ gst_system_clock_async_thread (GstClock * clock) * was canceled. Whatever it is, pick the head entry of the list and * continue waiting. */ GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p needs restart", entry); + /* clear async wakeups, if any */ + gst_system_clock_clear_async_wakeups_unlocked (sysclock); continue; default: GST_CAT_DEBUG (GST_CAT_CLOCK, @@ -323,21 +442,60 @@ exit: GST_CAT_DEBUG (GST_CAT_CLOCK, "exit system clock thread"); } +#ifdef HAVE_POSIX_TIMERS +static inline clockid_t +clock_type_to_posix_id (GstClockType clock_type) +{ +#ifdef HAVE_MONOTONIC_CLOCK + if (clock_type == GST_CLOCK_TYPE_MONOTONIC) + return CLOCK_MONOTONIC; + else +#endif + return CLOCK_REALTIME; +} +#endif + /* MT safe */ static GstClockTime gst_system_clock_get_internal_time (GstClock * clock) { +#ifdef HAVE_POSIX_TIMERS + GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock); + clockid_t ptype; + struct timespec ts; + + ptype = clock_type_to_posix_id (sysclock->priv->clock_type); + + if (G_UNLIKELY (clock_gettime (ptype, &ts))) + return GST_CLOCK_TIME_NONE; + + return GST_TIMESPEC_TO_TIME (ts); +#else GTimeVal timeval; g_get_current_time (&timeval); return GST_TIMEVAL_TO_TIME (timeval); +#endif } static guint64 gst_system_clock_get_resolution (GstClock * clock) { +#ifdef HAVE_POSIX_TIMERS + GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock); + clockid_t ptype; + struct timespec ts; + + ptype = clock_type_to_posix_id (sysclock->priv->clock_type); + + if (G_UNLIKELY (clock_getres (ptype, &ts))) + return GST_CLOCK_TIME_NONE; + + return GST_TIMESPEC_TO_TIME (ts); +#else return 1 * GST_USECOND; +#endif } /* synchronously wait on the given GstClockEntry. @@ -360,81 +518,119 @@ static GstClockReturn gst_system_clock_id_wait_jitter_unlocked (GstClock * clock, GstClockEntry * entry, GstClockTimeDiff * jitter, gboolean restart) { - GstClockTime entryt, real, now, target; + GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock); + GstClockTime entryt, real, now; GstClockTimeDiff diff; /* need to call the overridden method because we want to sync against the time * of the clock, whatever the subclass uses as a clock. */ real = GST_CLOCK_GET_CLASS (clock)->get_internal_time (clock); + now = gst_clock_adjust_unlocked (clock, real); + + /* get the time of the entry */ entryt = GST_CLOCK_ENTRY_TIME (entry); - now = gst_clock_adjust_unlocked (clock, real); if (jitter) { *jitter = GST_CLOCK_DIFF (entryt, now); } /* the diff of the entry with the clock is the amount of time we have to * wait */ diff = entryt - now; - /* Our GCond implementation expects an absolute time against the system clock - * as a timeout value. We use our internal time to get the system time and add - * the expected timeout to it, this gives us the absolute time of the - * timeout. */ - target = gst_system_clock_get_internal_time (clock) + diff; GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p" - " target %" GST_TIME_FORMAT - " entry %" GST_TIME_FORMAT + " time %" GST_TIME_FORMAT " now %" GST_TIME_FORMAT " real %" GST_TIME_FORMAT - " diff (entry-now) %" G_GINT64_FORMAT, + " diff (time-now) %" G_GINT64_FORMAT, entry, - GST_TIME_ARGS (target), GST_TIME_ARGS (entryt), GST_TIME_ARGS (now), GST_TIME_ARGS (real), diff); if (diff > 0) { - GTimeVal tv; - #ifdef WAIT_DEBUGGING - GstClockTime result, final; + GstClockTime final; #endif - GST_TIME_TO_TIMEVAL (target, tv); + while (entry->status != GST_CLOCK_UNSCHEDULED) { + gint pollret; - while (TRUE) { - /* now wait on the entry, it either times out or the cond is signaled. */ - if (!GST_CLOCK_TIMED_WAIT (clock, &tv)) { - /* timeout, this is fine, we can report success now */ - GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked after timeout", entry); - entry->status = GST_CLOCK_OK; + /* mark the entry as busy */ + entry->status = GST_CLOCK_BUSY; + GST_OBJECT_UNLOCK (clock); -#ifdef WAIT_DEBUGGING + /* now wait on the entry, it either times out or the fd is written. */ + pollret = gst_poll_wait (sysclock->priv->timer, diff); + + /* another thread can read the fd before we get the lock */ + GST_OBJECT_LOCK (clock); + if (entry->status == GST_CLOCK_UNSCHEDULED) { + + GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked", entry); + while (!gst_poll_read_control (sysclock->priv->timer)) { + g_warning ("gstsystemclock: read control failed, trying again\n"); + } + GST_CLOCK_BROADCAST (clock); + } else { + if (pollret != 0) { + /* some other id got unlocked */ + if (!restart) { + /* this can happen if the entry got unlocked because of an async + * entry was added to the head of the async queue. */ + GST_CAT_DEBUG (GST_CAT_CLOCK, "wakeup waiting for entry %p", entry); + break; + } + + /* mark ourselves as EARLY, we release the lock and we could be + * unscheduled ourselves but we don't want the unscheduling thread + * to write on the fd */ + entry->status = GST_CLOCK_EARLY; + + /* before waiting on the cond, check if another thread read the fd + * before we got the lock */ + while (gst_poll_wait (sysclock->priv->timer, 0) > 0) { + GST_CLOCK_WAIT (clock); + } + + /* we released the lock in the wait, recheck our status */ + if (entry->status == GST_CLOCK_UNSCHEDULED) { + GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p got unscheduled", entry); + break; + } + + GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p needs to be restarted", + entry); + } else { + GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked after timeout", + entry); + } + + /* reschedule if gst_poll_wait returned early or we have to reschedule after + * an unlock*/ real = GST_CLOCK_GET_CLASS (clock)->get_internal_time (clock); - result = gst_clock_adjust_unlocked (clock, real); - final = gst_system_clock_get_internal_time (clock); - GST_CAT_DEBUG (GST_CAT_CLOCK, "Waited for %" G_GINT64_FORMAT - " got %" G_GINT64_FORMAT " diff %" G_GINT64_FORMAT - " %g target-offset %" G_GINT64_FORMAT " %g", entryt, result, - result - entryt, - (double) (GstClockTimeDiff) (result - entryt) / GST_SECOND, - (final - target), - ((double) (GstClockTimeDiff) (final - target)) / GST_SECOND); + now = gst_clock_adjust_unlocked (clock, real); + diff = entryt - now; + + if (diff <= 0) { + /* timeout, this is fine, we can report success now */ + entry->status = GST_CLOCK_OK; + + GST_CAT_DEBUG (GST_CAT_CLOCK, + "entry %p finished, diff %" G_GINT64_FORMAT, entry, diff); + +#ifdef WAIT_DEBUGGING + final = gst_system_clock_get_internal_time (clock); + GST_CAT_DEBUG (GST_CAT_CLOCK, "Waited for %" G_GINT64_FORMAT + " got %" G_GINT64_FORMAT " diff %" G_GINT64_FORMAT + " %g target-offset %" G_GINT64_FORMAT " %g", entryt, now, + now - entryt, + (double) (GstClockTimeDiff) (now - entryt) / GST_SECOND, + (final - target), + ((double) (GstClockTimeDiff) (final - target)) / GST_SECOND); #endif - break; - } else { - /* the waiting is interrupted because the GCond was signaled. This can - * be because this or some other entry was unscheduled. */ - GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked with signal", entry); - /* if the entry is unscheduled, we can stop waiting for it, else we - * continue our while loop. */ - if (entry->status == GST_CLOCK_UNSCHEDULED) - break; - /* else restart if we must */ - if (!restart) break; - - /* this can happen if the entry got unlocked because of an async entry - * was added to the head of the async queue. */ - GST_CAT_DEBUG (GST_CAT_CLOCK, "continue waiting for entry %p", entry); + } else { + GST_CAT_DEBUG (GST_CAT_CLOCK, + "entry %p restart, diff %" G_GINT64_FORMAT, entry, diff); + } } } } else if (diff == 0) { @@ -496,14 +692,21 @@ no_thread: static GstClockReturn gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry) { + gboolean empty; + GstSystemClock *sysclock; + + sysclock = GST_SYSTEM_CLOCK_CAST (clock); + GST_CAT_DEBUG (GST_CAT_CLOCK, "adding async entry %p", entry); GST_OBJECT_LOCK (clock); /* Start the clock async thread if needed */ - if (!gst_system_clock_start_async (GST_SYSTEM_CLOCK (clock))) + if (!gst_system_clock_start_async (sysclock)) goto thread_error; + empty = (clock->entries == NULL); + /* need to take a ref */ gst_clock_id_ref ((GstClockID) entry); /* insert the entry in sorted order */ @@ -514,13 +717,17 @@ gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry) * front, else the thread is just waiting for another entry and * will get to this entry automatically. */ if (clock->entries->data == entry) { - GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry added to head, sending signal"); - /* this will wake up _all_ entries waiting for the clock because we have - * only one cond for all entries (makes allocation faster). Entries that - * have not timed out will have their status set to BUSY and should continue - * to wait. In the case of the async ones, the new head entry should be - * taken and waited for. */ - GST_CLOCK_BROADCAST (clock); + GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry added to head"); + if (empty) { + /* the list was empty before, signal the cond so that the async thread can + * start taking a look at the queue */ + GST_CAT_DEBUG (GST_CAT_CLOCK, "sending signal"); + GST_CLOCK_BROADCAST (clock); + } else { + /* the async thread was waiting for an entry, unlock the wait so that it + * looks at the new head entry instead */ + gst_system_clock_wakeup_async_unlocked (sysclock); + } } GST_OBJECT_UNLOCK (clock); @@ -544,11 +751,17 @@ gst_system_clock_id_unschedule (GstClock * clock, GstClockEntry * entry) GST_CAT_DEBUG (GST_CAT_CLOCK, "unscheduling entry %p", entry); GST_OBJECT_LOCK (clock); - /* mark entry as unscheduled, then wake up all entries. The entries that did - * not timeout will be woken up but immediatly go to sleep again because their - * status would still be busy. */ + if (entry->status == GST_CLOCK_BUSY) { + /* the entry was being busy, wake up all entries so that they recheck their + * status. We cannot wake up just one entry because allocating such a + * datastructure for each entry would be too heavey and unlocking an entry + * is usually done when shutting down or some other exceptional case. */ + GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control"); + while (!gst_poll_write_control (GST_SYSTEM_CLOCK_CAST (clock)->priv->timer)) { + g_warning ("gstsystemclock: write control failed, trying again\n"); + } + } + /* when it leaves the poll, it'll detect the unscheduled */ entry->status = GST_CLOCK_UNSCHEDULED; - GST_CAT_DEBUG (GST_CAT_CLOCK, "sending signal"); - GST_CLOCK_BROADCAST (clock); GST_OBJECT_UNLOCK (clock); } diff --git a/gst/gstsystemclock.h b/gst/gstsystemclock.h index b164c72..7e13cfa 100644 --- a/gst/gstsystemclock.h +++ b/gst/gstsystemclock.h @@ -31,6 +31,7 @@ G_BEGIN_DECLS #define GST_TYPE_SYSTEM_CLOCK (gst_system_clock_get_type ()) #define GST_SYSTEM_CLOCK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SYSTEM_CLOCK, GstSystemClock)) +#define GST_SYSTEM_CLOCK_CAST(obj) ((GstSystemClock *)(obj)) #define GST_IS_SYSTEM_CLOCK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SYSTEM_CLOCK)) #define GST_SYSTEM_CLOCK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SYSTEM_CLOCK, GstSystemClockClass)) #define GST_IS_SYSTEM_CLOCK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SYSTEM_CLOCK)) @@ -39,6 +40,18 @@ G_BEGIN_DECLS typedef struct _GstSystemClock GstSystemClock; typedef struct _GstSystemClockClass GstSystemClockClass; +typedef struct _GstSystemClockPrivate GstSystemClockPrivate; + +/** + * GstClockType: + * @GST_CLOCK_TYPE_REALTIME: time since Epoch + * @GST_CLOCK_TYPE_MONOTONIC: monotonic time since some unspecified starting + * point + */ +typedef enum { + GST_CLOCK_TYPE_REALTIME = 0, + GST_CLOCK_TYPE_MONOTONIC = 1 +} GstClockType; /** * GstSystemClock: @@ -53,7 +66,10 @@ struct _GstSystemClock { GThread *thread; /* thread for async notify */ gboolean stopping; - gpointer _gst_reserved[GST_PADDING]; + /* ABI added */ + GstSystemClockPrivate *priv; + + gpointer _gst_reserved[GST_PADDING - 1]; }; struct _GstSystemClockClass { -- 2.7.4