#endif
#include "gststreamsynchronizer.h"
-#include "gst/glib-compat-private.h"
GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
#define GST_CAT_DEFAULT stream_synchronizer_debug
GST_LOG_OBJECT (obj, \
"locking from thread %p", \
g_thread_self ()); \
- g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
+ g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
GST_LOG_OBJECT (obj, \
"locked from thread %p", \
g_thread_self ()); \
GST_LOG_OBJECT (obj, \
"unlocking from thread %p", \
g_thread_self ()); \
- g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
+ g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
} G_STMT_END
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
-static const gboolean passthrough = FALSE;
-
#define gst_stream_synchronizer_parent_class parent_class
G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
GST_TYPE_ELEMENT);
GstPad *sinkpad;
GstSegment segment;
- gboolean wait;
+ gboolean wait; /* TRUE if waiting/blocking */
gboolean new_stream;
gboolean drop_discont;
- gboolean is_eos;
+ gboolean is_eos; /* TRUE if EOS was received */
gboolean seen_data;
- gint64 running_time_diff;
+ GCond stream_finish_cond;
- GCond *stream_finish_cond;
+ /* seqnum of the previously received STREAM_START
+ * default: G_MAXUINT32 */
+ guint32 stream_start_seqnum;
+ guint32 segment_seqnum;
} GstStream;
/* Must be called with lock! */
-static GstPad *
+static inline GstPad *
gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
{
if (stream->sinkpad == pad)
return gst_object_ref (stream->srcpad);
- else if (stream->srcpad == pad)
+ if (stream->srcpad == pad)
return gst_object_ref (stream->sinkpad);
return NULL;
}
static GstPad *
-gst_stream_get_other_pad_from_pad (GstPad * pad)
+gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
{
- GstObject *parent = gst_pad_get_parent (pad);
- GstStreamSynchronizer *self;
GstStream *stream;
GstPad *opad = NULL;
- /* released pad does not have parent anymore */
- if (!G_LIKELY (parent))
- goto exit;
-
- self = GST_STREAM_SYNCHRONIZER (parent);
GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad);
if (!stream)
out:
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
- gst_object_unref (self);
-exit:
if (!opad)
GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
GstIterator *it = NULL;
GstPad *opad;
- opad = gst_stream_get_other_pad_from_pad (pad);
+ opad =
+ gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
if (opad) {
GValue value = { 0, };
GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
- opad = gst_stream_get_other_pad_from_pad (pad);
+ opad =
+ gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
if (opad) {
ret = gst_pad_peer_query (opad, query);
gst_object_unref (opad);
GstPad *opad;
gboolean ret = FALSE;
- if (passthrough)
- goto skip_adjustments;
-
GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
gdouble proportion;
GstClockTimeDiff diff;
GstClockTime timestamp;
- gint64 running_time_diff;
+ gint64 running_time_diff = -1;
GstStream *stream;
gst_event_parse_qos (event, NULL, &proportion, &diff, ×tamp);
GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad);
if (stream)
- running_time_diff = stream->running_time_diff;
- else
- running_time_diff = -1;
+ running_time_diff = stream->segment.base;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
if (running_time_diff == -1) {
GST_WARNING_OBJECT (pad, "QOS event before group start");
goto out;
- } else if (timestamp < running_time_diff) {
+ }
+ if (timestamp < running_time_diff) {
GST_DEBUG_OBJECT (pad, "QOS event from previous group");
goto out;
}
break;
}
-skip_adjustments:
-
- opad = gst_stream_get_other_pad_from_pad (pad);
+ opad = gst_stream_get_other_pad_from_pad (self, pad);
if (opad) {
ret = gst_pad_push_event (opad, event);
gst_object_unref (opad);
GstPad *opad;
gboolean ret = FALSE;
- if (passthrough)
- goto skip_adjustments;
-
GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
case GST_EVENT_STREAM_START:
{
GstStream *stream;
+ guint32 seqnum = gst_event_get_seqnum (event);
+ GList *l;
+ gboolean all_wait = TRUE;
GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad);
- if (stream) {
- GList *l;
- gboolean all_wait = TRUE;
+ if (stream && stream->stream_start_seqnum != seqnum) {
GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
- g_cond_broadcast (ostream->stream_finish_cond);
+ g_cond_broadcast (&ostream->stream_finish_cond);
}
}
- }
+ } else
+ GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
}
break;
if (stream) {
if (stream->wait) {
GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
- g_cond_wait (stream->stream_finish_cond, self->lock);
+ g_cond_wait (&stream->stream_finish_cond, &self->lock);
stream = gst_pad_get_element_private (pad);
if (stream)
stream->wait = FALSE;
if (stream && segment.format == GST_FORMAT_TIME) {
if (stream->new_stream) {
- gint64 position_running_time = 0;
- gint64 stop_running_time = 0;
-
- if (stream->segment.format == GST_FORMAT_TIME) {
- position_running_time =
- gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
- stream->segment.position);
- position_running_time = MAX (position_running_time, 0);
- stop_running_time =
- gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
- stream->segment.stop);
- stop_running_time = MAX (position_running_time, 0);
-
- if (stop_running_time != position_running_time) {
- GST_WARNING_OBJECT (pad,
- "Gap between position and segment stop: %" GST_TIME_FORMAT
- " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
- GST_TIME_ARGS (position_running_time));
- }
-
- if (stop_running_time < position_running_time) {
- GST_DEBUG_OBJECT (pad, "Updating stop position");
- stream->segment.stop = stream->segment.position;
- gst_pad_push_event (stream->srcpad,
- gst_event_new_segment (&stream->segment));
- }
- stop_running_time = MAX (stop_running_time, position_running_time);
- GST_DEBUG_OBJECT (pad,
- "Stop running time of last group: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (stop_running_time));
- }
stream->new_stream = FALSE;
stream->drop_discont = TRUE;
-
-
- segment.base = stop_running_time;
+ segment.base = self->group_start_time;
}
GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
gst_segment_copy_into (&segment, &stream->segment);
GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
&stream->segment);
+ stream->segment_seqnum = gst_event_get_seqnum (event);
GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->segment.base));
- stream->running_time_diff = stream->segment.base;
{
GstEvent *tmpev;
tmpev = gst_event_new_segment (&stream->segment);
+ gst_event_set_seqnum (tmpev, stream->segment_seqnum);
gst_event_unref (event);
event = tmpev;
}
stream = gst_pad_get_element_private (pad);
if (stream) {
GST_DEBUG_OBJECT (pad, "Flushing streams");
- g_cond_broadcast (stream->stream_finish_cond);
+ g_cond_broadcast (&stream->stream_finish_cond);
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
break;
}
-skip_adjustments:
-
- opad = gst_stream_get_other_pad_from_pad (pad);
+ opad = gst_stream_get_other_pad_from_pad (self, pad);
if (opad) {
ret = gst_pad_push_event (opad, event);
gst_object_unref (opad);
GstClockTime timestamp = GST_CLOCK_TIME_NONE;
GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
- if (passthrough) {
- opad = gst_stream_get_other_pad_from_pad (pad);
- if (opad) {
- ret = gst_pad_push (opad, buffer);
- gst_object_unref (opad);
- }
- goto done;
- }
-
GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
" offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad);
- if (stream)
+ if (stream) {
stream->seen_data = TRUE;
- if (stream && stream->drop_discont) {
- buffer = gst_buffer_make_writable (buffer);
- GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
- stream->drop_discont = FALSE;
- }
+ if (stream->drop_discont) {
+ buffer = gst_buffer_make_writable (buffer);
+ GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
+ stream->drop_discont = FALSE;
+ }
- if (stream && stream->segment.format == GST_FORMAT_TIME
- && GST_CLOCK_TIME_IS_VALID (timestamp)) {
- GST_LOG_OBJECT (pad,
- "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
- GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
- stream->segment.position = timestamp;
+ if (stream->segment.format == GST_FORMAT_TIME
+ && GST_CLOCK_TIME_IS_VALID (timestamp)) {
+ GST_LOG_OBJECT (pad,
+ "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
+ stream->segment.position = timestamp;
+ }
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
- opad = gst_stream_get_other_pad_from_pad (pad);
+ opad = gst_stream_get_other_pad_from_pad (self, pad);
if (opad) {
ret = gst_pad_push (opad, buffer);
gst_object_unref (opad);
if (stream && stream->segment.format == GST_FORMAT_TIME
&& GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
GST_LOG_OBJECT (pad,
- "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
+ "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->segment.position),
GST_TIME_ARGS (timestamp_end));
stream->segment.position = timestamp_end;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
}
-done:
return ret;
}
stream = g_slice_new0 (GstStream);
stream->transform = self;
stream->stream_number = self->current_stream_number;
- stream->stream_finish_cond = g_cond_new ();
+ g_cond_init (&stream->stream_finish_cond);
+ stream->stream_start_seqnum = G_MAXUINT32;
+ stream->segment_seqnum = G_MAXUINT32;
tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
stream->segment.position);
stop_running_time = MAX (stop_running_time, position_running_time);
- GST_DEBUG_OBJECT (stream->sinkpad,
- "Stop running time was: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (stop_running_time));
+ if (stop_running_time > self->group_start_time) {
+ GST_DEBUG_OBJECT (stream->sinkpad,
+ "Updating global start running time from %" GST_TIME_FORMAT " to %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
+ GST_TIME_ARGS (stop_running_time));
- self->group_start_time = MAX (self->group_start_time, stop_running_time);
+ self->group_start_time = stop_running_time;
+ }
}
- g_cond_free (stream->stream_finish_cond);
+ g_cond_clear (&stream->stream_finish_cond);
g_slice_free (GstStream, stream);
/* NOTE: In theory we have to check here if all streams
GstStateChange transition)
{
GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
- GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+ GstStateChangeReturn ret;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
self->group_start_time = 0;
self->shutdown = FALSE;
break;
- case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
- GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
- break;
case GST_STATE_CHANGE_PAUSED_TO_READY:{
GList *l;
GST_STREAM_SYNCHRONIZER_LOCK (self);
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
- g_cond_broadcast (ostream->stream_finish_cond);
+ g_cond_broadcast (&ostream->stream_finish_cond);
}
self->shutdown = TRUE;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
- {
- GstStateChangeReturn bret;
-
- bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
- GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
- if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
- return ret;
- }
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
+ if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
+ return ret;
switch (transition) {
- case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
- GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
- break;
case GST_STATE_CHANGE_PAUSED_TO_READY:{
GList *l;
GST_DEBUG_OBJECT (self, "State change READY->NULL");
GST_STREAM_SYNCHRONIZER_LOCK (self);
- while (self->streams)
- gst_stream_synchronizer_release_stream (self, self->streams->data);
self->current_stream_number = 0;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
{
GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
- if (self->lock) {
- g_mutex_free (self->lock);
- self->lock = NULL;
- }
+ g_mutex_clear (&self->lock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_stream_synchronizer_init (GstStreamSynchronizer * self)
{
- self->lock = g_mutex_new ();
+ g_mutex_init (&self->lock);
}
static void