gst/adder/gstadder.*: Updated some docs. Added comments and FIXMEs all over the place.
authorWim Taymans <wim.taymans@gmail.com>
Wed, 10 May 2006 11:54:36 +0000 (11:54 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 10 May 2006 11:54:36 +0000 (11:54 +0000)
Original commit message from CVS:
* gst/adder/gstadder.c: (gst_adder_setcaps),
(gst_adder_query_duration), (gst_adder_query), (forward_event),
(gst_adder_src_event), (gst_adder_sink_event),
(gst_adder_class_init), (gst_adder_finalize),
(gst_adder_request_new_pad), (gst_adder_collected):
* gst/adder/gstadder.h:
Updated some docs. Added comments and FIXMEs all over the place.
Improve debugging info.
Fix leak on finalize by not calling the parent.
Implement duration query.
Make event forwarding threadsafe.
Correctly send NEWSEGMENT at start and after flush.
Handle EOS correctly.
Post error when not negotiated.
* tests/check/elements/adder.c: (GST_START_TEST):
Added FIXME in the test.

ChangeLog
gst/adder/gstadder.c
gst/adder/gstadder.h
tests/check/elements/adder.c

index 890999c..f42129a 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,23 @@
+2006-05-10  Wim Taymans  <wim@fluendo.com>
+
+       * gst/adder/gstadder.c: (gst_adder_setcaps),
+       (gst_adder_query_duration), (gst_adder_query), (forward_event),
+       (gst_adder_src_event), (gst_adder_sink_event),
+       (gst_adder_class_init), (gst_adder_finalize),
+       (gst_adder_request_new_pad), (gst_adder_collected):
+       * gst/adder/gstadder.h:
+       Updated some docs. Added comments and FIXMEs all over the place.
+       Improve debugging info.
+       Fix leak on finalize by not calling the parent.
+       Implement duration query.
+       Make event forwarding threadsafe.
+       Correctly send NEWSEGMENT at start and after flush.
+       Handle EOS correctly.
+       Post error when not negotiated.
+
+       * tests/check/elements/adder.c: (GST_START_TEST):
+       Added FIXME in the test.
+
 2006-05-09  Tim-Philipp Müller  <tim at centricular dot net>
 
        * ext/pango/gsttextoverlay.c: (gst_text_overlay_valign_get_type),
index 6eddc27..354e7fa 100644 (file)
@@ -1,7 +1,7 @@
 /* GStreamer
  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
  *                    2001 Thomas <thomas@apestaart.org>
- *                    2005 Wim Taymans <wim@fluendo.com>
+ *               2005,2006 Wim Taymans <wim@fluendo.com>
  *
  * adder.c: Adder element, N in, one out, samples are added
  *
  * </programlisting>
  * This pipeline produces two sine waves mixed together.
  * </para>
+ * <para>
+ * The Adder currently mixes all data received on the sinkpads as soon as possible
+ * without trying to synchronize the streams.
+ * </para>
  * </refsect2>
+ *
+ * Last reviewed on 2006-05-09 (0.10.7)
  */
 /* Element-Checklist-Version: 5 */
 
@@ -86,11 +92,12 @@ static GstStaticPadTemplate gst_adder_sink_template =
 
 static void gst_adder_class_init (GstAdderClass * klass);
 static void gst_adder_init (GstAdder * adder);
-static void gst_adder_dispose (GObject * object);
+static void gst_adder_finalize (GObject * object);
 
 static gboolean gst_adder_setcaps (GstPad * pad, GstCaps * caps);
 static gboolean gst_adder_query (GstPad * pad, GstQuery * query);
 static gboolean gst_adder_src_event (GstPad * pad, GstEvent * event);
+static gboolean gst_adder_sink_event (GstPad * pad, GstEvent * event);
 
 static GstPad *gst_adder_request_new_pad (GstElement * element,
     GstPadTemplate * temp, const gchar * unused);
@@ -151,7 +158,8 @@ gst_adder_setcaps (GstPad * pad, GstCaps * caps)
 
   adder = GST_ADDER (GST_PAD_PARENT (pad));
 
-  /* see if the other pads can accept the format */
+  /* FIXME, see if the other pads can accept the format. Also lock the
+   * format on the other pads to this new format. */
   GST_OBJECT_LOCK (adder);
   pads = GST_ELEMENT (adder)->pads;
   while (pads) {
@@ -215,15 +223,73 @@ gst_adder_setcaps (GstPad * pad, GstCaps * caps)
 
   gst_structure_get_int (structure, "channels", &adder->channels);
   gst_structure_get_int (structure, "rate", &adder->rate);
+  /* precalc bps */
+  adder->bps = (adder->width / 8) * adder->channels;
 
   return TRUE;
 
 not_supported:
   {
+    GST_DEBUG_OBJECT (adder, "unsupported format set as caps");
     return FALSE;
   }
 }
 
+/* FIXME, the duration query should reflect how long you will produce
+ * data, that is the amount of stream time until you will emit EOS. 
+ * For synchronized mixing this
+ * is always the max of all the durations of upstream since we emit
+ * EOS when all of them finished.
+ * We don't do synchronized mixing so this really depends on where the
+ * streams where punched in and what their relative offsets are against
+ * eachother which we can get from the first timestamps we see.
+ * When we add a new stream (or remove a stream) the duration might
+ * also become invalid again and we need to post a new DURATION
+ * message to ntify this fact to the parent.
+ * For now we take the max of all the upstream elements so the simple
+ * cases work at least somewhat. */
+static gboolean
+gst_adder_query_duration (GstAdder * adder, GstQuery * query)
+{
+  GList *pads;
+  gint64 max;
+  gboolean res;
+  GstFormat format;
+
+  max = -1;
+  res = TRUE;
+
+  /* parse format */
+  gst_query_parse_duration (query, &format, NULL);
+
+  GST_OBJECT_LOCK (adder);
+  pads = GST_ELEMENT_CAST (adder)->sinkpads;
+  for (; pads; pads = g_list_next (pads)) {
+    GstPad *pad = GST_PAD_CAST (pads->data);
+    gint64 duration;
+
+    /* ask sink peer for duration */
+    res &= gst_pad_query_peer_duration (pad, &format, &duration);
+    /* take max from all valid return values */
+    if (res) {
+      /* valid unknown length, stop searching */
+      if (duration == -1) {
+        max = duration;
+        break;
+      }
+      /* else see if bigger than current max */
+      else if (duration > max)
+        max = duration;
+    }
+  }
+  GST_OBJECT_UNLOCK (adder);
+
+  /* and store the max */
+  gst_query_set_duration (query, format, max);
+
+  return res;
+}
+
 static gboolean
 gst_adder_query (GstPad * pad, GstQuery * query)
 {
@@ -239,11 +305,12 @@ gst_adder_query (GstPad * pad, GstQuery * query)
 
       switch (format) {
         case GST_FORMAT_TIME:
-          gst_query_set_position (query, GST_FORMAT_TIME, adder->timestamp);
+          /* FIXME, bring to stream time, might be tricky */
+          gst_query_set_position (query, format, adder->timestamp);
           res = TRUE;
           break;
         case GST_FORMAT_DEFAULT:
-          gst_query_set_position (query, GST_FORMAT_DEFAULT, adder->offset);
+          gst_query_set_position (query, format, adder->offset);
           res = TRUE;
           break;
         default:
@@ -251,12 +318,12 @@ gst_adder_query (GstPad * pad, GstQuery * query)
       }
       break;
     }
-      /* FIXME: what to do about the length? query all pads upstream and
-       * pick the longest length? or the shortest length? or what? */
     case GST_QUERY_DURATION:
-      res = FALSE;
+      res = gst_adder_query_duration (adder, query);
       break;
     default:
+      /* FIXME, needs a custom query handler because we have multiple
+       * sinkpads */
       res = gst_pad_query_default (pad, query);
       break;
   }
@@ -265,33 +332,110 @@ gst_adder_query (GstPad * pad, GstQuery * query)
   return res;
 }
 
+/* forwards the event to all sinkpads, takes ownership of the 
+ * event
+ *
+ * Returns: TRUE if the event could be forwarded on all
+ * sinkpads.
+ */
 static gboolean
-gst_adder_src_event (GstPad * pad, GstEvent * event)
+forward_event (GstAdder * adder, GstEvent * event)
 {
-  GstAdder *adder = GST_ADDER (gst_pad_get_parent (pad));
-  GSList *node;
-  GstCollectData *data;
-  gboolean result = TRUE;
+  gboolean ret;
+  GList *pads;
 
-  GST_LOG_OBJECT (pad, "Sending event %p (%s)", event,
+  GST_LOG_OBJECT (adder, "Forwarding event %p (%s)", event,
       GST_EVENT_TYPE_NAME (event));
 
-  for (node = adder->collect->data; (node && result);
-      node = g_slist_next (node)) {
-    data = (GstCollectData *) node->data;
+  ret = TRUE;
+
+  GST_OBJECT_LOCK (adder);
+  pads = GST_ELEMENT_CAST (adder)->sinkpads;
+  for (; pads; pads = g_list_next (pads)) {
+    GstPad *pad = GST_PAD_CAST (pads->data);
 
-    GST_LOG_OBJECT (pad, "  to %s:%s", GST_DEBUG_PAD_NAME (data->pad));
     gst_event_ref (event);
-    result &= gst_pad_push_event (data->pad, event);
-    if (!result) {
-      GST_WARNING ("Sending event  %p (%s) to %s:%s failed.",
-          event, GST_EVENT_TYPE_NAME (event), GST_DEBUG_PAD_NAME (data->pad));
+    ret &= gst_pad_push_event (pad, event);
+
+    if (!ret) {
+      GST_WARNING_OBJECT (pad, "Sending event  %p (%s) failed.",
+          event, GST_EVENT_TYPE_NAME (event));
+      break;
+    } else {
+      GST_LOG_OBJECT (pad, "Sent event  %p (%s).",
+          event, GST_EVENT_TYPE_NAME (event));
     }
   }
+  GST_OBJECT_UNLOCK (adder);
+
+  gst_event_unref (event);
+
+  return ret;
+}
+
+static gboolean
+gst_adder_src_event (GstPad * pad, GstEvent * event)
+{
+  GstAdder *adder;
+  gboolean result;
+
+  adder = GST_ADDER (gst_pad_get_parent (pad));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_QOS:
+      /* QoS might be tricky */
+      result = FALSE;
+      break;
+    case GST_EVENT_SEEK:
+      /* FIXME seek needs something smarter. */
+      result = forward_event (adder, event);
+      break;
+    case GST_EVENT_NAVIGATION:
+      /* navigation is rather pointless. */
+      result = FALSE;
+      break;
+    default:
+      /* just forward the rest for now */
+      result = forward_event (adder, event);
+      break;
+  }
   gst_object_unref (adder);
+
   return result;
 }
 
+static gboolean
+gst_adder_sink_event (GstPad * pad, GstEvent * event)
+{
+  GstAdder *adder;
+  gboolean ret;
+
+  adder = GST_ADDER (gst_pad_get_parent (pad));
+
+  GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
+      GST_DEBUG_PAD_NAME (pad));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_STOP:
+      /* mark a pending new segment. This event is synchronized
+       * with the streaming thread so we can safely update the 
+       * variable without races. It's somewhat weird because we
+       * assume the collectpads forwarded the FLUSH_STOP past us
+       * and downstream (using our source pad, the bastard!).
+       */
+      adder->segment_pending = TRUE;
+      break;
+    default:
+      break;
+  }
+
+  /* now GstCollectPads can take care of the rest, e.g. EOS */
+  ret = adder->collect_event (pad, event);
+
+  gst_object_unref (adder);
+  return ret;
+}
+
 static void
 gst_adder_class_init (GstAdderClass * klass)
 {
@@ -300,7 +444,7 @@ gst_adder_class_init (GstAdderClass * klass)
 
   gobject_class = (GObjectClass *) klass;
 
-  gobject_class->dispose = gst_adder_dispose;
+  gobject_class->finalize = gst_adder_finalize;
 
   gstelement_class = (GstElementClass *) klass;
 
@@ -345,12 +489,14 @@ gst_adder_init (GstAdder * adder)
 }
 
 static void
-gst_adder_dispose (GObject * object)
+gst_adder_finalize (GObject * object)
 {
   GstAdder *adder = GST_ADDER (object);
 
   gst_object_unref (adder->collect);
   adder->collect = NULL;
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 static GstPad *
@@ -376,6 +522,12 @@ gst_adder_request_new_pad (GstElement * element, GstPadTemplate * templ,
   gst_pad_set_setcaps_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_setcaps));
   gst_collect_pads_add_pad (adder->collect, newpad, sizeof (GstCollectData));
 
+  /* FIXME: hacked way to override/extend the event function of
+   * GstCollectPads; because it sets its own event function giving the
+   * element no access to events */
+  adder->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC (newpad);
+  gst_pad_set_event_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_sink_event));
+
   if (!gst_element_add_pad (GST_ELEMENT (adder), newpad))
     goto could_not_add;
 
@@ -419,19 +571,20 @@ gst_adder_collected (GstCollectPads * pads, gpointer user_data)
 
   adder = GST_ADDER (user_data);
 
-  /* get available bytes for reading */
-  size = gst_collect_pads_available (pads);
-  if (size == 0)
-    return GST_FLOW_OK;
+  /* this is fatal */
+  if (G_UNLIKELY (adder->func == NULL))
+    goto not_negotiated;
 
   outbuf = NULL;
   outbytes = NULL;
 
-  if (adder->func == NULL)
-    goto not_negotiated;
+  /* get available bytes for reading, this can be 0 which could mean
+   * empty buffers or EOS, which we will catch when we loop over the
+   * pads. */
+  size = gst_collect_pads_available (pads);
 
   GST_LOG_OBJECT (adder,
-      "starting to cycle through channels, collecting %d bytes", size);
+      "starting to cycle through channels, %d bytes available", size);
 
   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
     GstCollectData *data;
@@ -440,77 +593,107 @@ gst_adder_collected (GstCollectPads * pads, gpointer user_data)
 
     data = (GstCollectData *) collected->data;
 
-    GST_LOG_OBJECT (adder, "looking into channel %p", data);
-
     /* get pointer to copy size bytes */
     len = gst_collect_pads_read (pads, data, &bytes, size);
-    if (len == 0)
-      continue;
-
-    GST_LOG_OBJECT (adder, " copying %d bytes (format %d,%d)",
-        len, adder->format, adder->width);
-    GST_LOG_OBJECT (adder, " from channel %p from input data %p", data, bytes);
+    /* length 0 means EOS or an empty buffer so we still need to flush in
+     * case of an empty buffer. */
+    if (len == 0) {
+      GST_LOG_OBJECT (adder, "channel %p: no bytes available", data);
+      goto next;
+    }
 
     if (outbuf == NULL) {
-      /* first buffer, alloc size bytes */
+      GST_LOG_OBJECT (adder, "channel %p: making output buffer of %d bytes",
+          data, size);
+
+      /* first buffer, alloc size bytes. FIXME, we can easily subbuffer
+       * and _make_writable. */
       outbuf = gst_buffer_new_and_alloc (size);
-      gst_buffer_set_caps (outbuf, GST_PAD_CAPS (adder->srcpad));
       outbytes = GST_BUFFER_DATA (outbuf);
+      gst_buffer_set_caps (outbuf, GST_PAD_CAPS (adder->srcpad));
+
+      /* clear if we are only going to fill a partial buffer */
+      if (G_UNLIKELY (size > len))
+        memset (outbytes, 0, size);
 
-      memset (outbytes, 0, size);
+      GST_LOG_OBJECT (adder, "channel %p: copying %d bytes from data %p",
+          data, len, bytes);
 
       /* and copy the data into it */
       memcpy (outbytes, bytes, len);
     } else {
+      GST_LOG_OBJECT (adder, "channel %p: mixing %d bytes from data %p",
+          data, len, bytes);
       /* other buffers, need to add them */
       adder->func ((gpointer) outbytes, (gpointer) bytes, len);
     }
+  next:
     gst_collect_pads_flush (pads, data, len);
   }
 
-  /* we always timestamp in stream time */
+  /* can only happen when no pads to collect or all EOS */
+  if (outbuf == NULL)
+    goto eos;
+
+  /* our timestamping is very simple, just an ever incrementing 
+   * counter, the new segment time will take care of their respective
+   * stream time. */
   if (adder->segment_pending) {
     GstEvent *event;
 
+    /* FIXME, use rate/applied_rate as set on all sinkpads.
+     * We could potentially figure out the duration as well using
+     * the current segment positions and the stated stop positions.
+     * Also we just start from stream time 0 which is rather
+     * weird. For non-synchronized mixing, the time should be
+     * the min of the stream times of all received segments,
+     * rationale being that the duration is at least going to
+     * be as long as the earliest stream we start mixing. This
+     * would also be correct for synchronized mixing but then
+     * the later streams would be delayed until the stream times
+     * match.
+     */
     event = gst_event_new_new_segment_full (FALSE, 1.0,
-        1.0, GST_FORMAT_TIME, adder->timestamp, -1, adder->timestamp);
+        1.0, GST_FORMAT_TIME, adder->timestamp, -1, 0);
 
     gst_pad_push_event (adder->srcpad, event);
     adder->segment_pending = FALSE;
   }
 
   /* set timestamps on the output buffer */
-  {
-    guint64 samples;
+  GST_BUFFER_TIMESTAMP (outbuf) = adder->timestamp;
+  GST_BUFFER_OFFSET (outbuf) = adder->offset;
 
-    GST_BUFFER_TIMESTAMP (outbuf) = adder->timestamp;
-    GST_BUFFER_OFFSET (outbuf) = adder->offset;
+  /* for the next timestamp, use the sample counter, which will
+   * never accumulate rounding errors */
+  adder->offset += size / adder->bps;
+  adder->timestamp = gst_util_uint64_scale_int (adder->offset,
+      GST_SECOND, adder->rate);
 
-    /* get next timestamp */
-    /* width is in bits and we need bytes */
-    samples = size / ((adder->width / 8) * adder->channels);
-
-    adder->offset += samples;
-    adder->timestamp = gst_util_uint64_scale_int (adder->offset,
-        GST_SECOND, adder->rate);
-
-    /* now we can set the duration */
-    GST_BUFFER_DURATION (outbuf) = adder->timestamp -
-        GST_BUFFER_TIMESTAMP (outbuf);
-  }
+  /* now we can set the duration of the buffer */
+  GST_BUFFER_DURATION (outbuf) = adder->timestamp -
+      GST_BUFFER_TIMESTAMP (outbuf);
 
   /* send it out */
-  GST_LOG_OBJECT (adder, "pushing outbuf");
+  GST_LOG_OBJECT (adder, "pushing outbuf, timestamp %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)));
   ret = gst_pad_push (adder->srcpad, outbuf);
-  GST_LOG_OBJECT (adder, "pushed outbuf");
 
   return ret;
 
   /* ERRORS */
 not_negotiated:
   {
+    GST_ELEMENT_ERROR (adder, STREAM, FORMAT, (NULL),
+        ("Unknown data received, not negotiated"));
     return GST_FLOW_NOT_NEGOTIATED;
   }
+eos:
+  {
+    GST_DEBUG_OBJECT (adder, "no data available, must be EOS");
+    gst_pad_push_event (adder->srcpad, gst_event_new_eos ());
+    return GST_FLOW_UNEXPECTED;
+  }
 }
 
 static GstStateChangeReturn
index 02b7092..9b1aa82 100644 (file)
@@ -65,6 +65,9 @@ struct _GstAdder {
   gint            depth;
   gboolean        is_signed;
 
+  /* number of bytes per sample, actually width/8 * channels */
+  gint            bps; 
+
   /* function to add samples */
   GstAdderFunction func;
 
@@ -73,6 +76,7 @@ struct _GstAdder {
   gint64          offset;
   
   /* sink event handling */
+  GstPadEventFunction  collect_event;
   GstSegment      segment;
   gboolean        segment_pending;
 };
index 76f3e98..35450c3 100644 (file)
@@ -127,6 +127,9 @@ GST_START_TEST (test_event)
   bus = gst_element_get_bus (bin);
   gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH);
 
+  /* FIXME, fakesrc with default setting will produce 0 sized
+   * buffers and incompatible caps for adder that will make
+   * adder EOS and error out */
   src1 = gst_element_factory_make ("fakesrc", "src1");
   //g_object_set (src1, "wave", 4, NULL); /* silence */
   src2 = gst_element_factory_make ("fakesrc", "src2");
@@ -161,6 +164,7 @@ GST_START_TEST (test_event)
   res = gst_element_set_state (bin, GST_STATE_PAUSED);
   fail_unless (res != GST_STATE_CHANGE_FAILURE, NULL);
 
+  /* FIXME, PAUSED is async and seek might not work before being prerolled. */
   res = gst_element_send_event (bin, seek_event);
   fail_unless (res == TRUE, NULL);