libs/gst/base/gstcollectpads.*: Clean up the mess that is collectpads, add comments and
authorWim Taymans <wim.taymans@gmail.com>
Tue, 9 May 2006 20:47:23 +0000 (20:47 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 9 May 2006 20:47:23 +0000 (20:47 +0000)
Original commit message from CVS:
* libs/gst/base/gstcollectpads.c: (gst_collect_pads_init),
(gst_collect_pads_finalize), (gst_collect_pads_add_pad),
(gst_collect_pads_remove_pad), (gst_collect_pads_set_flushing),
(gst_collect_pads_start), (gst_collect_pads_stop),
(gst_collect_pads_peek), (gst_collect_pads_pop),
(gst_collect_pads_available), (gst_collect_pads_read),
(gst_collect_pads_flush), (gst_collect_pads_check_pads),
(gst_collect_pads_is_collected), (gst_collect_pads_event),
(gst_collect_pads_chain):
* libs/gst/base/gstcollectpads.h:
Clean up the mess that is collectpads, add comments and
FIXMEs where needed.
Maintain a separate pad list so we can add pads while
collecting the other ones. For this we need a new separate
lock (see comics).
Fix memory leak in finalize.
Refactor some weird code to set/unset pad flushing flags, mark
with comments.
Don't crash in _available, _read, _flush when we're EOS.
* tests/check/libs/.cvsignore:
Ignore adapter check binary.

ChangeLog
libs/gst/base/gstcollectpads.c
libs/gst/base/gstcollectpads.h
tests/check/libs/.gitignore

index b3c9fc6..7d54641 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,28 @@
+2006-05-09  Wim Taymans  <wim@fluendo.com>
+
+       * libs/gst/base/gstcollectpads.c: (gst_collect_pads_init),
+       (gst_collect_pads_finalize), (gst_collect_pads_add_pad),
+       (gst_collect_pads_remove_pad), (gst_collect_pads_set_flushing),
+       (gst_collect_pads_start), (gst_collect_pads_stop),
+       (gst_collect_pads_peek), (gst_collect_pads_pop),
+       (gst_collect_pads_available), (gst_collect_pads_read),
+       (gst_collect_pads_flush), (gst_collect_pads_check_pads),
+       (gst_collect_pads_is_collected), (gst_collect_pads_event),
+       (gst_collect_pads_chain):
+       * libs/gst/base/gstcollectpads.h:
+       Clean up the mess that is collectpads, add comments and
+       FIXMEs where needed.
+       Maintain a separate pad list so we can add pads while
+       collecting the other ones. For this we need a new separate 
+       lock (see comics).
+       Fix memory leak in finalize.
+       Refactor some weird code to set/unset pad flushing flags, mark
+       with comments.
+       Don't crash in _available, _read, _flush when we're EOS.
+
+       * tests/check/libs/.cvsignore:
+       Ignore adapter check binary.
+
 2006-05-09  Tim-Philipp Müller  <tim at centricular dot net>
 
        * gst/gstindex.c: (gst_index_resolver_get_type):
index 8dde27e..57c8d49 100644 (file)
@@ -107,6 +107,11 @@ gst_collect_pads_init (GstCollectPads * pads, GstCollectPadsClass * g_class)
   pads->queuedpads = 0;
   pads->eospads = 0;
   pads->started = FALSE;
+
+  /* members to manage the pad list */
+  pads->abidata.ABI.pad_lock = g_mutex_new ();
+  pads->abidata.ABI.pad_cookie = 0;
+  pads->abidata.ABI.pad_list = NULL;
 }
 
 static void
@@ -117,20 +122,20 @@ gst_collect_pads_finalize (GObject * object)
 
   gst_collect_pads_stop (pads);
   g_cond_free (pads->cond);
+  g_mutex_free (pads->abidata.ABI.pad_lock);
 
   /* Remove pads */
   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
     GstCollectData *pdata = (GstCollectData *) collected->data;
 
-    if (pdata->pad) {
+    if (pdata->pad)
       gst_object_unref (pdata->pad);
-    }
+
+    g_free (pdata);
   }
   /* Free pads list */
   g_slist_free (pads->data);
 
-  /* FIXME, free data */
-
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
@@ -209,18 +214,20 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
   data->collect = pads;
   data->pad = gst_object_ref (pad);
   data->buffer = NULL;
+  data->pos = 0;
   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
   data->abidata.ABI.flushing = FALSE;
   data->abidata.ABI.new_segment = FALSE;
+  data->abidata.ABI.eos = FALSE;
 
-  GST_OBJECT_LOCK (pads);
-  pads->data = g_slist_append (pads->data, data);
+  GST_COLLECT_PADS_PAD_LOCK (pads);
+  pads->abidata.ABI.pad_list =
+      g_slist_append (pads->abidata.ABI.pad_list, data);
   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
   gst_pad_set_element_private (pad, data);
-  pads->numpads++;
-  pads->cookie++;
-  GST_OBJECT_UNLOCK (pads);
+  pads->abidata.ABI.pad_cookie++;
+  GST_COLLECT_PADS_PAD_UNLOCK (pads);
 
   return data;
 }
@@ -254,20 +261,26 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
   g_return_val_if_fail (pad != NULL, FALSE);
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
 
-  GST_OBJECT_LOCK (pads);
-  list = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
+  GST_COLLECT_PADS_PAD_LOCK (pads);
+  list =
+      g_slist_find_custom (pads->abidata.ABI.pad_list, pad,
+      (GCompareFunc) find_pad);
   if (list) {
+    pads->abidata.ABI.pad_list =
+        g_slist_delete_link (pads->abidata.ABI.pad_list, list);
+    /* clear the stuff we configured */
+    gst_pad_set_chain_function (pad, NULL);
+    gst_pad_set_event_function (pad, NULL);
+    /* FIXME, check that freeing the private data does not causes
+     * crashes in the streaming thread */
+    gst_pad_set_element_private (pad, NULL);
     g_free (list->data);
-    pads->data = g_slist_delete_link (pads->data, list);
     gst_object_unref (pad);
+    pads->abidata.ABI.pad_cookie++;
   }
-  pads->numpads--;
-  /* FIXME : if the pad has data queued we should decrease the number of
-     queuedpads */
-  pads->cookie++;
-  GST_OBJECT_UNLOCK (pads);
+  GST_COLLECT_PADS_PAD_UNLOCK (pads);
 
-  return list != NULL;
+  return (list != NULL);
 }
 
 /**
@@ -277,6 +290,8 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
  *
  * Check if a pad is active.
  *
+ * This function is currently not implemented.
+ *
  * Returns: TRUE if the pad is active.
  *
  * MT safe.
@@ -299,8 +314,9 @@ gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
  * @pads: the collectspads to use
  *
  * Collect data on all pads. This function is usually called
- * from a GstTask function in an element. This function is
- * currently not implemented.
+ * from a GstTask function in an element. 
+ *
+ * This function is currently not implemented.
  *
  * Returns: GstFlowReturn of the operation.
  *
@@ -324,8 +340,9 @@ gst_collect_pads_collect (GstCollectPads * pads)
  * @length: the length to collect
  *
  * Collect data with @offset and @length on all pads. This function
- * is typically called in the getrange function of an element. This
- * function is currently not implemented.
+ * is typically called in the getrange function of an element. 
+ *
+ * This function is currently not implemented.
  *
  * Returns: GstFlowReturn of the operation.
  *
@@ -343,6 +360,31 @@ gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
   return GST_FLOW_ERROR;
 }
 
+/* FIXME, I think this function is used to work around bad behaviour
+ * of elements that add pads to themselves without activating them.
+ */
+static void
+gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
+{
+  GSList *walk = NULL;
+
+  GST_COLLECT_PADS_PAD_LOCK (pads);
+  /* Update the pads flushing flag */
+  for (walk = pads->data; walk; walk = g_slist_next (walk)) {
+    GstCollectData *cdata = walk->data;
+
+    if (GST_IS_PAD (cdata->pad)) {
+      GST_OBJECT_LOCK (cdata->pad);
+      if (flushing)
+        GST_PAD_SET_FLUSHING (cdata->pad);
+      else
+        GST_PAD_UNSET_FLUSHING (cdata->pad);
+      GST_OBJECT_UNLOCK (cdata->pad);
+    }
+  }
+  GST_COLLECT_PADS_PAD_UNLOCK (pads);
+}
+
 /**
  * gst_collect_pads_start:
  * @pads: the collectspads to use
@@ -354,27 +396,17 @@ gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
 void
 gst_collect_pads_start (GstCollectPads * pads)
 {
-  GSList *walk = NULL;
-
   g_return_if_fail (pads != NULL);
   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
   GST_DEBUG_OBJECT (pads, "starting collect pads");
 
+  /* make sure stop and collect cannot be called anymore */
   GST_OBJECT_LOCK (pads);
-  /* Set our pads as non flushing */
-  walk = pads->data;
-  while (walk) {
-    GstCollectData *cdata = walk->data;
 
-    if (GST_IS_PAD (cdata->pad)) {
-      GST_OBJECT_LOCK (cdata->pad);
-      GST_PAD_UNSET_FLUSHING (cdata->pad);
-      GST_OBJECT_UNLOCK (cdata->pad);
-    }
+  /* make pads streamable */
+  gst_collect_pads_set_flushing (pads, FALSE);
 
-    walk = g_slist_next (walk);
-  }
   /* Start collect pads */
   pads->started = TRUE;
   GST_OBJECT_UNLOCK (pads);
@@ -392,31 +424,22 @@ gst_collect_pads_start (GstCollectPads * pads)
 void
 gst_collect_pads_stop (GstCollectPads * pads)
 {
-  GSList *walk = NULL;
-
   g_return_if_fail (pads != NULL);
   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
   GST_DEBUG_OBJECT (pads, "stopping collect pads");
 
+  /* make sure collect and start cannot be called anymore */
   GST_OBJECT_LOCK (pads);
-  /* Set our pads as flushing */
-  walk = pads->data;
-  while (walk) {
-    GstCollectData *cdata = walk->data;
 
-    if (GST_IS_PAD (cdata->pad)) {
-      GST_OBJECT_LOCK (cdata->pad);
-      GST_PAD_SET_FLUSHING (cdata->pad);
-      GST_OBJECT_UNLOCK (cdata->pad);
-    }
+  /* make pads not accept data anymore */
+  gst_collect_pads_set_flushing (pads, TRUE);
 
-    walk = g_slist_next (walk);
-  }
   /* Stop collect pads */
   pads->started = FALSE;
-  /* Wake them up */
+  /* Wake them up so then can end the chain functions. */
   GST_COLLECT_PADS_BROADCAST (pads);
+
   GST_OBJECT_UNLOCK (pads);
 }
 
@@ -443,9 +466,7 @@ gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
   g_return_val_if_fail (data != NULL, NULL);
 
-  result = data->buffer;
-
-  if (result)
+  if ((result = data->buffer))
     gst_buffer_ref (result);
 
   GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
@@ -478,11 +499,11 @@ gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
   g_return_val_if_fail (data != NULL, NULL);
 
-  result = data->buffer;
-  if (result) {
+  if ((result = data->buffer)) {
     buffer_p = &data->buffer;
     gst_buffer_replace (buffer_p, NULL);
     data->pos = 0;
+    /* one less pad with queued data now */
     pads->queuedpads--;
   }
 
@@ -521,18 +542,30 @@ gst_collect_pads_available (GstCollectPads * pads)
 
   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
     GstCollectData *pdata;
+    GstBuffer *buffer;
     gint size;
 
     pdata = (GstCollectData *) collected->data;
 
-    if (pdata->buffer == NULL)
+    /* ignore pad with EOS */
+    if (pdata->abidata.ABI.eos)
+      continue;
+
+    /* an empty buffer not EOS is weird */
+    if ((buffer = pdata->buffer) == NULL)
       goto not_filled;
 
-    size = GST_BUFFER_SIZE (pdata->buffer) - pdata->pos;
+    /* this is the size left of the buffer */
+    size = GST_BUFFER_SIZE (buffer) - pdata->pos;
 
+    /* need to return the min of all available data */
     if (size < result)
       result = size;
   }
+  /* nothing changed, all must be EOS then, return 0 */
+  if (result == G_MAXUINT)
+    result = 0;
+
   return result;
 
 not_filled:
@@ -565,15 +598,20 @@ gst_collect_pads_read (GstCollectPads * pads, GstCollectData * data,
     guint8 ** bytes, guint size)
 {
   guint readsize;
+  GstBuffer *buffer;
 
   g_return_val_if_fail (pads != NULL, 0);
   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
   g_return_val_if_fail (data != NULL, 0);
   g_return_val_if_fail (bytes != NULL, 0);
 
-  readsize = MIN (size, GST_BUFFER_SIZE (data->buffer) - data->pos);
+  /* no buffer, must be EOS */
+  if ((buffer = data->buffer) == NULL)
+    return 0;
 
-  *bytes = GST_BUFFER_DATA (data->buffer) + data->pos;
+  readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
+
+  *bytes = GST_BUFFER_DATA (buffer) + data->pos;
 
   return readsize;
 }
@@ -599,18 +637,25 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
     guint size)
 {
   guint flushsize;
+  GstBuffer *buffer;
 
   g_return_val_if_fail (pads != NULL, 0);
   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
   g_return_val_if_fail (data != NULL, 0);
 
-  flushsize = MIN (size, GST_BUFFER_SIZE (data->buffer) - data->pos);
+  /* no buffer, must be EOS */
+  if ((buffer = data->buffer) == NULL)
+    return 0;
+
+  /* this is what we can flush at max */
+  flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
 
   data->pos += size;
 
-  if (data->pos >= GST_BUFFER_SIZE (data->buffer)) {
+  if (data->pos >= GST_BUFFER_SIZE (buffer)) {
     GstBuffer *buf;
 
+    /* _pop will also reset data->pos to 0 */
     buf = gst_collect_pads_pop (pads, data);
     gst_buffer_unref (buf);
   }
@@ -618,6 +663,57 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
   return flushsize;
 }
 
+/* see if pads were added or removed and update our stats. Any pad
+ * added after releasing the PAD_LOCK will get collected in the next
+ * round.
+ *
+ * We can do a quick check by checking the cookies, that get changed
+ * whenever the pad list is updated.
+ *
+ * Must be called with LOCK.
+ */
+static void
+gst_collect_pads_check_pads (GstCollectPads * pads)
+{
+  /* the master list and cookie are protected with the PAD_LOCK */
+  GST_COLLECT_PADS_PAD_LOCK (pads);
+  if (pads->abidata.ABI.pad_cookie != pads->cookie) {
+    GSList *collected;
+
+    /* clear list and stats */
+    pads->data = NULL;
+    pads->numpads = 0;
+    pads->queuedpads = 0;
+    pads->eospads = 0;
+
+    /* loop over the master pad list */
+    collected = pads->abidata.ABI.pad_list;
+    for (; collected; collected = g_slist_next (collected)) {
+      GstCollectData *data;
+
+      /* update the stats */
+      pads->numpads++;
+      data = collected->data;
+      if (data->buffer)
+        pads->queuedpads++;
+      if (data->abidata.ABI.eos)
+        pads->eospads++;
+
+      /* add to the list of pads to collect */
+      pads->data = g_slist_prepend (pads->data, data);
+    }
+    /* and update the cookie */
+    pads->cookie = pads->abidata.ABI.pad_cookie;
+  }
+  GST_COLLECT_PADS_PAD_UNLOCK (pads);
+}
+
+/* checks if all the pads are collected and call the collectfunction
+ *
+ * Should be called with LOCK.
+ *
+ * Returns: TRUE if the collectfunction was called, FALSE otherwise.
+ */
 static gboolean
 gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
 {
@@ -625,8 +721,13 @@ gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
   gboolean res = FALSE;
 
   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
+  g_return_val_if_fail (pads->func != NULL, FALSE);
+
+  /* check for new pads, update stats etc.. */
+  gst_collect_pads_check_pads (pads);
 
-  /* If all our pads are EOS just collect once */
+  /* If all our pads are EOS just collect once to let the element
+   * do its final EOS handling. */
   if (pads->eospads == pads->numpads) {
     GST_DEBUG ("All active pads (%d) are EOS, calling %s",
         pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
@@ -641,7 +742,7 @@ gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
      function
      FIXME: Shouldn't we also check gst_pad_is_blocked () somewhere
    */
-  while (((pads->queuedpads + pads->eospads) >= pads->numpads) && pads->func) {
+  while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
     GST_DEBUG ("All active pads (%d) have data, calling %s",
         pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
     flow_ret = pads->func (pads, pads->user_data);
@@ -657,9 +758,8 @@ beach:
     GST_DEBUG ("Not all active pads (%d) have data, continuing", pads->numpads);
   }
 
-  if (ret) {
+  if (ret)
     *ret = flow_ret;
-  }
 
   return res;
 }
@@ -667,6 +767,7 @@ beach:
 static gboolean
 gst_collect_pads_event (GstPad * pad, GstEvent * event)
 {
+  gboolean res;
   GstCollectData *data;
   GstCollectPads *pads;
 
@@ -675,6 +776,8 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
   if (data == NULL)
     goto not_ours;
 
+  res = TRUE;
+
   pads = data->collect;
 
   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
@@ -686,15 +789,16 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
       /* forward event to unblock is_collected */
       gst_pad_event_default (pad, event);
 
-      /* now unblock the chain function
-         no cond per pad, so they all unblock, non-flushing block again */
+      /* now unblock the chain function.
+       * no cond per pad, so they all unblock, 
+       * non-flushing block again */
       GST_OBJECT_LOCK (pads);
       data->abidata.ABI.flushing = TRUE;
       GST_COLLECT_PADS_BROADCAST (pads);
       GST_OBJECT_UNLOCK (pads);
 
       /* event already cleaned up by forwarding */
-      return TRUE;
+      goto done;
     }
     case GST_EVENT_FLUSH_STOP:
     {
@@ -702,31 +806,43 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
       GST_OBJECT_LOCK (pads);
       data->abidata.ABI.flushing = FALSE;
       gst_collect_pads_pop (pads, data);
+      /* if the pad was EOS, remove the EOS flag and
+       * decrement the number of eospads */
+      if (data->abidata.ABI.eos == TRUE) {
+        pads->eospads--;
+        data->abidata.ABI.eos = FALSE;
+      }
       GST_OBJECT_UNLOCK (pads);
-      goto beach;
+
+      /* forward event */
+      goto forward;
     }
     case GST_EVENT_EOS:
     {
       GST_OBJECT_LOCK (pads);
-
-      pads->eospads++;
-
+      /* if the pad was not EOS, make it EOS and so we
+       * have one more eospad */
+      if (data->abidata.ABI.eos == FALSE) {
+        data->abidata.ABI.eos = TRUE;
+        pads->eospads++;
+      }
+      /* check if we need collecting anything */
       gst_collect_pads_is_collected (pads, NULL);
-
       GST_OBJECT_UNLOCK (pads);
 
-      /* We eat this event */
+      /* We eat this event, element should do something
+       * in the collected callback. */
       gst_event_unref (event);
-      return TRUE;
+      goto done;
     }
     case GST_EVENT_NEWSEGMENT:
     {
       gint64 start, stop, time;
-      gdouble rate;
+      gdouble rate, arate;
       GstFormat format;
       gboolean update;
 
-      gst_event_parse_new_segment (event, &update, &rate, &format,
+      gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
           &start, &stop, &time);
 
       GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
@@ -736,12 +852,13 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
       if (data->segment.format != format)
         gst_segment_init (&data->segment, format);
 
-      gst_segment_set_newsegment (&data->segment, update, rate, format,
-          start, stop, time);
+      gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
+          format, start, stop, time);
 
       data->abidata.ABI.new_segment = TRUE;
 
-      /* For now we eat this event */
+      /* we must not forward this event since multiple segments will be 
+       * accumulated and this is certainly not what we want. */
       gst_event_unref (event);
       /* FIXME: collect-pads based elements need to create their own newsegment
          event (and only one really)
@@ -753,14 +870,18 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
          (that's what avimux does for something IIRC)
          see #340060
        */
-      return TRUE;
+      goto done;
     }
     default:
-      goto beach;
+      /* forward other events */
+      goto forward;
   }
 
-beach:
-  return gst_pad_event_default (pad, event);
+forward:
+  res = gst_pad_event_default (pad, event);
+
+done:
+  return res;
 
   /* ERRORS */
 not_ours:
@@ -771,10 +892,11 @@ not_ours:
 }
 
 /* For each buffer we receive we check if our collected condition is reached
-   and if so we call the collected function. When this is done we check if
-   data has been unqueued. If data is still queued we wait holding the stream
-   lock to make sure no EOS event can happen while we are ready to be
-   collected */
+ * and if so we call the collected function. When this is done we check if
+ * data has been unqueued. If data is still queued we wait holding the stream
+ * lock to make sure no EOS event can happen while we are ready to be
+ * collected 
+ */
 static GstFlowReturn
 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
 {
@@ -811,13 +933,16 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
   buffer_p = &data->buffer;
   gst_buffer_replace (buffer_p, buffer);
 
-  if (data->segment.format == GST_FORMAT_TIME
-      && GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
-    gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME,
-        GST_BUFFER_TIMESTAMP (buffer));
+  /* update segment last position if in TIME */
+  if (data->segment.format == GST_FORMAT_TIME) {
+    GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
+
+    if (GST_CLOCK_TIME_IS_VALID (timestamp))
+      gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
+  }
 
   /* Check if our collected condition is matched and call the collected function
-     if it is */
+   * if it is */
   gst_collect_pads_is_collected (pads, &ret);
 
   /* We still have data queued on this pad, wait for something to happen */
@@ -832,7 +957,6 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
     if (data->abidata.ABI.flushing)
       goto flushing;
   }
-
   GST_OBJECT_UNLOCK (pads);
 
   return ret;
index 5b2771b..bdb4bef 100644 (file)
@@ -49,6 +49,7 @@ typedef struct _GstCollectPadsClass GstCollectPadsClass;
  */
 struct _GstCollectData
 {
+  /* with LOCK of @collect */
   GstCollectPads       *collect;
   GstPad               *pad;
   GstBuffer            *buffer;
@@ -60,6 +61,7 @@ struct _GstCollectData
     struct {
       gboolean           flushing;
       gboolean           new_segment;
+      gboolean           eos;
     } ABI;
     /* adding + 0 to mark ABI change to be undone later */
     gpointer _gst_reserved[GST_PADDING + 0];
@@ -77,6 +79,10 @@ struct _GstCollectData
  */
 typedef GstFlowReturn (*GstCollectPadsFunction) (GstCollectPads *pads, gpointer user_data);
 
+#define GST_COLLECT_PADS_GET_PAD_LOCK(pads) (((GstCollectPads *)pads)->abidata.ABI.pad_lock)
+#define GST_COLLECT_PADS_PAD_LOCK(pads)     (g_mutex_lock(GST_COLLECT_PADS_GET_PAD_LOCK (pads)))
+#define GST_COLLECT_PADS_PAD_UNLOCK(pads)   (g_mutex_unlock(GST_COLLECT_PADS_GET_PAD_LOCK (pads)))
+
 #define GST_COLLECT_PADS_GET_COND(pads) (((GstCollectPads *)pads)->cond)
 #define GST_COLLECT_PADS_WAIT(pads)     (g_cond_wait (GST_COLLECT_PADS_GET_COND (pads), GST_OBJECT_GET_LOCK (pads)))
 #define GST_COLLECT_PADS_SIGNAL(pads)   (g_cond_signal (GST_COLLECT_PADS_GET_COND (pads)))
@@ -96,21 +102,31 @@ struct _GstCollectPads {
   GSList       *data;                  /* list of CollectData items */
 
   /*< private >*/
-  guint32       cookie;
+  guint32       cookie;                /* @data list cookie */
 
+  /* with LOCK */
   GCond                *cond;                  /* to signal removal of data */
 
   GstCollectPadsFunction func;         /* function and user_data for callback */
   gpointer      user_data;
 
-  guint                 numpads;               /* number of pads */
+  guint                 numpads;               /* number of pads in @data */
   guint                 queuedpads;            /* number of pads with a buffer */
   guint                 eospads;               /* number of pads that are EOS */
 
   gboolean      started;
 
   /*< private >*/
-  gpointer       _gst_reserved[GST_PADDING];
+  union {
+    struct {
+      /* since 0.10.6 */ /* with PAD_LOCK */
+      GMutex    *pad_lock;             /* used to serialize add/remove */
+      GSList    *pad_list;              /* updated pad list */
+      guint32   pad_cookie;            /* updated cookie */
+    } ABI;
+    /* adding + 0 to mark ABI change to be undone later */
+    gpointer _gst_reserved[GST_PADDING + 0];
+  } abidata;
 };
 
 struct _GstCollectPadsClass {
index 39ba3ab..f9ed20d 100644 (file)
@@ -1,4 +1,5 @@
 .dirstamp
+adapter
 gdp
 controller
 gstnetclientclock