g_clear_error (&error);
}
+ /* Reset expected pktseq */
+ self->next_pktseq = 0;
+
return ret;
}
GstMapInfo info;
GError *err = NULL;
gssize recv_len;
+ GstClock *clock;
+ GstClockTime base_time;
+ GstClockTime capture_time;
+ GstClockTime delay;
+ SRT_MSGCTRL mctrl;
if (g_cancellable_is_cancelled (self->cancellable)) {
ret = GST_FLOW_FLUSHING;
goto out;
}
+ /* Get clock and values */
+ clock = gst_element_get_clock (GST_ELEMENT (src));
+ base_time = gst_element_get_base_time (GST_ELEMENT (src));
+
recv_len = gst_srt_object_read (self->srtobject, info.data,
- gst_buffer_get_size (outbuf), self->cancellable, &err);
+ gst_buffer_get_size (outbuf), self->cancellable, &err, &mctrl);
+
+ /* Capture clock values ASAP */
+ capture_time = gst_clock_get_time (clock);
+#if SRT_VERSION_VALUE >= 0x10402
+ /* Use SRT clock value if available (SRT > 1.4.2) */
+ delay = (srt_time_now () - mctrl.srctime) * GST_USECOND;
+#else
+ /* Else use the unix epoch monotonic clock */
+ delay = (g_get_real_time () - mctrl.srctime) * GST_USECOND;
+#endif
+ gst_object_unref (clock);
gst_buffer_unmap (outbuf, &info);
+ GST_LOG_OBJECT (src,
+ "recv_len:%" G_GSIZE_FORMAT " pktseq:%d msgno:%d srctime:%"
+ G_GUINT64_FORMAT, recv_len, mctrl.pktseq, mctrl.msgno, mctrl.srctime);
+
if (g_cancellable_is_cancelled (self->cancellable)) {
ret = GST_FLOW_FLUSHING;
goto out;
goto out;
}
+ /* Detect discontinuities */
+ if (mctrl.pktseq != self->next_pktseq) {
+ GST_WARNING_OBJECT (src, "discont detected %d (expected: %d)",
+ mctrl.pktseq, self->next_pktseq);
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+ }
+ /* pktseq is a 31bit field */
+ self->next_pktseq = (mctrl.pktseq + 1) % G_MAXINT32;
+
+ /* Subtract the base_time (since the pipeline started) ... */
+ if (capture_time > base_time)
+ capture_time -= base_time;
+ else
+ capture_time = 0;
+ /* And adjust by the delay */
+ if (capture_time > delay)
+ capture_time -= delay;
+ else
+ capture_time = 0;
+ GST_BUFFER_TIMESTAMP (outbuf) = capture_time;
+
+ GST_DEBUG_OBJECT (src, "delay:%" GST_TIME_FORMAT, GST_TIME_ARGS (delay));
+
gst_buffer_resize (outbuf, 0, recv_len);
GST_LOG_OBJECT (src,
gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
- gst_base_src_set_do_timestamp (GST_BASE_SRC (self), TRUE);
+ /* We do the timing ourselves */
+ gst_base_src_set_do_timestamp (GST_BASE_SRC (self), FALSE);
gst_srt_object_set_uri (self->srtobject, GST_SRT_DEFAULT_URI, NULL);
}
}
+static gboolean
+gst_srt_src_query (GstBaseSrc * basesrc, GstQuery * query)
+{
+ GstSRTSrc *self = GST_SRT_SRC (basesrc);
+
+ if (GST_QUERY_TYPE (query) == GST_QUERY_LATENCY) {
+ gint latency;
+ if (!gst_structure_get_int (self->srtobject->parameters, "latency",
+ &latency))
+ latency = GST_SRT_DEFAULT_LATENCY;
+ gst_query_set_latency (query, TRUE, latency * GST_MSECOND,
+ latency * GST_MSECOND);
+ return TRUE;
+ } else {
+ return GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
+ }
+}
+
static void
gst_srt_src_class_init (GstSRTSrcClass * klass)
{
gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_src_stop);
gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_src_unlock);
gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_srt_src_unlock_stop);
+ gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_srt_src_query);
gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_src_fill);
}