* queues is filled.
* Both signals are emitted from the context of the streaming thread.
* </para>
+ * <para>
+ * When using #GstMultiQueue:sync-by-running-time the unlinked streams will
+ * be throttled by the highest running-time of linked streams. This allows
+ * further relinking of those unlinked streams without them being in the
+ * future (i.e. to achieve gapless playback).
+ * When dealing with streams which have got different consumption requirements
+ * downstream (ex: video decoders which will consume more buffer (in time) than
+ * audio decoders), it is recommended to group streams of the same type
+ * by using the pad "group-id" property. This will further throttle streams
+ * in time within that group.
+ * </para>
* </refsect2>
*/
{
/* unique identifier of the queue */
guint id;
+ /* group of streams to which this queue belongs to */
+ guint groupid;
GstMultiQueue *mqueue;
static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq);
-static void compute_high_time (GstMultiQueue * mq);
+static GstClockTimeDiff compute_high_time (GstMultiQueue * mq, guint groupid);
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
PROP_LAST
};
+/* GstMultiQueuePad */
+
+#define DEFAULT_PAD_GROUP_ID 0
+
+enum
+{
+ PROP_PAD_0,
+ PROP_PAD_GROUP_ID,
+};
+
+#define GST_TYPE_MULTIQUEUE_PAD (gst_multiqueue_pad_get_type())
+#define GST_MULTIQUEUE_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePad))
+#define GST_IS_MULTIQUEUE_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTIQUEUE_PAD))
+#define GST_MULTIQUEUE_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePadClass))
+#define GST_IS_MULTIQUEUE_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_MULTIQUEUE_PAD))
+#define GST_MULTIQUEUE_PAD_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePadClass))
+
+struct _GstMultiQueuePad
+{
+ GstPad parent;
+
+ GstSingleQueue *sq;
+};
+
+struct _GstMultiQueuePadClass
+{
+ GstPadClass parent_class;
+};
+
+GType gst_multiqueue_pad_get_type (void);
+
+G_DEFINE_TYPE (GstMultiQueuePad, gst_multiqueue_pad, GST_TYPE_PAD);
+static void
+gst_multiqueue_pad_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
+
+ switch (prop_id) {
+ case PROP_PAD_GROUP_ID:
+ if (pad->sq)
+ g_value_set_uint (value, pad->sq->groupid);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_multiqueue_pad_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
+
+ switch (prop_id) {
+ case PROP_PAD_GROUP_ID:
+ GST_OBJECT_LOCK (pad);
+ if (pad->sq)
+ pad->sq->groupid = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (pad);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_multiqueue_pad_class_init (GstMultiQueuePadClass * klass)
+{
+ GObjectClass *gobject_class = (GObjectClass *) klass;
+
+ gobject_class->set_property = gst_multiqueue_pad_set_property;
+ gobject_class->get_property = gst_multiqueue_pad_get_property;
+
+ g_object_class_install_property (gobject_class, PROP_PAD_GROUP_ID,
+ g_param_spec_uint ("group-id", "Group ID",
+ "Group to which this pad belongs", 0, G_MAXUINT32,
+ DEFAULT_PAD_GROUP_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
+
+static void
+gst_multiqueue_pad_init (GstMultiQueuePad * pad)
+{
+
+}
+
+
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
g_mutex_lock (&q->qlock); \
} G_STMT_END
sq->cached_sinktime = GST_CLOCK_STIME_NONE;
gst_data_queue_set_flushing (sq->queue, FALSE);
+ /* We will become active again on the next buffer/gap */
+ sq->active = FALSE;
+
/* Reset high time to be recomputed next */
mq->high_time = GST_CLOCK_STIME_NONE;
sq->oldid = sq->last_oldid;
if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+ GstClockTimeDiff high_time;
/* Go to sleep until it's time to push this buffer */
/* Recompute the highid */
compute_high_id (mq);
/* Recompute the high time */
- compute_high_time (mq);
+ high_time = compute_high_time (mq, sq->groupid);
+
+ GST_DEBUG_OBJECT (mq,
+ "groupid %d high_time %" GST_STIME_FORMAT " next_time %"
+ GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (high_time),
+ GST_STIME_ARGS (next_time));
while (((mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (next_time)
- && (mq->high_time == GST_CLOCK_STIME_NONE
- || next_time > mq->high_time))
+ && (high_time == GST_CLOCK_STIME_NONE
+ || next_time > high_time))
|| (!mq->sync_by_running_time && newid > mq->highid))
&& sq->srcresult == GST_FLOW_NOT_LINKED) {
"queue %d sleeping for not-linked wakeup with "
"newid %u, highid %u, next_time %" GST_STIME_FORMAT
", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
- GST_STIME_ARGS (next_time), GST_STIME_ARGS (mq->high_time));
+ GST_STIME_ARGS (next_time), GST_STIME_ARGS (high_time));
/* Wake up all non-linked pads before we sleep */
wake_up_next_non_linked (mq);
}
/* Recompute the high time and ID */
- compute_high_time (mq);
+ high_time = compute_high_time (mq, sq->groupid);
compute_high_id (mq);
GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
"wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT
", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
- GST_STIME_ARGS (next_time), GST_STIME_ARGS (mq->high_time));
+ GST_STIME_ARGS (next_time), GST_STIME_ARGS (high_time));
}
/* Re-compute the high_id in case someone else pushed */
compute_high_id (mq);
- compute_high_time (mq);
+ compute_high_time (mq, sq->groupid);
} else {
compute_high_id (mq);
- compute_high_time (mq);
+ compute_high_time (mq, sq->groupid);
/* Wake up all non-linked pads */
wake_up_next_non_linked (mq);
}
/* Need to make sure wake up any sleeping pads when we exit */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (mq->numwaiting > 0 && GST_PAD_IS_EOS (sq->srcpad)) {
- compute_high_time (mq);
+ compute_high_time (mq, sq->groupid);
compute_high_id (mq);
wake_up_next_non_linked (mq);
}
}
/* WITH LOCK TAKEN */
-static void
-compute_high_time (GstMultiQueue * mq)
+static GstClockTimeDiff
+compute_high_time (GstMultiQueue * mq, guint groupid)
{
/* The high-time is either the highest last time among the linked
* pads, or if all pads are not-linked, it's the lowest nex time of
GList *tmp;
GstClockTimeDiff highest = GST_CLOCK_STIME_NONE;
GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE;
+ GstClockTimeDiff group_high = GST_CLOCK_STIME_NONE;
+ GstClockTimeDiff group_low = GST_CLOCK_STIME_NONE;
+ /* Number of streams which belong to groupid */
+ guint group_count = 0;
if (!mq->sync_by_running_time)
- return;
+ return GST_CLOCK_STIME_NONE;
for (tmp = mq->queues; tmp; tmp = tmp->next) {
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
GST_LOG_OBJECT (mq,
- "inspecting sq:%d , next_time:%" GST_STIME_FORMAT ", last_time:%"
- GST_STIME_FORMAT ", srcresult:%s", sq->id,
+ "inspecting sq:%d (group:%d) , next_time:%" GST_STIME_FORMAT
+ ", last_time:%" GST_STIME_FORMAT ", srcresult:%s", sq->id, sq->groupid,
GST_STIME_ARGS (sq->next_time), GST_STIME_ARGS (sq->last_time),
gst_flow_get_name (sq->srcresult));
+ if (sq->groupid == groupid)
+ group_count++;
+
if (sq->srcresult == GST_FLOW_NOT_LINKED) {
/* No need to consider queues which are not waiting */
if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) {
if (lowest == GST_CLOCK_STIME_NONE || sq->next_time < lowest)
lowest = sq->next_time;
+ if (sq->groupid == groupid && (group_low == GST_CLOCK_STIME_NONE
+ || sq->next_time < group_low))
+ group_low = sq->next_time;
} else if (!GST_PAD_IS_EOS (sq->srcpad)) {
/* If we don't have a global high time, or the global high time
* is lower than this single queue's last outputted time, store
if (highest == GST_CLOCK_STIME_NONE
|| (sq->last_time != GST_CLOCK_STIME_NONE && sq->last_time > highest))
highest = sq->last_time;
+ if (sq->groupid == groupid && (group_high == GST_CLOCK_STIME_NONE
+ || (sq->last_time != GST_CLOCK_STIME_NONE
+ && sq->last_time > group_high)))
+ group_high = sq->last_time;
}
GST_LOG_OBJECT (mq,
"highest now %" GST_STIME_FORMAT " lowest %" GST_STIME_FORMAT,
GST_STIME_ARGS (highest), GST_STIME_ARGS (lowest));
+ if (sq->groupid == groupid)
+ GST_LOG_OBJECT (mq,
+ "grouphigh %" GST_STIME_FORMAT " grouplow %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (group_high), GST_STIME_ARGS (group_low));
}
if (highest == GST_CLOCK_STIME_NONE)
else
mq->high_time = highest;
+ GST_LOG_OBJECT (mq, "group count %d for groupid %u", group_count, groupid);
GST_LOG_OBJECT (mq,
"High time is now : %" GST_STIME_FORMAT ", lowest non-linked %"
GST_STIME_FORMAT, GST_STIME_ARGS (mq->high_time),
GST_STIME_ARGS (lowest));
+
+ /* If there's only one stream of a given type, use the global high */
+ if (group_count < 2)
+ return mq->high_time;
+
+ if (group_high == GST_CLOCK_STIME_NONE)
+ return group_low;
+ return group_high;
}
#define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
gst_single_queue_new (GstMultiQueue * mqueue, guint id)
{
GstSingleQueue *sq;
+ GstMultiQueuePad *mqpad;
+ GstPadTemplate *templ;
gchar *name;
GList *tmp;
guint temp_id = (id == -1) ? 0 : id;
sq = g_new0 (GstSingleQueue, 1);
mqueue->nbqueues++;
sq->id = temp_id;
+ sq->groupid = DEFAULT_PAD_GROUP_ID;
mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
mqueue->queues_cookie++;
sq->src_tainted = TRUE;
name = g_strdup_printf ("sink_%u", sq->id);
- sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, name);
+ templ = gst_static_pad_template_get (&sinktemplate);
+ sq->sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
+ "direction", templ->direction, "template", templ, NULL);
+ gst_object_unref (templ);
g_free (name);
+ mqpad = (GstMultiQueuePad *) sq->sinkpad;
+ mqpad->sq = sq;
+
gst_pad_set_chain_function (sq->sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
gst_pad_set_activatemode_function (sq->sinkpad,