pads->queuedpads = 0;
pads->eospads = 0;
pads->started = FALSE;
+
+ /* members to manage the pad list */
+ pads->abidata.ABI.pad_lock = g_mutex_new ();
+ pads->abidata.ABI.pad_cookie = 0;
+ pads->abidata.ABI.pad_list = NULL;
}
static void
gst_collect_pads_stop (pads);
g_cond_free (pads->cond);
+ g_mutex_free (pads->abidata.ABI.pad_lock);
/* Remove pads */
for (collected = pads->data; collected; collected = g_slist_next (collected)) {
GstCollectData *pdata = (GstCollectData *) collected->data;
- if (pdata->pad) {
+ if (pdata->pad)
gst_object_unref (pdata->pad);
- }
+
+ g_free (pdata);
}
/* Free pads list */
g_slist_free (pads->data);
- /* FIXME, free data */
-
G_OBJECT_CLASS (parent_class)->finalize (object);
}
data->collect = pads;
data->pad = gst_object_ref (pad);
data->buffer = NULL;
+ data->pos = 0;
gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
data->abidata.ABI.flushing = FALSE;
data->abidata.ABI.new_segment = FALSE;
+ data->abidata.ABI.eos = FALSE;
- GST_OBJECT_LOCK (pads);
- pads->data = g_slist_append (pads->data, data);
+ GST_COLLECT_PADS_PAD_LOCK (pads);
+ pads->abidata.ABI.pad_list =
+ g_slist_append (pads->abidata.ABI.pad_list, data);
gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
gst_pad_set_element_private (pad, data);
- pads->numpads++;
- pads->cookie++;
- GST_OBJECT_UNLOCK (pads);
+ pads->abidata.ABI.pad_cookie++;
+ GST_COLLECT_PADS_PAD_UNLOCK (pads);
return data;
}
g_return_val_if_fail (pad != NULL, FALSE);
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
- GST_OBJECT_LOCK (pads);
- list = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
+ GST_COLLECT_PADS_PAD_LOCK (pads);
+ list =
+ g_slist_find_custom (pads->abidata.ABI.pad_list, pad,
+ (GCompareFunc) find_pad);
if (list) {
+ pads->abidata.ABI.pad_list =
+ g_slist_delete_link (pads->abidata.ABI.pad_list, list);
+ /* clear the stuff we configured */
+ gst_pad_set_chain_function (pad, NULL);
+ gst_pad_set_event_function (pad, NULL);
+ /* FIXME, check that freeing the private data does not causes
+ * crashes in the streaming thread */
+ gst_pad_set_element_private (pad, NULL);
g_free (list->data);
- pads->data = g_slist_delete_link (pads->data, list);
gst_object_unref (pad);
+ pads->abidata.ABI.pad_cookie++;
}
- pads->numpads--;
- /* FIXME : if the pad has data queued we should decrease the number of
- queuedpads */
- pads->cookie++;
- GST_OBJECT_UNLOCK (pads);
+ GST_COLLECT_PADS_PAD_UNLOCK (pads);
- return list != NULL;
+ return (list != NULL);
}
/**
*
* Check if a pad is active.
*
+ * This function is currently not implemented.
+ *
* Returns: TRUE if the pad is active.
*
* MT safe.
* @pads: the collectspads to use
*
* Collect data on all pads. This function is usually called
- * from a GstTask function in an element. This function is
- * currently not implemented.
+ * from a GstTask function in an element.
+ *
+ * This function is currently not implemented.
*
* Returns: GstFlowReturn of the operation.
*
* @length: the length to collect
*
* Collect data with @offset and @length on all pads. This function
- * is typically called in the getrange function of an element. This
- * function is currently not implemented.
+ * is typically called in the getrange function of an element.
+ *
+ * This function is currently not implemented.
*
* Returns: GstFlowReturn of the operation.
*
return GST_FLOW_ERROR;
}
+/* FIXME, I think this function is used to work around bad behaviour
+ * of elements that add pads to themselves without activating them.
+ */
+static void
+gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
+{
+ GSList *walk = NULL;
+
+ GST_COLLECT_PADS_PAD_LOCK (pads);
+ /* Update the pads flushing flag */
+ for (walk = pads->data; walk; walk = g_slist_next (walk)) {
+ GstCollectData *cdata = walk->data;
+
+ if (GST_IS_PAD (cdata->pad)) {
+ GST_OBJECT_LOCK (cdata->pad);
+ if (flushing)
+ GST_PAD_SET_FLUSHING (cdata->pad);
+ else
+ GST_PAD_UNSET_FLUSHING (cdata->pad);
+ GST_OBJECT_UNLOCK (cdata->pad);
+ }
+ }
+ GST_COLLECT_PADS_PAD_UNLOCK (pads);
+}
+
/**
* gst_collect_pads_start:
* @pads: the collectspads to use
void
gst_collect_pads_start (GstCollectPads * pads)
{
- GSList *walk = NULL;
-
g_return_if_fail (pads != NULL);
g_return_if_fail (GST_IS_COLLECT_PADS (pads));
GST_DEBUG_OBJECT (pads, "starting collect pads");
+ /* make sure stop and collect cannot be called anymore */
GST_OBJECT_LOCK (pads);
- /* Set our pads as non flushing */
- walk = pads->data;
- while (walk) {
- GstCollectData *cdata = walk->data;
- if (GST_IS_PAD (cdata->pad)) {
- GST_OBJECT_LOCK (cdata->pad);
- GST_PAD_UNSET_FLUSHING (cdata->pad);
- GST_OBJECT_UNLOCK (cdata->pad);
- }
+ /* make pads streamable */
+ gst_collect_pads_set_flushing (pads, FALSE);
- walk = g_slist_next (walk);
- }
/* Start collect pads */
pads->started = TRUE;
GST_OBJECT_UNLOCK (pads);
void
gst_collect_pads_stop (GstCollectPads * pads)
{
- GSList *walk = NULL;
-
g_return_if_fail (pads != NULL);
g_return_if_fail (GST_IS_COLLECT_PADS (pads));
GST_DEBUG_OBJECT (pads, "stopping collect pads");
+ /* make sure collect and start cannot be called anymore */
GST_OBJECT_LOCK (pads);
- /* Set our pads as flushing */
- walk = pads->data;
- while (walk) {
- GstCollectData *cdata = walk->data;
- if (GST_IS_PAD (cdata->pad)) {
- GST_OBJECT_LOCK (cdata->pad);
- GST_PAD_SET_FLUSHING (cdata->pad);
- GST_OBJECT_UNLOCK (cdata->pad);
- }
+ /* make pads not accept data anymore */
+ gst_collect_pads_set_flushing (pads, TRUE);
- walk = g_slist_next (walk);
- }
/* Stop collect pads */
pads->started = FALSE;
- /* Wake them up */
+ /* Wake them up so then can end the chain functions. */
GST_COLLECT_PADS_BROADCAST (pads);
+
GST_OBJECT_UNLOCK (pads);
}
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
g_return_val_if_fail (data != NULL, NULL);
- result = data->buffer;
-
- if (result)
+ if ((result = data->buffer))
gst_buffer_ref (result);
GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
g_return_val_if_fail (data != NULL, NULL);
- result = data->buffer;
- if (result) {
+ if ((result = data->buffer)) {
buffer_p = &data->buffer;
gst_buffer_replace (buffer_p, NULL);
data->pos = 0;
+ /* one less pad with queued data now */
pads->queuedpads--;
}
for (collected = pads->data; collected; collected = g_slist_next (collected)) {
GstCollectData *pdata;
+ GstBuffer *buffer;
gint size;
pdata = (GstCollectData *) collected->data;
- if (pdata->buffer == NULL)
+ /* ignore pad with EOS */
+ if (pdata->abidata.ABI.eos)
+ continue;
+
+ /* an empty buffer not EOS is weird */
+ if ((buffer = pdata->buffer) == NULL)
goto not_filled;
- size = GST_BUFFER_SIZE (pdata->buffer) - pdata->pos;
+ /* this is the size left of the buffer */
+ size = GST_BUFFER_SIZE (buffer) - pdata->pos;
+ /* need to return the min of all available data */
if (size < result)
result = size;
}
+ /* nothing changed, all must be EOS then, return 0 */
+ if (result == G_MAXUINT)
+ result = 0;
+
return result;
not_filled:
guint8 ** bytes, guint size)
{
guint readsize;
+ GstBuffer *buffer;
g_return_val_if_fail (pads != NULL, 0);
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
g_return_val_if_fail (data != NULL, 0);
g_return_val_if_fail (bytes != NULL, 0);
- readsize = MIN (size, GST_BUFFER_SIZE (data->buffer) - data->pos);
+ /* no buffer, must be EOS */
+ if ((buffer = data->buffer) == NULL)
+ return 0;
- *bytes = GST_BUFFER_DATA (data->buffer) + data->pos;
+ readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
+
+ *bytes = GST_BUFFER_DATA (buffer) + data->pos;
return readsize;
}
guint size)
{
guint flushsize;
+ GstBuffer *buffer;
g_return_val_if_fail (pads != NULL, 0);
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
g_return_val_if_fail (data != NULL, 0);
- flushsize = MIN (size, GST_BUFFER_SIZE (data->buffer) - data->pos);
+ /* no buffer, must be EOS */
+ if ((buffer = data->buffer) == NULL)
+ return 0;
+
+ /* this is what we can flush at max */
+ flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
data->pos += size;
- if (data->pos >= GST_BUFFER_SIZE (data->buffer)) {
+ if (data->pos >= GST_BUFFER_SIZE (buffer)) {
GstBuffer *buf;
+ /* _pop will also reset data->pos to 0 */
buf = gst_collect_pads_pop (pads, data);
gst_buffer_unref (buf);
}
return flushsize;
}
+/* see if pads were added or removed and update our stats. Any pad
+ * added after releasing the PAD_LOCK will get collected in the next
+ * round.
+ *
+ * We can do a quick check by checking the cookies, that get changed
+ * whenever the pad list is updated.
+ *
+ * Must be called with LOCK.
+ */
+static void
+gst_collect_pads_check_pads (GstCollectPads * pads)
+{
+ /* the master list and cookie are protected with the PAD_LOCK */
+ GST_COLLECT_PADS_PAD_LOCK (pads);
+ if (pads->abidata.ABI.pad_cookie != pads->cookie) {
+ GSList *collected;
+
+ /* clear list and stats */
+ pads->data = NULL;
+ pads->numpads = 0;
+ pads->queuedpads = 0;
+ pads->eospads = 0;
+
+ /* loop over the master pad list */
+ collected = pads->abidata.ABI.pad_list;
+ for (; collected; collected = g_slist_next (collected)) {
+ GstCollectData *data;
+
+ /* update the stats */
+ pads->numpads++;
+ data = collected->data;
+ if (data->buffer)
+ pads->queuedpads++;
+ if (data->abidata.ABI.eos)
+ pads->eospads++;
+
+ /* add to the list of pads to collect */
+ pads->data = g_slist_prepend (pads->data, data);
+ }
+ /* and update the cookie */
+ pads->cookie = pads->abidata.ABI.pad_cookie;
+ }
+ GST_COLLECT_PADS_PAD_UNLOCK (pads);
+}
+
+/* checks if all the pads are collected and call the collectfunction
+ *
+ * Should be called with LOCK.
+ *
+ * Returns: TRUE if the collectfunction was called, FALSE otherwise.
+ */
static gboolean
gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
{
gboolean res = FALSE;
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
+ g_return_val_if_fail (pads->func != NULL, FALSE);
+
+ /* check for new pads, update stats etc.. */
+ gst_collect_pads_check_pads (pads);
- /* If all our pads are EOS just collect once */
+ /* If all our pads are EOS just collect once to let the element
+ * do its final EOS handling. */
if (pads->eospads == pads->numpads) {
GST_DEBUG ("All active pads (%d) are EOS, calling %s",
pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
function
FIXME: Shouldn't we also check gst_pad_is_blocked () somewhere
*/
- while (((pads->queuedpads + pads->eospads) >= pads->numpads) && pads->func) {
+ while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
GST_DEBUG ("All active pads (%d) have data, calling %s",
pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
flow_ret = pads->func (pads, pads->user_data);
GST_DEBUG ("Not all active pads (%d) have data, continuing", pads->numpads);
}
- if (ret) {
+ if (ret)
*ret = flow_ret;
- }
return res;
}
static gboolean
gst_collect_pads_event (GstPad * pad, GstEvent * event)
{
+ gboolean res;
GstCollectData *data;
GstCollectPads *pads;
if (data == NULL)
goto not_ours;
+ res = TRUE;
+
pads = data->collect;
GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
/* forward event to unblock is_collected */
gst_pad_event_default (pad, event);
- /* now unblock the chain function
- no cond per pad, so they all unblock, non-flushing block again */
+ /* now unblock the chain function.
+ * no cond per pad, so they all unblock,
+ * non-flushing block again */
GST_OBJECT_LOCK (pads);
data->abidata.ABI.flushing = TRUE;
GST_COLLECT_PADS_BROADCAST (pads);
GST_OBJECT_UNLOCK (pads);
/* event already cleaned up by forwarding */
- return TRUE;
+ goto done;
}
case GST_EVENT_FLUSH_STOP:
{
GST_OBJECT_LOCK (pads);
data->abidata.ABI.flushing = FALSE;
gst_collect_pads_pop (pads, data);
+ /* if the pad was EOS, remove the EOS flag and
+ * decrement the number of eospads */
+ if (data->abidata.ABI.eos == TRUE) {
+ pads->eospads--;
+ data->abidata.ABI.eos = FALSE;
+ }
GST_OBJECT_UNLOCK (pads);
- goto beach;
+
+ /* forward event */
+ goto forward;
}
case GST_EVENT_EOS:
{
GST_OBJECT_LOCK (pads);
-
- pads->eospads++;
-
+ /* if the pad was not EOS, make it EOS and so we
+ * have one more eospad */
+ if (data->abidata.ABI.eos == FALSE) {
+ data->abidata.ABI.eos = TRUE;
+ pads->eospads++;
+ }
+ /* check if we need collecting anything */
gst_collect_pads_is_collected (pads, NULL);
-
GST_OBJECT_UNLOCK (pads);
- /* We eat this event */
+ /* We eat this event, element should do something
+ * in the collected callback. */
gst_event_unref (event);
- return TRUE;
+ goto done;
}
case GST_EVENT_NEWSEGMENT:
{
gint64 start, stop, time;
- gdouble rate;
+ gdouble rate, arate;
GstFormat format;
gboolean update;
- gst_event_parse_new_segment (event, &update, &rate, &format,
+ gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
&start, &stop, &time);
GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
if (data->segment.format != format)
gst_segment_init (&data->segment, format);
- gst_segment_set_newsegment (&data->segment, update, rate, format,
- start, stop, time);
+ gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
+ format, start, stop, time);
data->abidata.ABI.new_segment = TRUE;
- /* For now we eat this event */
+ /* we must not forward this event since multiple segments will be
+ * accumulated and this is certainly not what we want. */
gst_event_unref (event);
/* FIXME: collect-pads based elements need to create their own newsegment
event (and only one really)
(that's what avimux does for something IIRC)
see #340060
*/
- return TRUE;
+ goto done;
}
default:
- goto beach;
+ /* forward other events */
+ goto forward;
}
-beach:
- return gst_pad_event_default (pad, event);
+forward:
+ res = gst_pad_event_default (pad, event);
+
+done:
+ return res;
/* ERRORS */
not_ours:
}
/* For each buffer we receive we check if our collected condition is reached
- and if so we call the collected function. When this is done we check if
- data has been unqueued. If data is still queued we wait holding the stream
- lock to make sure no EOS event can happen while we are ready to be
- collected */
+ * and if so we call the collected function. When this is done we check if
+ * data has been unqueued. If data is still queued we wait holding the stream
+ * lock to make sure no EOS event can happen while we are ready to be
+ * collected
+ */
static GstFlowReturn
gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
{
buffer_p = &data->buffer;
gst_buffer_replace (buffer_p, buffer);
- if (data->segment.format == GST_FORMAT_TIME
- && GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
- gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME,
- GST_BUFFER_TIMESTAMP (buffer));
+ /* update segment last position if in TIME */
+ if (data->segment.format == GST_FORMAT_TIME) {
+ GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
+
+ if (GST_CLOCK_TIME_IS_VALID (timestamp))
+ gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
+ }
/* Check if our collected condition is matched and call the collected function
- if it is */
+ * if it is */
gst_collect_pads_is_collected (pads, &ret);
/* We still have data queued on this pad, wait for something to happen */
if (data->abidata.ABI.flushing)
goto flushing;
}
-
GST_OBJECT_UNLOCK (pads);
return ret;