- Reworked the clock to prepare for async notifications
authorWim Taymans <wim.taymans@gmail.com>
Sat, 2 Nov 2002 13:54:34 +0000 (13:54 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Sat, 2 Nov 2002 13:54:34 +0000 (13:54 +0000)
Original commit message from CVS:
- Reworked the clock to prepare for async notifications
- moved some common scheduler checking to gstbin
- added some vmethods to gstbin for future use
- more fixes to the optimal scheduler
- use new clock api in the schedulers

gst/gstbin.c
gst/gstbin.h
gst/gstclock.c
gst/gstclock.h
gst/gstscheduler.c
gst/gstsystemclock.c
gst/schedulers/Makefile.am
gst/schedulers/gstbasicscheduler.c
gst/schedulers/gstoptimalscheduler.c

index 8d05ba826eed7f023ae6f27346ce704fbe5f3ad4..9edac83aaac568fa78b6125009ab86af253cc13a 100644 (file)
@@ -139,9 +139,6 @@ gst_bin_init (GstBin * bin)
   bin->post_iterate_func = NULL;
   bin->pre_iterate_private = NULL;
   bin->post_iterate_private = NULL;
-
-  bin->iterate_mutex = g_mutex_new ();
-  bin->iterate_cond = g_cond_new ();
 }
 
 /**
@@ -218,11 +215,6 @@ gst_bin_set_element_sched (GstElement *element, GstScheduler *sched)
   GList *children;
   GstElement *child;
 
-  g_return_if_fail (element != NULL);
-  g_return_if_fail (GST_IS_ELEMENT (element));
-  g_return_if_fail (sched != NULL);
-  g_return_if_fail (GST_IS_SCHEDULER (sched));
-
   GST_INFO (GST_CAT_SCHEDULING, "setting element \"%s\" sched to %p", GST_ELEMENT_NAME (element),
            sched);
 
@@ -249,7 +241,34 @@ gst_bin_set_element_sched (GstElement *element, GstScheduler *sched)
   }
   /* otherwise, if it's just a regular old element */
   else {
+    GList *pads;
+         
     gst_scheduler_add_element (sched, element);
+             
+    /* set the sched pointer in all the pads */
+    pads = element->pads;
+    while (pads) {
+      GstPad *pad;
+
+      pad = GST_PAD (pads->data);
+      pads = g_list_next (pads);
+                                                                         
+      /* we only operate on real pads */
+      if (!GST_IS_REAL_PAD (pad))
+        continue;
+
+      /* if the peer element exists and is a candidate */
+      if (GST_PAD_PEER (pad)) {
+        if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
+          GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, telling scheduler");
+                       
+          if (GST_PAD_IS_SRC (pad))
+            gst_scheduler_pad_connect (sched, pad, GST_PAD_PEER (pad));
+          else
+            gst_scheduler_pad_connect (sched, GST_PAD_PEER (pad), pad);
+        }
+      }
+    }
   }
 }
 
@@ -260,9 +279,6 @@ gst_bin_unset_element_sched (GstElement *element, GstScheduler *sched)
   GList *children;
   GstElement *child;
 
-  g_return_if_fail (element != NULL);
-  g_return_if_fail (GST_IS_ELEMENT (element));
-
   if (GST_ELEMENT_SCHED (element) == NULL) {
     GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" has no scheduler",
              GST_ELEMENT_NAME (element));
@@ -296,6 +312,32 @@ gst_bin_unset_element_sched (GstElement *element, GstScheduler *sched)
   }
   /* otherwise, if it's just a regular old element */
   else {
+    GList *pads;
+
+    /* set the sched pointer in all the pads */
+    pads = element->pads;
+    while (pads) {
+      GstPad *pad;
+
+      pad = GST_PAD (pads->data);
+      pads = g_list_next (pads);
+
+      /* we only operate on real pads */
+      if (!GST_IS_REAL_PAD (pad))
+        continue;
+
+      /* if the peer element exists and is a candidate */
+      if (GST_PAD_PEER (pad)) {
+        if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
+          GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, telling scheduler");
+
+          if (GST_PAD_IS_SRC (pad))
+            gst_scheduler_pad_disconnect (sched, pad, GST_PAD_PEER (pad));
+          else
+            gst_scheduler_pad_disconnect (sched, GST_PAD_PEER (pad), pad);
+        }
+      }
+    }
     gst_scheduler_remove_element (GST_ELEMENT_SCHED (element), element);
   }
 }
@@ -489,6 +531,11 @@ gst_bin_child_state_change (GstBin *bin, GstElementState oldstate, GstElementSta
        GST_STATE_PENDING (bin) = state;
         GST_UNLOCK (bin);
        gst_bin_change_state_norecurse (bin);
+       if (state != GST_STATE (bin)) {
+          g_warning ("%s: state change in cllback %d %d", 
+                         GST_ELEMENT_NAME (bin),
+                         state, GST_STATE (bin));
+       }
        return;
       }
       break;
index 140010bfaee869aadd9da664742c2ad82d68732d..bb99bea9ef118b7377b3a3521c510e7ce787b985 100644 (file)
@@ -74,9 +74,6 @@ struct _GstBin {
   gint                  numchildren;
   GList        *children;
 
-  GMutex        *iterate_mutex;
-  GCond         *iterate_cond;
-
   GstElementState child_states[GST_NUM_STATES];
 
   gpointer      sched_private;
@@ -90,12 +87,16 @@ struct _GstBin {
 struct _GstBinClass {
   GstElementClass parent_class;
 
+  /* vtable */
+  void         (*add_element)          (GstBin *bin, GstElement);
+  void         (*remove_element)       (GstBin *bin, GstElement);
+  /* run a full iteration of operation */
+  gboolean     (*iterate)              (GstBin *bin);
+
   /* signals */
   void         (*object_added)         (GstObject *object, GstObject *child);
   void         (*object_removed)       (GstObject *object, GstObject *child);
 
-  /* run a full iteration of operation */
-  gboolean     (*iterate)              (GstBin *bin);
 };
 
 GType          gst_bin_get_type                (void);
index 386b5d6d7d823abba72f64828e2bb5beb6014ec4..bbb809847fb36a0b515a2179befe80855fb89860 100644 (file)
 #include "gstlog.h"
 #include "gstmemchunk.h"
 
+enum {
+  ARG_0,
+  ARG_STATS,
+};
+
 #define CLASS(clock)  GST_CLOCK_CLASS (G_OBJECT_GET_CLASS (clock))
 
 static GstMemChunk *_gst_clock_entries_chunk;
 
 static void            gst_clock_class_init            (GstClockClass *klass);
 static void            gst_clock_init                  (GstClock *clock);
+static void             gst_clock_set_property         (GObject *object, guint prop_id, 
+                                                        const GValue *value, GParamSpec *pspec);
+static void             gst_clock_get_property         (GObject *object, guint prop_id, 
+                                                        GValue *value, GParamSpec * pspec);
+static void            gst_clock_update_stats          (GstClock *clock);
 
 
 static GstObjectClass *parent_class = NULL;
 /* static guint gst_clock_signals[LAST_SIGNAL] = { 0 }; */
 
-typedef struct _GstClockEntry GstClockEntry;
+static GMutex *_gst_clock_mutex;
+static GCond  *_gst_clock_cond;
 
-static void            gst_clock_free_entry            (GstClock *clock, GstClockEntry *entry);
+static inline GstClockID
+gst_clock_entry_new (GstClock *clock, GstClockTime time, 
+                    GstClockTime interval, GstClockEntryType type)
+{
+  GstClockEntry *entry;
 
-typedef enum {
-  GST_ENTRY_OK,
-  GST_ENTRY_RESTART,
-} GstEntryStatus;
+  entry = gst_mem_chunk_alloc (_gst_clock_entries_chunk);
 
-struct _GstClockEntry {
-  GstClockTime                  time;
-  GstEntryStatus        status;
-  GstClockCallback      func;
-  gpointer              user_data;
-};
+  entry->clock = clock;
+  entry->time = time;
+  entry->interval = time;
+  entry->type = type;
+  entry->status = GST_CLOCK_ENTRY_OK;
 
-#define GST_CLOCK_ENTRY(entry)          ((GstClockEntry *)(entry))
-#define GST_CLOCK_ENTRY_TIME(entry)     (((GstClockEntry *)(entry))->time)
+  return (GstClockID) entry;
+}
 
-static GstClockEntry*
-gst_clock_entry_new (GstClockTime time,
-                    GstClockCallback func, gpointer user_data)
+/**
+ * gst_clock_new_single_shot_id
+ * @clock: The clockid to get a single shot notification from
+ * @time: the requested time
+ *
+ * Get an ID from the given clock to trigger a single shot 
+ * notification at the requested time.
+ *
+ * Returns: An id that can be used to request the time notification.
+ */
+GstClockID
+gst_clock_new_single_shot_id (GstClock *clock, GstClockTime time)
+{
+  return gst_clock_entry_new (clock, 
+                             time, 
+                             GST_CLOCK_TIME_NONE, 
+                             GST_CLOCK_ENTRY_SINGLE);
+}
+
+/**
+ * gst_clock_new_periodic__id
+ * @clock: The clockid to get a periodic notification id from
+ * @start_time: the requested start time
+ * @interval: the requested interval
+ *
+ * Get an ID from the given clock to trigger a periodic notification.
+ * The periodeic notifications will be start at time start_time and
+ * will then be fired with the given interval.
+ *
+ * Returns: An id that can be used to request the time notification.
+ */
+GstClockID
+gst_clock_new_periodic_id (GstClock *clock, GstClockTime start_time,
+                           GstClockTime interval)
+{
+  return gst_clock_entry_new (clock, 
+                             start_time, 
+                             interval, 
+                             GST_CLOCK_ENTRY_PERIODIC);
+}
+
+/**
+ * gst_clock_id_get_time
+ * @id: The clockid to query
+ *
+ * Get the time of the clock ID
+ *
+ * Returns: the time of the given clock id
+ */
+GstClockTime
+gst_clock_id_get_time (GstClockID id)
+{
+  g_return_val_if_fail (id != NULL, GST_CLOCK_TIME_NONE);
+
+  return GST_CLOCK_ENTRY_TIME ((GstClockEntry *)id);
+}
+
+
+/**
+ * gst_clock_id_wait
+ * @id: The clockid to wait on
+ * @jitter: A pointer that will contain the jitter
+ *
+ * Perform a blocking wait on the given ID. The jitter arg can be
+ * NULL
+ *
+ * Returns: the result of the blocking wait.
+ */
+GstClockReturn
+gst_clock_id_wait (GstClockID id, GstClockTimeDiff *jitter)
 {
   GstClockEntry *entry;
+  GstClock *clock;
+  GstClockReturn res = GST_CLOCK_UNSUPPORTED;
+  GstClockTime requested;
+  
+  g_return_val_if_fail (id != NULL, GST_CLOCK_ERROR);
 
-  entry = gst_mem_chunk_alloc (_gst_clock_entries_chunk);
+  entry = (GstClockEntry *) id;
+  clock = GST_CLOCK_ENTRY_CLOCK (entry);
+  requested = GST_CLOCK_ENTRY_TIME (entry);
+  
+  if (CLASS (clock)->wait) {
+    GstClockTime now;
 
-  entry->time = time;
-  entry->func = func;
-  entry->user_data = user_data;
+    do {
+      res = CLASS (clock)->wait (clock, entry);
+    }
+    while (res == GST_CLOCK_ENTRY_RESTART);
+
+    if (jitter) {
+      now = gst_clock_get_time (clock);
+      *jitter = now - requested;
+    }
+
+    if (clock->stats) {
+      gst_clock_update_stats (clock);
+    }
+  }
+
+  if (entry->type == GST_CLOCK_ENTRY_SINGLE) {
+    gst_clock_id_free (id);
+  }
+
+  return res;
+}
+
+/**
+ * gst_clock_wait_async
+ * @clock: a #GstClock to wait on
+ * @time: The #GstClockTime to wait for
+ * @func: The callback function 
+ * @user_data: User data passed in the calback
+ *
+ * Register a callback on the given clock that will be triggered 
+ * when the clock has reached the given time. A ClockID is returned
+ * that can be used to cancel the request.
+ *
+ * Returns: the clock id or NULL when async notification is not supported.
+ */
+GstClockReturn
+gst_clock_id_wait_async (GstClockID id,
+                        GstClockCallback func, gpointer user_data)
+{
+  GstClockEntry *entry;
+  GstClock *clock;
+  GstClockReturn res = GST_CLOCK_UNSUPPORTED;
+  
+  g_return_val_if_fail (id != NULL, GST_CLOCK_ERROR);
+
+  entry = (GstClockEntry *) id;
+  clock = entry->clock;
+
+  if (CLASS (clock)->wait_async) {
+    res = CLASS (clock)->wait_async (clock, entry, func, user_data);
+  }
+
+  return res;
+}
+
+/**
+ * gst_clock_remove_id
+ * @clock: The clock to cancel the request on
+ * @id: The id to cancel
+ *
+ * Cancel an outstanding async notification request with the given ID.
+ * This can be an ID generated with gst_clock_wait_async() or 
+ * gst_clock_notify_async().
+ */
+void
+gst_clock_id_unschedule (GstClockID id)
+{
+  GstClockEntry *entry;
+  GstClock *clock;
+  
+  g_return_if_fail (id != NULL);
+
+  entry = (GstClockEntry *) id;
+  clock = entry->clock;
 
-  return entry;
+  if (CLASS (clock)->unschedule)
+    CLASS (clock)->unschedule (clock, entry);
 }
 
-/*
-static gint
-clock_compare_func (gconstpointer a,
-                    gconstpointer b)
+/**
+ * gst_clock_id_free
+ * @id: The clockid to free
+ *
+ * Free the resources held by the given id
+ */
+void
+gst_clock_id_free (GstClockID id)
 {
-  GstClockEntry *entry1 = (GstClockEntry *)a;
-  GstClockEntry *entry2 = (GstClockEntry *)b;
+  gst_mem_chunk_free (_gst_clock_entries_chunk, id);
+}
 
-  return (entry1->time - entry2->time);
+/**
+ * gst_clock_unlock_id
+ * @id: The clockid to unlock
+ *
+ * Unlock the givan ClockID.
+ */
+void
+gst_clock_id_unlock (GstClockID id)
+{
+  GstClockEntry *entry;
+  GstClock *clock;
+  
+  g_return_if_fail (id != NULL);
+
+  entry = (GstClockEntry *) id;
+  clock = entry->clock;
+
+  if (CLASS (clock)->unlock)
+    CLASS (clock)->unlock (clock, entry);
 }
-*/
 
+
+/**
+ * GstClock abstract base class implementation
+ */
 GType
 gst_clock_get_type (void)
 {
@@ -127,6 +312,16 @@ gst_clock_class_init (GstClockClass *klass)
   _gst_clock_entries_chunk = gst_mem_chunk_new ("GstClockEntries",
                      sizeof (GstClockEntry), sizeof (GstClockEntry) * 32,
                      G_ALLOC_AND_FREE);
+
+  _gst_clock_mutex = g_mutex_new ();
+  _gst_clock_cond  = g_cond_new ();
+
+  gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_clock_set_property);
+  gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_clock_get_property);
+
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_STATS,
+    g_param_spec_boolean ("stats", "Stats", "Enable clock stats",
+                          FALSE, G_PARAM_READWRITE));
 }
 
 static void
@@ -137,28 +332,13 @@ gst_clock_init (GstClock *clock)
   clock->start_time = 0;
   clock->last_time = 0;
   clock->entries = NULL;
-  clock->async_supported = FALSE;
+  clock->flags = 0;
+  clock->stats = FALSE;
 
   clock->active_mutex = g_mutex_new ();
   clock->active_cond = g_cond_new ();
 }
 
-/**
- * gst_clock_async_supported
- * @clock: a #GstClock to query
- *
- * Checks if this clock can support asynchronous notification.
- *
- * Returns: TRUE if async notification is supported.
- */
-gboolean
-gst_clock_async_supported (GstClock *clock)
-{
-  g_return_val_if_fail (GST_IS_CLOCK (clock), FALSE);
-
-  return clock->async_supported;
-}
-
 /**
  * gst_clock_set_speed
  * @clock: a #GstClock to modify
@@ -166,13 +346,18 @@ gst_clock_async_supported (GstClock *clock)
  *
  * Sets the speed on the given clock. 1.0 is the default 
  * speed.
+ *
+ * Returns: the new speed of the clock.
  */
-void
+gdouble
 gst_clock_set_speed (GstClock *clock, gdouble speed)
 {
-  g_return_if_fail (GST_IS_CLOCK (clock));
+  g_return_val_if_fail (GST_IS_CLOCK (clock), 0.0);
+
+  if (CLASS (clock)->change_speed)
+    clock->speed = CLASS (clock)->change_speed (clock, clock->speed, speed);
 
-  clock->speed = speed;
+  return clock->speed;
 }
 
 /**
@@ -191,29 +376,43 @@ gst_clock_get_speed (GstClock *clock)
   return clock->speed;
 }
 
-
 /**
- * gst_clock_reset
- * @clock: a #GstClock to reset
+ * gst_clock_set_resolution
+ * @clock: The clock set the resolution on
+ * @resolution: The resolution to set
  *
- * Reset the clock to time 0.
+ * Set the accuracy of the clock.
+ *
+ * Returns: the new resolution of the clock.
  */
-void
-gst_clock_reset (GstClock *clock)
+guint64
+gst_clock_set_resolution (GstClock *clock, guint64 resolution)
 {
-  GstClockTime time = 0LL;
+  g_return_val_if_fail (GST_IS_CLOCK (clock), 0LL);
 
-  g_return_if_fail (GST_IS_CLOCK (clock));
+  if (CLASS (clock)->change_resolution)
+    clock->resolution = CLASS (clock)->change_resolution (clock, clock->resolution, resolution);
 
-  if (CLASS (clock)->get_internal_time) {
-    time = CLASS (clock)->get_internal_time (clock);
-  }
+  return clock->resolution;
+}
 
-  GST_LOCK (clock);
-  clock->active = FALSE;
-  clock->start_time = time;
-  clock->last_time = 0LL;
-  GST_UNLOCK (clock);
+/**
+ * gst_clock_get_resolution
+ * @clock: The clock get the resolution of
+ *
+ * Get the accuracy of the clock.
+ *
+ * Returns: the resolution of the clock in microseconds.
+ */
+guint64
+gst_clock_get_resolution (GstClock *clock)
+{
+  g_return_val_if_fail (GST_IS_CLOCK (clock), 0LL);
+
+  if (CLASS (clock)->get_resolution)
+    return CLASS (clock)->get_resolution (clock);
+
+  return 1LL;
 }
 
 /**
@@ -269,6 +468,30 @@ gst_clock_is_active (GstClock *clock)
   return clock->active;
 }
 
+/**
+ * gst_clock_reset
+ * @clock: a #GstClock to reset
+ *
+ * Reset the clock to time 0.
+ */
+void
+gst_clock_reset (GstClock *clock)
+{
+  GstClockTime time = 0LL;
+
+  g_return_if_fail (GST_IS_CLOCK (clock));
+
+  if (CLASS (clock)->get_internal_time) {
+    time = CLASS (clock)->get_internal_time (clock);
+  }
+
+  GST_LOCK (clock);
+  clock->active = FALSE;
+  clock->start_time = time;
+  clock->last_time = 0LL;
+  GST_UNLOCK (clock);
+}
+
 /**
  * gst_clock_handle_discont
  * @clock: a #GstClock to notify of the discontinuity
@@ -349,181 +572,6 @@ gst_clock_get_time (GstClock *clock)
   return ret;
 }
 
-static GstClockID
-gst_clock_wait_async_func (GstClock *clock, GstClockTime time,
-                          GstClockCallback func, gpointer user_data)
-{
-  GstClockEntry *entry = NULL;
-  g_return_val_if_fail (GST_IS_CLOCK (clock), NULL);
-
-  if (!clock->active) {
-    GST_DEBUG (GST_CAT_CLOCK, "blocking on clock");
-    g_mutex_lock (clock->active_mutex);        
-    g_cond_wait (clock->active_cond, clock->active_mutex);     
-    g_mutex_unlock (clock->active_mutex);      
-  }
-
-  entry = gst_clock_entry_new (time, func, user_data);
-
-  return entry;
-}
-
-/**
- * gst_clock_wait
- * @clock: a #GstClock to wait on
- * @time: The #GstClockTime to wait for
- * @jitter: The jitter 
- *
- * Wait and block till the clock reaches the specified time.
- * The jitter value contains the difference between the requested time and
- * the actual time, negative values indicate that the requested time
- * was allready passed when this call was made.
- *
- * Returns: the #GstClockReturn result of the operation.
- */
-GstClockReturn
-gst_clock_wait (GstClock *clock, GstClockTime time, GstClockTimeDiff *jitter)
-{
-  GstClockID id;
-  GstClockReturn res;
-  
-  g_return_val_if_fail (GST_IS_CLOCK (clock), GST_CLOCK_STOPPED);
-
-  id = gst_clock_wait_async_func (clock, time, NULL, NULL);
-  res = gst_clock_wait_id (clock, id, jitter);
-
-  return res;
-}
-
-/**
- * gst_clock_wait_async
- * @clock: a #GstClock to wait on
- * @time: The #GstClockTime to wait for
- * @func: The callback function 
- * @user_data: User data passed in the calback
- *
- * Register a callback on the given clock that will be triggered 
- * when the clock has reached the given time. A ClockID is returned
- * that can be used to cancel the request.
- *
- * Returns: the clock id or NULL when async notification is not supported.
- */
-GstClockID
-gst_clock_wait_async (GstClock *clock, GstClockTime time,
-                     GstClockCallback func, gpointer user_data)
-{
-  g_return_val_if_fail (GST_IS_CLOCK (clock), NULL);
-
-  if (clock->async_supported) {
-    return gst_clock_wait_async_func (clock, time, func, user_data);
-  }
-  return NULL;
-}
-
-/**
- * gst_clock_cancel_wait_async
- * @clock: The clock to cancel the request on
- * @id: The id to cancel
- *
- * Cancel an outstanding async notification request with the given ID.
- */
-void
-gst_clock_cancel_wait_async (GstClock *clock, GstClockID id)
-{
-  g_warning ("not supported");
-}
-
-/**
- * gst_clock_notify_async
- * @clock: The clock to wait on
- * @interval: The interval between notifications
- * @func: The callback function 
- * @user_data: User data passed in the calback
- *
- * Register a callback on the given clock that will be periodically
- * triggered with the specified interval. A ClockID is returned
- * that can be used to cancel the request.
- *
- * Returns: the clock id or NULL when async notification is not supported.
- */
-GstClockID
-gst_clock_notify_async (GstClock *clock, GstClockTime interval,
-                       GstClockCallback func, gpointer user_data) 
-{
-  g_warning ("not supported");
-  return NULL;
-}
-
-/**
- * gst_clock_remove_notify_async
- * @clock: The clock to cancel the request on
- * @id: The id to cancel
- *
- * Cancel an outstanding async notification request with the given ID.
- */
-void
-gst_clock_remove_notify_async (GstClock *clock, GstClockID id)
-{
-  g_warning ("not supported");
-}
-
-static void
-gst_clock_unlock_func (GstClock *clock, GstClockTime time, GstClockID id, gpointer user_data)
-{
-}
-
-/**
- * gst_clock_wait_id
- * @clock: The clock to wait on
- * @id: The clock id to wait on
- * @jitter: The jitter 
- *
- * Wait and block on the clockid obtained with gst_clock_wait_async.
- * The jitter value is described in gst_clock_wait().
- *
- * Returns: result of the operation.
- */
-GstClockReturn
-gst_clock_wait_id (GstClock *clock, GstClockID id, GstClockTimeDiff *jitter)
-{
-  GstClockReturn res = GST_CLOCK_TIMEOUT;
-  GstClockEntry *entry = (GstClockEntry *) id;
-  GstClockTime current, target;
-  GstClockTimeDiff this_jitter;
-  
-  g_return_val_if_fail (GST_IS_CLOCK (clock), GST_CLOCK_ERROR);
-  g_return_val_if_fail (entry, GST_CLOCK_ERROR);
-
-  current = gst_clock_get_time (clock);
-
-  entry->func = gst_clock_unlock_func;
-  target = GST_CLOCK_ENTRY_TIME (entry) - current;
-
-  GST_DEBUG (GST_CAT_CLOCK, "real_target %llu,  target %llu, now %llu", 
-                 target, GST_CLOCK_ENTRY_TIME (entry), current); 
-  
-  if (((gint64)target) > 0) {
-    struct timeval tv;
-
-    GST_TIME_TO_TIMEVAL (target, tv);
-    select (0, NULL, NULL, NULL, &tv);
-
-    current = gst_clock_get_time (clock);
-    this_jitter = current - GST_CLOCK_ENTRY_TIME (entry);
-  }
-  else {
-    res = GST_CLOCK_EARLY;
-    this_jitter = target;
-  }
-
-  if (jitter)
-    *jitter = this_jitter;
-
-  gst_clock_free_entry (clock, entry);
-
-  return res;
-}
-
 /**
  * gst_clock_get_next_id
  * @clock: The clock to query
@@ -545,76 +593,43 @@ gst_clock_get_next_id (GstClock *clock)
   return (GstClockID *) entry;
 }
 
-/**
- * gst_clock_id_get_time
- * @id: The clockid to query
- *
- * Get the time of the clock ID
- *
- * Returns: the time of the given clock id
- */
-GstClockTime
-gst_clock_id_get_time (GstClockID id)
-{
-  return GST_CLOCK_ENTRY_TIME (id);
-}
-
 static void
-gst_clock_free_entry (GstClock *clock, GstClockEntry *entry)
-{
-  gst_mem_chunk_free (_gst_clock_entries_chunk, entry);
-}
-
-/**
- * gst_clock_unlock_id
- * @clock: The clock that own the id
- * @id: The clockid to unlock
- *
- * Unlock the ClockID.
- */
-void
-gst_clock_unlock_id (GstClock *clock, GstClockID id)
+gst_clock_update_stats (GstClock *clock)
 {
-  GstClockEntry *entry = (GstClockEntry *) id;
-
-  if (entry->func)
-    entry->func (clock, gst_clock_get_time (clock), id, entry->user_data);
-
-  gst_clock_free_entry (clock, entry);
 }
 
-/**
- * gst_clock_set_resolution
- * @clock: The clock set the resolution on
- * @resolution: The resolution to set
- *
- * Set the accuracy of the clock.
- */
-void
-gst_clock_set_resolution (GstClock *clock, guint64 resolution)
+static void
+gst_clock_set_property (GObject *object, guint prop_id,
+                       const GValue *value, GParamSpec *pspec)
 {
-  g_return_if_fail (GST_IS_CLOCK (clock));
-
-  if (CLASS (clock)->set_resolution)
-    CLASS (clock)->set_resolution (clock, resolution);
+  GstClock *clock;
+            
+  clock = GST_CLOCK (object);
+
+  switch (prop_id) {
+    case ARG_STATS:
+      clock->stats = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+   }
 }
 
-/**
- * gst_clock_get_resolution
- * @clock: The clock get the resolution of
- *
- * Get the accuracy of the clock.
- *
- * Returns: the resolution of the clock in microseconds.
- */
-guint64
-gst_clock_get_resolution (GstClock *clock)
+static void
+gst_clock_get_property (GObject *object, guint prop_id, 
+                       GValue *value, GParamSpec * pspec)
 {
-  g_return_val_if_fail (GST_IS_CLOCK (clock), 0LL);
-
-  if (CLASS (clock)->get_resolution)
-    return CLASS (clock)->get_resolution (clock);
-
-  return 1LL;
+  GstClock *clock;
+            
+  clock = GST_CLOCK (object);
+
+  switch (prop_id) {
+    case ARG_STATS:
+      g_value_set_boolean (value, clock->stats);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+   }
 }
-
index 6b811cb354b50128291ca778f942d0277e2a2772..75ad1e62dc814f65201f60a60d8e4c63deeca443 100644 (file)
@@ -20,7 +20,6 @@
  * Boston, MA 02111-1307, USA.
  */
 
-
 #ifndef __GST_CLOCK_H__
 #define __GST_CLOCK_H__
 
@@ -45,88 +44,156 @@ typedef gpointer   GstClockID;
 
 #define GST_CLOCK_TIME_NONE  ((guint64)-1)
 
-#define GST_SECOND  ((guint64)G_USEC_PER_SEC * 1000LL)
-#define GST_MSECOND ((guint64)GST_SECOND/1000LL)
-#define GST_USECOND ((guint64)GST_SECOND/1000000LL)
-#define GST_NSECOND ((guint64)GST_SECOND/1000000000LL)
+#define GST_SECOND  ((guint64) G_USEC_PER_SEC * 1000LL)
+#define GST_MSECOND ((guint64) GST_SECOND / 1000LL)
+#define GST_USECOND ((guint64) GST_SECOND / 1000000LL)
+#define GST_NSECOND ((guint64) GST_SECOND / 1000000000LL)
 
-#define GST_CLOCK_DIFF(s, e)           (GstClockTimeDiff)((s)-(e))
+#define GST_CLOCK_DIFF(s, e)           (GstClockTimeDiff)((s) - (e))
 #define GST_TIMEVAL_TO_TIME(tv)                ((tv).tv_sec * GST_SECOND + (tv).tv_usec * GST_USECOND)
 #define GST_TIME_TO_TIMEVAL(t,tv)                      \
 G_STMT_START {                                                 \
-  (tv).tv_sec  = (t) / GST_SECOND;                     \
+  (tv).tv_sec  =  (t) / GST_SECOND;                    \
   (tv).tv_usec = ((t) / GST_USECOND) % GST_MSECOND;    \
 } G_STMT_END
 
+typedef struct _GstClockEntry  GstClockEntry;
 typedef struct _GstClock       GstClock;
 typedef struct _GstClockClass  GstClockClass;
 
-typedef void (*GstClockCallback) (GstClock *clock, GstClockTime time, GstClockID id, gpointer user_data);
+typedef gboolean (*GstClockCallback) (GstClock *clock, GstClockTime time, GstClockID id, gpointer user_data);
+
+typedef enum {
+  /*< protected >*/
+  GST_CLOCK_ENTRY_OK,
+  GST_CLOCK_ENTRY_EARLY,
+  GST_CLOCK_ENTRY_RESTART,
+} GstClockEntryStatus;
+
+typedef enum {
+  /*< protected >*/
+  GST_CLOCK_ENTRY_SINGLE,
+  GST_CLOCK_ENTRY_PERIODIC,
+} GstClockEntryType;
+
+#define GST_CLOCK_ENTRY(entry)         ((GstClockEntry *)(entry))
+#define GST_CLOCK_ENTRY_CLOCK(entry)   ((entry)->clock)
+#define GST_CLOCK_ENTRY_TYPE(entry)    ((entry)->type)
+#define GST_CLOCK_ENTRY_TIME(entry)    ((entry)->time)
+#define GST_CLOCK_ENTRY_INTERVAL(entry)        ((entry)->interval)
+#define GST_CLOCK_ENTRY_STATUS(entry)  ((entry)->status)
+
+struct _GstClockEntry {
+  /*< protected >*/
+  GstClock             *clock;
+  GstClockEntryType     type;
+  GstClockTime                  time;
+  GstClockTime                  interval;
+  GstClockEntryStatus   status;
+  GstClockCallback      func;
+  gpointer              user_data;
+};
 
 typedef enum
 {
   GST_CLOCK_STOPPED    = 0,
   GST_CLOCK_TIMEOUT    = 1,
   GST_CLOCK_EARLY      = 2,
-  GST_CLOCK_ERROR      = 3
+  GST_CLOCK_ERROR      = 3,
+  GST_CLOCK_UNSUPPORTED        = 4
 } GstClockReturn;
 
+typedef enum
+{
+  GST_CLOCK_FLAG_CAN_DO_SINGLE_SYNC     = (1 << 1),
+  GST_CLOCK_FLAG_CAN_DO_SINGLE_ASYNC    = (1 << 2),
+  GST_CLOCK_FLAG_CAN_DO_PERIODIC_SYNC   = (1 << 3),
+  GST_CLOCK_FLAG_CAN_DO_PERIODIC_ASYNC  = (1 << 4),
+  GST_CLOCK_FLAG_CAN_SET_RESOLUTION     = (1 << 5),
+  GST_CLOCK_FLAG_CAN_SET_SPEED          = (1 << 6),
+} GstClockFlags;
+
+#define GST_CLOCK_FLAGS(clock)  (GST_CLOCK(clock)->flags)
+
 struct _GstClock {
   GstObject     object;
 
+  GstClockFlags         flags;
+
+  /*< protected >*/
   GstClockTime  start_time;
   GstClockTime  last_time;
+
+  /*< private >*/
   gboolean      accept_discont;
   gdouble       speed;
+  guint64       resolution;
   gboolean      active;
   GList                *entries;
-  gboolean      async_supported;
-
   GMutex       *active_mutex;
   GCond                *active_cond;
+  gboolean      stats;
 };
 
 struct _GstClockClass {
   GstObjectClass        parent_class;
 
   /* vtable */
+  gdouble               (*change_speed)         (GstClock *clock,
+                                                gdouble oldspeed, gdouble newspeed);
+  gdouble               (*get_speed)            (GstClock *clock);
+  guint64               (*change_resolution)    (GstClock *clock, guint64 old_resolution,
+                                                guint64 new_resolution);
+  guint64               (*get_resolution)       (GstClock *clock);
+
   GstClockTime                 (*get_internal_time)    (GstClock *clock);
 
-  void                         (*set_resolution)       (GstClock *clock, guint64 resolution);
-  guint64              (*get_resolution)       (GstClock *clock);
+  /* waiting on an ID */
+  GstClockEntryStatus   (*wait)                        (GstClock *clock, GstClockEntry *entry);
+  GstClockEntryStatus   (*wait_async)           (GstClock *clock, GstClockEntry *entry,
+                                                GstClockCallback func, gpointer user_data);
+  void                  (*unschedule)          (GstClock *clock, GstClockEntry *entry);
+  void                  (*unlock)              (GstClock *clock, GstClockEntry *entry);
 
   /* signals */
+  void                  (*object_sync)          (GstClock *clock, GstObject *object, 
+                                                GstClockID id);
 };
 
 GType                  gst_clock_get_type              (void);
 
-void                   gst_clock_set_speed             (GstClock *clock, gdouble speed);
+gdouble                        gst_clock_set_speed             (GstClock *clock, gdouble speed);
 gdouble                gst_clock_get_speed             (GstClock *clock);
 
+guint64                        gst_clock_set_resolution        (GstClock *clock, guint64 resolution);
+guint64                        gst_clock_get_resolution        (GstClock *clock);
+
 void                   gst_clock_set_active            (GstClock *clock, gboolean active);
 gboolean               gst_clock_is_active             (GstClock *clock);
 void                   gst_clock_reset                 (GstClock *clock);
 gboolean               gst_clock_handle_discont        (GstClock *clock, guint64 time);
-gboolean               gst_clock_async_supported       (GstClock *clock);
 
 GstClockTime           gst_clock_get_time              (GstClock *clock);
 
-GstClockReturn         gst_clock_wait                  (GstClock *clock, GstClockTime time, GstClockTimeDiff *jitter);
-GstClockID             gst_clock_wait_async            (GstClock *clock, GstClockTime time, 
-                                                        GstClockCallback func, gpointer user_data);
-void                   gst_clock_cancel_wait_async     (GstClock *clock, GstClockID id);
-GstClockID             gst_clock_notify_async          (GstClock *clock, GstClockTime interval, 
-                                                        GstClockCallback func, gpointer user_data);
-void                   gst_clock_remove_notify_async   (GstClock *clock, GstClockID id);
-GstClockReturn         gst_clock_wait_id               (GstClock *clock, GstClockID id, GstClockTimeDiff *jitter);
-
 GstClockID             gst_clock_get_next_id           (GstClock *clock);
-void                   gst_clock_unlock_id             (GstClock *clock, GstClockID id);
 
-GstClockTime           gst_clock_id_get_time           (GstClockID id);
+/* creating IDs that can be used to get notifications */
+GstClockID             gst_clock_new_single_shot_id    (GstClock *clock, 
+                                                        GstClockTime time); 
+GstClockID             gst_clock_new_periodic_id       (GstClock *clock, 
+                                                        GstClockTime start_time,
+                                                        GstClockTime interval); 
 
-void                   gst_clock_set_resolution        (GstClock *clock, guint64 resolution);
-guint64                        gst_clock_get_resolution        (GstClock *clock);
+/* operations on IDs */
+GstClockTime           gst_clock_id_get_time           (GstClockID id);
+GstClockReturn         gst_clock_id_wait               (GstClockID id, 
+                                                        GstClockTimeDiff *jitter);
+GstClockReturn         gst_clock_id_wait_async         (GstClockID id, 
+                                                        GstClockCallback func, 
+                                                        gpointer user_data);
+void                   gst_clock_id_unschedule         (GstClockID id);
+void                   gst_clock_id_unlock             (GstClockID id);
+void                   gst_clock_id_free               (GstClockID id);
 
 G_END_DECLS
 
index 24b2d6faf56d434a017d15e8bbdf0ddcc0c48659..e27c4155a717038f0bd5b8fd2399c8f6005b7ada 100644 (file)
@@ -566,6 +566,7 @@ gst_scheduler_set_clock (GstScheduler *sched, GstClock *clock)
 
     GST_DEBUG (GST_CAT_CLOCK, "scheduler setting clock %p (%s) on element %s", clock, 
                (clock ? GST_OBJECT_NAME (clock) : "nil"), GST_ELEMENT_NAME (element));
+    
     gst_element_set_clock (element, clock);
     receivers = g_list_next (receivers);
   }
@@ -618,7 +619,11 @@ gst_scheduler_clock_wait (GstScheduler *sched, GstElement *element, GstClock *cl
   if (CLASS (sched)->clock_wait)
     return CLASS (sched)->clock_wait (sched, element, clock, time, jitter);
   else
-    return gst_clock_wait (clock, time, jitter);
+  {
+    GstClockID id = gst_clock_new_single_shot_id (clock, time);
+
+    return gst_clock_id_wait (id, jitter);
+  }
 
   return GST_CLOCK_TIMEOUT;
 }
index 8ea28b0cad3c61c29a4502f3e73cfd0885401730..b7b0fa789a872adeff6d0f4a4ac7be73e64757db 100644 (file)
 
 static GstClock *_the_system_clock = NULL;
 
-static void            gst_system_clock_class_init     (GstSystemClockClass *klass);
-static void            gst_system_clock_init           (GstSystemClock *clock);
+static void                    gst_system_clock_class_init     (GstSystemClockClass *klass);
+static void                    gst_system_clock_init           (GstSystemClock *clock);
 
-static GstClockTime    gst_system_clock_get_internal_time      (GstClock *clock);
-static guint64         gst_system_clock_get_resolution (GstClock *clock);
+static GstClockTime            gst_system_clock_get_internal_time      (GstClock *clock);
+static guint64                 gst_system_clock_get_resolution (GstClock *clock);
+static GstClockEntryStatus     gst_system_clock_wait           (GstClock *clock, GstClockEntry *entry);
 
+static GCond   *_gst_sysclock_cond = NULL;
+static GMutex  *_gst_sysclock_mutex = NULL;
 
 static GstClockClass *parent_class = NULL;
 /* static guint gst_system_clock_signals[LAST_SIGNAL] = { 0 }; */
@@ -78,8 +81,12 @@ gst_system_clock_class_init (GstSystemClockClass *klass)
 
   parent_class = g_type_class_ref (GST_TYPE_CLOCK);
 
-  gstclock_class->get_internal_time =  gst_system_clock_get_internal_time;
-  gstclock_class->get_resolution =     gst_system_clock_get_resolution;
+  gstclock_class->get_internal_time    = gst_system_clock_get_internal_time;
+  gstclock_class->get_resolution       = gst_system_clock_get_resolution;
+  gstclock_class->wait                 = gst_system_clock_wait;
+
+  _gst_sysclock_cond  = g_cond_new ();
+  _gst_sysclock_mutex = g_mutex_new ();
 }
 
 static void
@@ -99,6 +106,7 @@ gst_system_clock_obtain (void)
 {
   if (_the_system_clock == NULL) {
     _the_system_clock = GST_CLOCK (g_object_new (GST_TYPE_SYSTEM_CLOCK, NULL));
+    
     gst_object_set_name (GST_OBJECT (_the_system_clock), "GstSystemClock");
   }
   return _the_system_clock;
@@ -120,4 +128,30 @@ gst_system_clock_get_resolution (GstClock *clock)
   return 1 * GST_USECOND;
 }
 
+static GstClockEntryStatus     
+gst_system_clock_wait (GstClock *clock, GstClockEntry *entry)
+{
+  GstClockEntryStatus res = GST_CLOCK_ENTRY_OK;
+  GstClockTime current, target;
+         
+  current = gst_clock_get_time (clock);
+  target = gst_system_clock_get_internal_time (clock) +
+           GST_CLOCK_ENTRY_TIME (entry) - current;
+                  
+  GST_DEBUG (GST_CAT_CLOCK, "real_target %llu,  target %llu, now %llu",
+                      target, GST_CLOCK_ENTRY_TIME (entry), current);
+                      
+  if (((gint64)target) > 0) {
+    GTimeVal tv;
+
+    GST_TIME_TO_TIMEVAL (target, tv);
+    g_mutex_lock (_gst_sysclock_mutex);
+    g_cond_timed_wait (_gst_sysclock_cond, _gst_sysclock_mutex, &tv);
+    g_mutex_unlock (_gst_sysclock_mutex);
+  }
+  else {
+    res = GST_CLOCK_ENTRY_EARLY;
+  }
+  return res;
+}
 
index e96ac3fb7234b2fb0a13f3894853ad4b158608d0..984c99db207ac203004faf534fc3d3321b07bd9a 100644 (file)
@@ -5,7 +5,8 @@ plugin_LTLIBRARIES = \
        libgstbasicwingoscheduler.la \
        libgstfastomegascheduler.la \
        libgstfastwingoscheduler.la \
-       libgstoptomegascheduler.la
+       libgstoptomegascheduler.la \
+       libgstoptwingoscheduler.la
 
 libgstbasicomegascheduler_la_SOURCES = gstbasicscheduler.c 
 libgstbasicomegascheduler_la_CFLAGS = $(GST_CFLAGS) -D_COTHREADS_OMEGA
@@ -40,6 +41,13 @@ libgstoptomegascheduler_la_CFLAGS = $(GST_CFLAGS) -D_COTHREADS_OMEGA
 libgstoptomegascheduler_la_LIBADD = ../libcothreads.la
 libgstoptomegascheduler_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
 
+libgstoptwingoscheduler_la_SOURCES = gstoptimalscheduler.c 
+libgstoptwingoscheduler_la_CFLAGS = $(GST_CFLAGS) -D_COTHREADS_WINGO
+libgstoptwingoscheduler_la_CFLAGS += -I$(top_builddir)/libs/ext/cothreads
+libgstoptwingoscheduler_la_CFLAGS += -I$(top_srcdir)/libs/ext/cothreads
+libgstoptwingoscheduler_la_LIBADD = $(top_builddir)/libs/ext/cothreads/cothreads/libcothreads-gthreads.la
+libgstoptwingoscheduler_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
+
 ## this is a REALLY evil hack 
 ## but we need to keep it as long as we have libs/gst and libs/ext
 $(top_builddir)/libs/ext/cothreads/cothreads/libcothreads-gthreads.la:
index 10c7564961260d1d08ae426d2cafb92a5e9d0702..12f2632a830341d177d5f7c6c766a33c4009c6a7 100644 (file)
@@ -1012,9 +1012,6 @@ gst_basic_scheduler_reset (GstScheduler *sched)
 static void
 gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
 {
-  GList *pads;
-  GstPad *pad;
-  GstElement *peerelement;
   GstSchedulerChain *chain;
   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
 
@@ -1032,27 +1029,6 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
   /* create a chain to hold it, and add */
   chain = gst_basic_scheduler_chain_new (bsched);
   gst_basic_scheduler_chain_add_element (chain, element);
-
-  /* set the sched pointer in all the pads */
-  pads = element->pads;
-  while (pads) {
-    pad = GST_PAD (pads->data);
-    pads = g_list_next (pads);
-
-    /* we only operate on real pads */
-    if (!GST_IS_REAL_PAD (pad))
-      continue;
-
-    /* if the peer element exists and is a candidate */
-    if (GST_PAD_PEER (pad)) {
-      peerelement = GST_PAD_PARENT (GST_PAD_PEER (pad));
-      if (GST_ELEMENT_SCHED (element) == GST_ELEMENT_SCHED (peerelement)) {
-       GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, chaining together");
-       /* make sure that the two elements are in the same chain */
-       gst_basic_scheduler_chain_elements (bsched, element, peerelement);
-      }
-    }
-  }
 }
 
 static void
@@ -1296,7 +1272,11 @@ static GstClockReturn
 gst_basic_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
                                GstClock *clock, GstClockTime time, GstClockTimeDiff *jitter)
 {
-  return gst_clock_wait (clock, time, jitter);
+  GstClockID id;
+  
+  id = gst_clock_new_single_shot_id (clock, time);
+
+  return gst_clock_id_wait (id, jitter);
 }
 
 static GstSchedulerState
index a97e167381efa411a248c402cf07797e725445f4..936775709b3378d9b613bc45b71af6dfad969f14 100644 (file)
@@ -137,8 +137,10 @@ struct _GstOptSchedulerGroup {
   gint                          num_enabled;
   GstElement                   *entry;                 /* the group's entry point */
 
-  cothread                     *cothread;              /* the cothread of this group */
+  GSList                       *providers;             /* other groups that provide data
+                                                          for this group */
 
+  cothread                     *cothread;              /* the cothread of this group */
   GroupScheduleFunction         schedulefunc;
   int                           argc;
   char                        **argv;
@@ -803,11 +805,11 @@ setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group)
   if (osched->use_cothreads) {
     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
       do_cothread_create (group->cothread, osched->context,
-                     wrapper, 0, (char **) group);
+                     (cothread_func) wrapper, 0, (char **) group);
     }
     else {
       do_cothread_setfunc (group->cothread, osched->context,
-                     wrapper, 0, (char **) group);
+                     (cothread_func) wrapper, 0, (char **) group);
     }
   }
   else {
@@ -879,6 +881,18 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
   return res;
 }
 
+static void
+get_group (GstElement *element, GstOptSchedulerGroup **group)
+{
+  GstOptSchedulerCtx *ctx;
+
+  ctx = GST_ELEMENT_SCHED_CONTEXT (element);
+  if (ctx) 
+    *group = ctx->group;
+  else
+    *group = NULL;
+}
+
 /*
  * the idea is to put the two elements into the same group. 
  * - When no element is inside a group, we create a new group and add 
@@ -891,15 +905,10 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
 static GstOptSchedulerGroup*
 group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *element2)
 {
-  GstOptSchedulerCtx *ctx1, *ctx2;
-  GstOptSchedulerGroup *group1 = NULL, *group2 = NULL, *group = NULL;
+  GstOptSchedulerGroup *group1, *group2, *group = NULL;
   
-  ctx1 = GST_ELEMENT_SCHED_CONTEXT (element1);
-  if (ctx1)
-    group1 = ctx1->group;
-  ctx2 = GST_ELEMENT_SCHED_CONTEXT (element2);
-  if (ctx2)
-    group2 = ctx2->group;
+  get_group (element1, &group1);
+  get_group (element2, &group2);
   
   /* none of the elements is added to a group, create a new group
    * and chain to add the elements to */
@@ -1008,14 +1017,29 @@ gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
 static void
 gst_opt_scheduler_remove_element (GstScheduler *sched, GstElement *element)
 {
-  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  GstOptSchedulerGroup *group;
 
   GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler", GST_ELEMENT_NAME (element));
 
+  /* decoupled elements are not added to the scheduler lists and should therefor
+   * no be removed */
+  if (GST_ELEMENT_IS_DECOUPLED (element))
+    return;
+
+  /* the element is guaranteed to live in it's own group/chain now */
+  get_group (element, &group);
+  if (group) {
+    if (group->chain) {
+      remove_from_chain (group->chain, group);
+      delete_chain (osched, group->chain);
+    }
+
+    delete_group (group);
+  }
+
   g_free (GST_ELEMENT_SCHED_CONTEXT (element));
   GST_ELEMENT_SCHED_CONTEXT (element) = NULL;
-
-  g_warning ("remove implement me");
 }
 
 static void
@@ -1080,14 +1104,34 @@ gst_opt_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sink
         type = GST_OPT_LOOP_TO_CHAIN;
     }
     else if (element2->loopfunc) {
-      if (GST_RPAD_GETFUNC (srcpad))
+      if (GST_RPAD_GETFUNC (srcpad)) {
         type = GST_OPT_GET_TO_LOOP;
+       /* this could be tricky, the get based source could 
+        * already be part of a loop based group in another pad,
+        * we assert on that for now */
+       if (GST_ELEMENT_SCHED_CONTEXT (element1) &&
+           GST_ELEMENT_SCHED_GROUP (element1) != NULL) 
+       {
+          g_warning ("internal error: cannot schedule get to loop with get in group");
+         return;
+       }
+      }
       else
         type = GST_OPT_CHAIN_TO_LOOP;
     }
     else {
-      if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad))
+      if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad)) {
         type = GST_OPT_GET_TO_CHAIN;
+       /* the get based source could already be part of a loop 
+        * based group in another pad,
+        * we assert on that for now */
+       if (GST_ELEMENT_SCHED_CONTEXT (element1) &&
+           GST_ELEMENT_SCHED_GROUP (element1) != NULL) 
+       {
+          g_warning ("internal error: cannot schedule get to loop with get in group");
+         return;
+       }
+      }
       else 
         type = GST_OPT_CHAIN_TO_CHAIN;
     }
@@ -1182,14 +1226,78 @@ gst_opt_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sink
   }
 }
 
+static gboolean
+element_has_connection_with_group (GstElement *element, GstOptSchedulerGroup *group)
+{
+  gboolean connected = FALSE;
+  const GList *pads;
+
+  /* see if the element has no more connections to the peer group */
+  pads = gst_element_get_pad_list (element);
+  while (pads && !connected) {
+    GstPad *pad = GST_PAD_CAST (pads->data);
+    pads = g_list_next (pads);
+
+    /* we only operate on real pads */
+    if (!GST_IS_REAL_PAD (pad))
+      continue;
+
+    if (GST_PAD_PEER (pad)) {
+    } 
+  }
+  return connected;
+}
+
 static void
 gst_opt_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
 {
   //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  GstElement *element1, *element2;
+  GstOptSchedulerGroup *group1, *group2;
+  gboolean still_connect;
+
   GST_INFO (GST_CAT_SCHEDULING, "pad disconnect between \"%s:%s\" and \"%s:%s\"", 
                  GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
 
-  g_warning ("pad disconnect, implement me");
+  element1 = GST_PAD_PARENT (srcpad);
+  element2 = GST_PAD_PARENT (sinkpad);
+  
+  get_group (element1, &group1);
+  get_group (element2, &group2);
+
+  /* having no groups is pretty bad, this means that two decoupled 
+   * elements were connected or something */
+  if (!group1 && !group2) {
+    g_warning ("internal error: cannot disconnect pads");
+    return;
+  }
+
+  /* see if the group has to be broken up */
+  if (group1)
+    still_connect = element_has_connection_with_group (element2, group1);
+  else
+    still_connect = element_has_connection_with_group (element1, group2);
+
+  /* if there is still a connection, we don't need to break this group */
+  if (still_connect)
+    return;
+
+  /* if they are equal, they both are non zero */
+  if (group1 == group2) {
+    g_warning ("pad disconnect: implement me");
+  }
+  else if (group1) {
+    g_warning ("pad disconnect: implement me");
+  }
+  else {
+    /* there was no group for element1, see if the element
+     * was an entry point for group2 */
+    if (group2) {
+      if (group2->entry == element1) {
+        group2->entry = NULL;
+      }
+    }
+  }
 }
 
 static GstPad*
@@ -1206,7 +1314,11 @@ static GstClockReturn
 gst_opt_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
                              GstClock *clock, GstClockTime time, GstClockTimeDiff *jitter)
 {
-  return gst_clock_wait (clock, time, jitter);
+  GstClockID id;
+  
+  id = gst_clock_new_single_shot_id (clock, time);
+
+  return gst_clock_id_wait (id, jitter);
 }
 
 /* a scheduler iteration is done by looping and scheduling the active chains */