*/
struct _GstBaseSinkPrivate
{
+ GQueue *preroll_queue;
+ gint preroll_queued;
+
gint qos_enabled; /* ATOMIC */
gboolean async_enabled;
GstClockTimeDiff ts_offset;
gobject_class->set_property = gst_base_sink_set_property;
gobject_class->get_property = gst_base_sink_get_property;
- /* FIXME, this next value should be configured using an event from the
- * upstream element, ie, the BUFFER_SIZE event. */
- g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
- g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
- "Number of buffers to queue during preroll", 0, G_MAXUINT,
- DEFAULT_PREROLL_QUEUE_LEN,
- G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
-
g_object_class_install_property (gobject_class, PROP_SYNC,
g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
basesink->pad_mode = GST_ACTIVATE_NONE;
basesink->preroll_lock = g_mutex_new ();
basesink->preroll_cond = g_cond_new ();
- basesink->preroll_queue = g_queue_new ();
+ priv->preroll_queue = g_queue_new ();
priv->have_latency = FALSE;
basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
g_mutex_free (basesink->preroll_lock);
g_cond_free (basesink->preroll_cond);
- g_queue_free (basesink->preroll_queue);
+ g_queue_free (basesink->priv->preroll_queue);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
GstBaseSink *sink = GST_BASE_SINK (object);
switch (prop_id) {
- case PROP_PREROLL_QUEUE_LEN:
- /* preroll lock necessary to serialize with finish_preroll */
- GST_BASE_SINK_PREROLL_LOCK (sink);
- g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
- GST_BASE_SINK_PREROLL_UNLOCK (sink);
- break;
case PROP_SYNC:
gst_base_sink_set_sync (sink, g_value_get_boolean (value));
break;
GstBaseSink *sink = GST_BASE_SINK (object);
switch (prop_id) {
- case PROP_PREROLL_QUEUE_LEN:
- g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
- break;
case PROP_SYNC:
g_value_set_boolean (value, gst_base_sink_get_sync (sink));
break;
GstMiniObject *obj;
GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
- while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
+ while ((obj = g_queue_pop_head (basesink->priv->preroll_queue))) {
GST_DEBUG_OBJECT (basesink, "popped %p", obj);
gst_mini_object_unref (obj);
}
basesink->have_preroll = FALSE;
basesink->priv->step_unlock = FALSE;
basesink->eos_queued = FALSE;
- basesink->preroll_queued = 0;
- basesink->buffers_queued = 0;
- basesink->events_queued = 0;
+ basesink->priv->preroll_queued = 0;
/* can't report latency anymore until we preroll again */
if (basesink->priv->async_enabled) {
GST_OBJECT_LOCK (basesink);
if (G_UNLIKELY (basesink->need_preroll)) {
if (G_LIKELY (prerollable))
- basesink->preroll_queued++;
+ basesink->priv->preroll_queued++;
- length = basesink->preroll_queued;
+ length = basesink->priv->preroll_queued;
GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
if (G_UNLIKELY (basesink->need_preroll)) {
/* see if we can render now, if we can't add the object to the preroll
* queue. */
- if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
+ if (G_UNLIKELY (length <= 0))
goto more_preroll;
}
}
/* we can start rendering (or blocking) the queued object
* if any. */
- q = basesink->preroll_queue;
+ q = basesink->priv->preroll_queue;
while (G_UNLIKELY (!g_queue_is_empty (q))) {
GstMiniObject *o;
guint8 ot;
/* now render the object */
ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
- basesink->preroll_queued = 0;
+ basesink->priv->preroll_queued = 0;
return ret;
more_preroll:
{
/* add object to the queue and return */
- GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
- length, basesink->preroll_queue_max_len);
- g_queue_push_tail (basesink->preroll_queue, obj);
+ GST_DEBUG_OBJECT (basesink, "need more preroll data");
+ g_queue_push_tail (basesink->priv->preroll_queue, obj);
return GST_FLOW_OK;
}
dequeue_failed:
/*< protected >*/ /* with PREROLL_LOCK */
GMutex *preroll_lock;
GCond *preroll_cond;
- GQueue *preroll_queue;
- gint preroll_queue_max_len; /* FIXME-0.11: the property is guint */
- gint preroll_queued;
- gint buffers_queued;
- gint events_queued;
gboolean eos;
gboolean eos_queued;
gboolean need_preroll;
gst_object_unref (pipeline);
}
-GST_END_TEST static void
+GST_END_TEST static gpointer
send_eos (GstPad * sinkpad)
{
- gst_pad_send_event (sinkpad, gst_event_new_eos ());
+ gboolean ret;
+
+ ret = gst_pad_send_event (sinkpad, gst_event_new_eos ());
+
+ return GINT_TO_POINTER (ret);
}
/* push a buffer with a very long duration in a fakesink, then push an EOS
ret = gst_element_set_state (pipeline, GST_STATE_NULL);
fail_unless (ret == GST_STATE_CHANGE_SUCCESS, "no SUCCESS state return");
+ /* we can join now */
+ g_thread_join (thread);
+
gst_object_unref (pipeline);
}
GST_START_TEST (test_async_done)
{
GstElement *sink;
- GstBuffer *buffer;
GstEvent *event;
GstStateChangeReturn ret;
GstPad *sinkpad;
sink = gst_element_factory_make ("fakesink", "sink");
g_object_set (G_OBJECT (sink), "sync", TRUE, NULL);
- g_object_set (G_OBJECT (sink), "preroll-queue-len", 2, NULL);
g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
g_signal_connect (sink, "preroll-handoff", (GCallback) async_done_handoff,
fail_unless (qret == TRUE, "position wrong");
fail_unless (position == 10 * GST_SECOND, "position is wrong");
- /* Since we are paused and the preroll queue has a length of 2, this function
- * will return immediatly, the preroll handoff will be called and the stream
- * position should now be 10 seconds. */
- GST_DEBUG ("pushing first buffer");
- buffer = gst_buffer_new_and_alloc (10);
- GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND;
- GST_BUFFER_DURATION (buffer) = 100 * GST_SECOND;
- res = gst_pad_chain (sinkpad, buffer);
- fail_unless (res == GST_FLOW_OK, "no OK flow return");
-
- /* scond buffer, will not block either but position should still be 10
- * seconds */
- GST_DEBUG ("pushing second buffer");
- buffer = gst_buffer_new_and_alloc (10);
- GST_BUFFER_TIMESTAMP (buffer) = 100 * GST_SECOND;
- GST_BUFFER_DURATION (buffer) = 100 * GST_SECOND;
- res = gst_pad_chain (sinkpad, buffer);
- fail_unless (res == GST_FLOW_OK, "no OK flow return");
-
- /* check if position is still 10 seconds */
- gst_element_query_position (sink, GST_FORMAT_TIME, &position);
- GST_DEBUG ("first buffer position %" GST_TIME_FORMAT,
- GST_TIME_ARGS (position));
- fail_unless (position == 10 * GST_SECOND, "position is wrong");
-
/* last buffer, blocks because preroll queue is filled. Start the push in a
* new thread so that we can check the position */
GST_DEBUG ("starting thread");
GstPad *sinkpad;
gboolean res;
GstBus *bus;
- gint64 position;
gboolean qret;
GstSegment segment;
+ gint64 position;
+ GThread *thread;
sink = gst_element_factory_make ("fakesink", "sink");
g_object_set (G_OBJECT (sink), "sync", TRUE, NULL);
- g_object_set (G_OBJECT (sink), "preroll-queue-len", 1, NULL);
sinkpad = gst_element_get_static_pad (sink, "sink");
fail_unless (qret == TRUE, "position wrong");
fail_unless (position == 10 * GST_SECOND, "position is wrong");
- /* Since we are paused and the preroll queue has a length of 1, this function
- * will return immediatly. The EOS will complete the preroll and the
- * position should now be 10 seconds. */
GST_DEBUG ("pushing EOS");
- event = gst_event_new_eos ();
- res = gst_pad_send_event (sinkpad, event);
- fail_unless (res == TRUE, "no TRUE return");
+ GST_DEBUG ("starting thread");
+ thread = g_thread_create ((GThreadFunc) send_eos, sinkpad, TRUE, NULL);
+ fail_if (thread == NULL, "no thread");
+
+ /* wait for preroll */
+ gst_element_get_state (sink, NULL, NULL, -1);
/* check if position is still 10 seconds */
gst_element_query_position (sink, GST_FORMAT_TIME, &position);
GST_DEBUG ("EOS position %" GST_TIME_FORMAT, GST_TIME_ARGS (position));
fail_unless (position == 10 * GST_SECOND, "position is wrong");
- gst_object_unref (sinkpad);
-
gst_element_set_state (sink, GST_STATE_NULL);
+ g_thread_join (thread);
+
+ gst_object_unref (sinkpad);
gst_object_unref (sink);
gst_object_unref (bus);
}
g_return_if_fail (fakesrc && fakesink && pipeline);
- g_object_set (G_OBJECT (fakesink), "preroll-queue-len", 4, NULL);
-
gst_bin_add_many (GST_BIN (pipeline), fakesrc, fakesink, NULL);
gst_element_link (fakesrc, fakesink);