/* info structure passed to selected-samples signal, must only be accessed
* from the aggregate thread */
GstStructure *selected_samples_info;
+
+ /* Only access from src thread */
+ /* Messages to post after releasing locks */
+ GQueue messages;
};
#define GST_AUDIO_AGGREGATOR_LOCK(self) g_mutex_lock (&(self)->priv->mutex);
aagg->priv->selected_samples_info =
gst_structure_new_empty ("GstAudioAggregatorSelectedSamplesInfo");
+
+ g_queue_init (&aagg->priv->messages);
}
static void
return buffer;
}
+
+/* Called with the object lock for both the element and pad held,
+ * as well as the audio aggregator lock.
+ * Should only be called on the output queue.
+ */
+static GstClockTime
+gst_audio_aggregator_pad_enqueue_qos_message (GstAudioAggregatorPad * pad,
+ GstAudioAggregator * aagg, guint64 samples)
+{
+ GstAggregator *agg = GST_AGGREGATOR (aagg);
+ GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
+ GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
+
+ guint rate_output = GST_AUDIO_INFO_RATE (&srcpad->info);
+ GstClockTime offset = gst_util_uint64_scale (GST_SECOND, pad->priv->position,
+ rate_output);
+ GstClockTime timestamp = GST_BUFFER_PTS (pad->priv->buffer) + offset;
+ GstClockTime running_time =
+ gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME,
+ timestamp);
+ GstClockTime stream_time = gst_segment_to_stream_time (&aggpad->segment,
+ GST_FORMAT_TIME, timestamp);
+ GstClockTime duration;
+ guint rate_input;
+ guint64 processed, dropped;
+ GstMessage *msg;
+
+ if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer)
+ rate_input = GST_AUDIO_INFO_RATE (&srcpad->info);
+ else
+ rate_input = GST_AUDIO_INFO_RATE (&pad->info);
+
+ duration = gst_util_uint64_scale (samples, GST_SECOND, rate_input);
+
+ processed = gst_util_uint64_scale (pad->priv->processed, rate_input,
+ rate_output);
+ dropped = gst_util_uint64_scale (pad->priv->dropped, rate_output,
+ rate_output);
+
+ msg = gst_message_new_qos (GST_OBJECT (aggpad), TRUE, running_time,
+ stream_time, timestamp, duration);
+ gst_message_set_qos_stats (msg, GST_FORMAT_DEFAULT, processed, dropped);
+
+ g_queue_push_tail (&aagg->priv->messages, msg);
+
+ return running_time;
+}
+
+static void
+gst_audio_aggregator_post_messages (GstAudioAggregator * aagg)
+{
+ if (g_queue_get_length (&aagg->priv->messages) != 0) {
+ GstClockTime latency = gst_aggregator_get_latency (GST_AGGREGATOR (aagg));
+ gboolean is_live = GST_CLOCK_TIME_IS_VALID (latency);
+ GstElement *e = GST_ELEMENT (aagg);
+ GstMessage *msg;
+
+ while ((msg = g_queue_pop_head (&aagg->priv->messages))) {
+ if (is_live) {
+ GstStructure *s = gst_message_writable_structure (msg);
+ gst_structure_set (s, "live", G_TYPE_BOOLEAN, TRUE, NULL);
+ }
+
+ gst_element_post_message (e, msg);
+ }
+ }
+}
+
/* Called with the object lock for both the element and pad held,
* as well as the aagg lock
*
end_output_offset = start_output_offset + pad->priv->size;
if (end_output_offset < aagg->priv->offset) {
+ GstClockTime rt;
+
+ pad->priv->dropped += pad->priv->size;
+ rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg,
+ pad->priv->size);
+ GST_DEBUG_OBJECT (pad, "Dropped buffer of %u samples at running time %"
+ GST_TIME_FORMAT " because input buffer is entirely before current"
+ " output offset", pad->priv->size, GST_TIME_ARGS (rt));
+
pad->priv->position = 0;
pad->priv->size = 0;
GST_DEBUG_OBJECT (pad,
}
pad->priv->dropped += MIN (diff, pad->priv->size);
+ if (diff != 0 && pad->priv->qos_messages) {
+ GstClockTime rt;
+
+ rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg, diff);
+ GST_DEBUG_OBJECT (pad, "Dropped %u samples at running time %"
+ GST_TIME_FORMAT " because input buffer starts before current"
+ " output offset", diff, GST_TIME_ARGS (rt));
+ }
pad->priv->position += diff;
if (pad->priv->position >= pad->priv->size) {
/* Empty buffer, drop */
if (pad->priv->position + diff > pad->priv->size)
diff = pad->priv->size - pad->priv->position;
pad->priv->dropped += diff;
+ if (diff != 0 && pad->priv->qos_messages) {
+ GstClockTime rt;
+
+ rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg, diff);
+ GST_DEBUG_OBJECT (pad, "Dropped %" G_GINT64_FORMAT " samples at"
+ " running time %" GST_TIME_FORMAT " because input buffer is before"
+ " output offset", diff, GST_TIME_ARGS (rt));
+ }
pad->priv->position += diff;
pad->priv->output_offset += diff;
}
GST_OBJECT_UNLOCK (agg);
+ gst_audio_aggregator_post_messages (aagg);
+
{
gst_structure_set (aagg->priv->selected_samples_info, "offset",
G_TYPE_UINT64, aagg->priv->offset, "frames", G_TYPE_UINT, blocksize,