* default: G_MAXUINT32 */
guint32 stream_start_seqnum;
guint32 segment_seqnum;
+ guint group_id;
} GstStream;
/* Must be called with lock! */
{
GstStream *stream, *ostream;
guint32 seqnum = gst_event_get_seqnum (event);
+ guint group_id;
+ gboolean have_group_id;
GList *l;
gboolean all_wait = TRUE;
gboolean new_stream = TRUE;
+ have_group_id = gst_event_parse_group_id (event, &group_id);
+
GST_STREAM_SYNCHRONIZER_LOCK (self);
+ self->have_group_id &= have_group_id;
+ have_group_id = self->have_group_id;
+
stream = gst_pad_get_element_private (pad);
- if (stream && stream->stream_start_seqnum != seqnum) {
+
+ if (!stream) {
+ GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
+ GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+ break;
+ }
+
+ if ((have_group_id && stream->group_id != group_id) || (!have_group_id
+ && stream->stream_start_seqnum != seqnum)) {
stream->is_eos = FALSE;
stream->stream_start_seqnum = seqnum;
+ stream->group_id = group_id;
stream->drop_discont = TRUE;
- /* Check if this belongs to a stream that is already there,
- * e.g. we got the visualizations for an audio stream */
- for (l = self->streams; l; l = l->next) {
- ostream = l->data;
+ if (!have_group_id) {
+ /* Check if this belongs to a stream that is already there,
+ * e.g. we got the visualizations for an audio stream */
+ for (l = self->streams; l; l = l->next) {
+ ostream = l->data;
+
+ if (ostream != stream && ostream->stream_start_seqnum == seqnum
+ && !ostream->wait) {
+ new_stream = FALSE;
+ break;
+ }
+ }
- if (ostream != stream && ostream->stream_start_seqnum == seqnum
- && !ostream->wait) {
- new_stream = FALSE;
+ if (!new_stream) {
+ GST_DEBUG_OBJECT (pad,
+ "Stream %d belongs to running stream %d, no waiting",
+ stream->stream_number, ostream->stream_number);
+ stream->wait = FALSE;
+ stream->new_stream = FALSE;
+
+ GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
+ } else if (group_id == self->group_id) {
+ GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
+ "no waiting", stream->stream_number, group_id);
+ GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+ break;
}
- if (!new_stream) {
- GST_DEBUG_OBJECT (pad,
- "Stream %d belongs to running stream %d, no waiting",
- stream->stream_number, ostream->stream_number);
- stream->wait = FALSE;
- stream->new_stream = FALSE;
- } else {
- GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
+ GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
- stream->wait = TRUE;
- stream->new_stream = TRUE;
+ stream->wait = TRUE;
+ stream->new_stream = TRUE;
- for (l = self->streams; l; l = l->next) {
- GstStream *ostream = l->data;
+ for (l = self->streams; l; l = l->next) {
+ GstStream *ostream = l->data;
- all_wait = all_wait && ostream->wait;
- if (!all_wait)
- break;
- }
- if (all_wait) {
- gint64 position = 0;
+ all_wait = all_wait && ostream->wait && (!have_group_id
+ || ostream->group_id == group_id);
+ if (!all_wait)
+ break;
+ }
+ if (all_wait) {
+ gint64 position = 0;
+
+ if (have_group_id)
+ GST_DEBUG_OBJECT (self,
+ "All streams have changed to group id %u -- unblocking",
+ group_id);
+ else
GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
- for (l = self->streams; l; l = l->next) {
- GstStream *ostream = l->data;
- gint64 stop_running_time;
- gint64 position_running_time;
-
- ostream->wait = FALSE;
-
- if (ostream->segment.format == GST_FORMAT_TIME) {
- stop_running_time =
- gst_segment_to_running_time (&ostream->segment,
- GST_FORMAT_TIME, ostream->segment.stop);
- position_running_time =
- gst_segment_to_running_time (&ostream->segment,
- GST_FORMAT_TIME, ostream->segment.position);
- position =
- MAX (position, MAX (stop_running_time,
- position_running_time));
- }
+ self->group_id = group_id;
+
+ for (l = self->streams; l; l = l->next) {
+ GstStream *ostream = l->data;
+ gint64 stop_running_time;
+ gint64 position_running_time;
+
+ ostream->wait = FALSE;
+
+ if (ostream->segment.format == GST_FORMAT_TIME) {
+ stop_running_time =
+ gst_segment_to_running_time (&ostream->segment,
+ GST_FORMAT_TIME, ostream->segment.stop);
+ position_running_time =
+ gst_segment_to_running_time (&ostream->segment,
+ GST_FORMAT_TIME, ostream->segment.position);
+ position =
+ MAX (position, MAX (stop_running_time,
+ position_running_time));
}
- position = MAX (0, position);
- self->group_start_time = MAX (self->group_start_time, position);
+ }
+ position = MAX (0, position);
+ self->group_start_time = MAX (self->group_start_time, position);
- GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (self->group_start_time));
+ GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (self->group_start_time));
- for (l = self->streams; l; l = l->next) {
- GstStream *ostream = l->data;
- g_cond_broadcast (&ostream->stream_finish_cond);
- }
+ for (l = self->streams; l; l = l->next) {
+ GstStream *ostream = l->data;
+ g_cond_broadcast (&ostream->stream_finish_cond);
}
}
- } else {
- GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
g_cond_init (&stream->stream_finish_cond);
stream->stream_start_seqnum = G_MAXUINT32;
stream->segment_seqnum = G_MAXUINT32;
+ stream->group_id = G_MAXUINT;
tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
}
}
g_assert (l != NULL);
+ if (self->streams == NULL) {
+ self->have_group_id = TRUE;
+ self->group_id = G_MAXUINT;
+ }
/* we can drop the lock, since stream exists now only local.
* Moreover, we should drop, to prevent deadlock with STREAM_LOCK
case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
self->group_start_time = 0;
+ self->have_group_id = TRUE;
+ self->group_id = G_MAXUINT;
self->shutdown = FALSE;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:{