GSource *source;
guint id;
GstRTSPThread *thread;
+ GList *pending_pipeline_elements;
gboolean time_provider;
GstNetTimeProvider *nettime;
GList *payloads; /* protected by lock */
GstClockTime rtx_time; /* protected by lock */
+ gboolean do_retransmission; /* protected by lock */
guint latency; /* protected by lock */
GstClock *clock; /* protected by lock */
GstRTSPPublishClockMode publish_clock_mode;
#define DEFAULT_TRANSPORT_MODE GST_RTSP_TRANSPORT_MODE_PLAY
#define DEFAULT_STOP_ON_DISCONNECT TRUE
+#define DEFAULT_DO_RETRANSMISSION FALSE
+
/* define to dump received RTCP packets */
#undef DUMP_STATS
static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
+static gboolean check_complete (GstRTSPMedia * media);
+
#define C_ENUM(v) ((gint) v)
GType
priv->transport_mode = DEFAULT_TRANSPORT_MODE;
priv->stop_on_disconnect = DEFAULT_STOP_ON_DISCONNECT;
priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
+ priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
}
static void
g_ptr_array_unref (priv->streams);
g_list_free_full (priv->dynamic, gst_object_unref);
+ g_list_free_full (priv->pending_pipeline_elements, gst_object_unref);
if (priv->pipeline)
gst_object_unref (priv->pipeline);
typedef struct
{
gint64 position;
+ gboolean complete_streams_only;
gboolean ret;
} DoQueryPositionData;
{
gint64 tmp;
+ if (!gst_rtsp_stream_is_sender (stream))
+ return;
+
+ if (data->complete_streams_only && !gst_rtsp_stream_is_complete (stream)) {
+ GST_DEBUG_OBJECT (stream, "stream not complete, do not query position");
+ return;
+ }
+
if (gst_rtsp_stream_query_position (stream, &tmp)) {
data->position = MIN (data->position, tmp);
data->ret = TRUE;
data.position = G_MAXINT64;
data.ret = FALSE;
+ /* if the media is complete, i.e. one or more streams have been configured
+ * with sinks, then we want to query the position on those streams only.
+ * a query on an incmplete stream may return a position that originates from
+ * an earlier preroll */
+ if (check_complete (media))
+ data.complete_streams_only = TRUE;
+ else
+ data.complete_streams_only = FALSE;
+
g_ptr_array_foreach (priv->streams, (GFunc) do_query_position, &data);
if (!data.ret)
return rtpbin;
}
+static gboolean
+is_receive_only (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+ gboolean recive_only = TRUE;
+ guint i;
+
+ for (i = 0; i < priv->streams->len; i++) {
+ GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+ if (gst_rtsp_stream_is_sender (stream) ||
+ !gst_rtsp_stream_is_receiver (stream)) {
+ recive_only = FALSE;
+ break;
+ }
+ }
+
+ return recive_only;
+}
+
/* must be called with state lock */
static void
check_seekable (GstRTSPMedia * media)
GstRTSPMediaPrivate *priv = media->priv;
/* Update the seekable state of the pipeline in case it changed */
- if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) {
- /* TODO: Seeking for RECORD? */
+ if (is_receive_only (media)) {
+ /* TODO: Seeking for "receive-only"? */
priv->seekable = -1;
} else {
guint i, n = priv->streams->len;
GstRTSPMediaPrivate *priv;
GstElement *old;
GstNetTimeProvider *nettime;
+ GList *l;
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_return_if_fail (GST_IS_PIPELINE (pipeline));
gst_object_unref (nettime);
gst_bin_add (GST_BIN_CAST (pipeline), priv->element);
+
+ for (l = priv->pending_pipeline_elements; l; l = l->next) {
+ gst_bin_add (GST_BIN_CAST (pipeline), l->data);
+ }
+ g_list_free (priv->pending_pipeline_elements);
+ priv->pending_pipeline_elements = NULL;
}
/**
gst_rtsp_stream_set_retransmission_time (stream, time);
}
-
- if (priv->rtpbin)
- g_object_set (priv->rtpbin, "do-retransmission", time > 0, NULL);
g_mutex_unlock (&priv->lock);
}
}
/**
+ * gst_rtsp_media_set_do_retransmission:
+ *
+ * Set whether retransmission requests will be sent
+ *
+ * Since: 1.16
+ */
+void
+gst_rtsp_media_set_do_retransmission (GstRTSPMedia * media,
+ gboolean do_retransmission)
+{
+ GstRTSPMediaPrivate *priv;
+
+ g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ priv->do_retransmission = do_retransmission;
+
+ if (priv->rtpbin)
+ g_object_set (priv->rtpbin, "do-retransmission", do_retransmission, NULL);
+ g_mutex_unlock (&priv->lock);
+}
+
+/**
+ * gst_rtsp_media_get_do_retransmission:
+ *
+ * Returns: Whether retransmission requests will be sent
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_media_get_do_retransmission (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv;
+ gboolean res;
+
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ res = priv->do_retransmission;
+ g_mutex_unlock (&priv->lock);
+
+ return res;
+}
+
+/**
* gst_rtsp_media_set_latency:
* @media: a #GstRTSPMedia
* @latency: latency in milliseconds
gst_rtsp_media_set_latency (GstRTSPMedia * media, guint latency)
{
GstRTSPMediaPrivate *priv;
+ guint i;
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_mutex_lock (&priv->lock);
priv->latency = latency;
- if (priv->rtpbin)
+ if (priv->rtpbin) {
g_object_set (priv->rtpbin, "latency", latency, NULL);
+
+ for (i = 0; i < media->priv->streams->len; i++) {
+ GObject *storage = NULL;
+
+ g_signal_emit_by_name (G_OBJECT (media->priv->rtpbin), "get-storage",
+ i, &storage);
+ if (storage)
+ g_object_set (storage, "size-time", (media->priv->latency + 50) * GST_MSECOND, NULL);
+ }
+ }
+
g_mutex_unlock (&priv->lock);
}
}
}
+typedef struct
+{
+ GstElement *appsink, *appsrc;
+ GstRTSPStream *stream;
+} AppSinkSrcData;
+
+static GstFlowReturn
+appsink_new_sample (GstAppSink * appsink, gpointer user_data)
+{
+ AppSinkSrcData *data = user_data;
+ GstSample *sample;
+ GstFlowReturn ret;
+
+ sample = gst_app_sink_pull_sample (appsink);
+ if (!sample)
+ return GST_FLOW_FLUSHING;
+
+
+ ret = gst_app_src_push_sample (GST_APP_SRC (data->appsrc), sample);
+ gst_sample_unref (sample);
+ return ret;
+}
+
+static GstAppSinkCallbacks appsink_callbacks = {
+ NULL,
+ NULL,
+ appsink_new_sample,
+};
+
+static GstPadProbeReturn
+appsink_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ AppSinkSrcData *data = user_data;
+
+ if (GST_IS_EVENT (info->data)
+ && GST_EVENT_TYPE (info->data) == GST_EVENT_LATENCY) {
+ GstClockTime min, max;
+
+ if (gst_base_sink_query_latency (GST_BASE_SINK (data->appsink), NULL, NULL,
+ &min, &max)) {
+ g_object_set (data->appsrc, "min-latency", min, "max-latency", max, NULL);
+ GST_DEBUG ("setting latency to min %" GST_TIME_FORMAT " max %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
+ }
+ } else if (GST_IS_QUERY (info->data)) {
+ GstPad *srcpad = gst_element_get_static_pad (data->appsrc, "src");
+ if (gst_pad_peer_query (srcpad, GST_QUERY_CAST (info->data))) {
+ gst_object_unref (srcpad);
+ return GST_PAD_PROBE_HANDLED;
+ }
+ gst_object_unref (srcpad);
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
+static GstPadProbeReturn
+appsrc_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ AppSinkSrcData *data = user_data;
+
+ if (GST_IS_QUERY (info->data)) {
+ GstPad *sinkpad = gst_element_get_static_pad (data->appsink, "sink");
+ if (gst_pad_peer_query (sinkpad, GST_QUERY_CAST (info->data))) {
+ gst_object_unref (sinkpad);
+ return GST_PAD_PROBE_HANDLED;
+ }
+ gst_object_unref (sinkpad);
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
/**
* gst_rtsp_media_create_stream:
* @media: a #GstRTSPMedia
{
GstRTSPMediaPrivate *priv;
GstRTSPStream *stream;
- GstPad *ghostpad;
+ GstPad *streampad;
gchar *name;
gint idx;
+ AppSinkSrcData *data = NULL;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
else
name = g_strdup_printf ("sink_%u", idx);
- ghostpad = gst_ghost_pad_new (name, pad);
- gst_pad_set_active (ghostpad, TRUE);
- gst_element_add_pad (priv->element, ghostpad);
+ if ((GST_PAD_IS_SRC (pad) && priv->element->numsinkpads > 0) ||
+ (GST_PAD_IS_SINK (pad) && priv->element->numsrcpads > 0)) {
+ GstElement *appsink, *appsrc;
+ GstPad *sinkpad, *srcpad;
+
+ appsink = gst_element_factory_make ("appsink", NULL);
+ appsrc = gst_element_factory_make ("appsrc", NULL);
+
+ if (GST_PAD_IS_SINK (pad)) {
+ srcpad = gst_element_get_static_pad (appsrc, "src");
+
+ gst_bin_add (GST_BIN (priv->element), appsrc);
+
+ gst_pad_link (srcpad, pad);
+ gst_object_unref (srcpad);
+
+ streampad = gst_element_get_static_pad (appsink, "sink");
+
+ priv->pending_pipeline_elements =
+ g_list_prepend (priv->pending_pipeline_elements, appsink);
+ } else {
+ sinkpad = gst_element_get_static_pad (appsink, "sink");
+
+ gst_pad_link (pad, sinkpad);
+ gst_object_unref (sinkpad);
+
+ streampad = gst_element_get_static_pad (appsrc, "src");
+
+ priv->pending_pipeline_elements =
+ g_list_prepend (priv->pending_pipeline_elements, appsrc);
+ }
+
+ g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live",
+ TRUE, NULL);
+ g_object_set (appsink, "sync", FALSE, "async", FALSE, NULL);
+
+ data = g_new0 (AppSinkSrcData, 1);
+ data->appsink = appsink;
+ data->appsrc = appsrc;
+
+ sinkpad = gst_element_get_static_pad (appsink, "sink");
+ gst_pad_add_probe (sinkpad,
+ GST_PAD_PROBE_TYPE_EVENT_UPSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
+ appsink_pad_probe, data, NULL);
+ gst_object_unref (sinkpad);
+
+ srcpad = gst_element_get_static_pad (appsrc, "src");
+ gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
+ appsrc_pad_probe, data, NULL);
+ gst_object_unref (srcpad);
+
+ gst_app_sink_set_callbacks (GST_APP_SINK (appsink), &appsink_callbacks,
+ data, NULL);
+ g_object_set_data_full (G_OBJECT (streampad), "media-appsink-appsrc", data,
+ g_free);
+ } else {
+ streampad = gst_ghost_pad_new (name, pad);
+ gst_pad_set_active (streampad, TRUE);
+ gst_element_add_pad (priv->element, streampad);
+ }
g_free (name);
- stream = gst_rtsp_stream_new (idx, payloader, ghostpad);
+ stream = gst_rtsp_stream_new (idx, payloader, streampad);
+ if (data)
+ data->stream = stream;
if (priv->pool)
gst_rtsp_stream_set_address_pool (stream, priv->pool);
gst_rtsp_stream_set_multicast_iface (stream, priv->multicast_iface);
{
GstRTSPMediaPrivate *priv;
GstPad *srcpad;
+ AppSinkSrcData *data;
priv = media->priv;
g_mutex_lock (&priv->lock);
/* remove the ghostpad */
srcpad = gst_rtsp_stream_get_srcpad (stream);
- gst_element_remove_pad (priv->element, srcpad);
+ data = g_object_get_data (G_OBJECT (srcpad), "media-appsink-appsrc");
+ if (data) {
+ if (GST_OBJECT_PARENT (data->appsrc) == GST_OBJECT_CAST (priv->pipeline))
+ gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsrc);
+ else if (GST_OBJECT_PARENT (data->appsrc) ==
+ GST_OBJECT_CAST (priv->element))
+ gst_bin_remove (GST_BIN_CAST (priv->element), data->appsrc);
+ if (GST_OBJECT_PARENT (data->appsink) == GST_OBJECT_CAST (priv->pipeline))
+ gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsink);
+ else if (GST_OBJECT_PARENT (data->appsink) ==
+ GST_OBJECT_CAST (priv->element))
+ gst_bin_remove (GST_BIN_CAST (priv->element), data->appsink);
+ } else {
+ gst_element_remove_pad (priv->element, srcpad);
+ }
gst_object_unref (srcpad);
/* now remove the stream */
g_object_ref (stream);
GST_DEBUG ("%p: went from %s to %s (pending %s)", media,
gst_element_state_get_name (old), gst_element_state_get_name (new),
gst_element_state_get_name (pending));
- if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)
- && old == GST_STATE_READY && new == GST_STATE_PAUSED) {
+ if (priv->no_more_pads_pending == 0 && is_receive_only (media) &&
+ old == GST_STATE_READY && new == GST_STATE_PAUSED) {
GST_INFO ("%p: went to PAUSED, prepared now", media);
collect_media_stats (media);
GstRTSPMediaPrivate *priv = media->priv;
GstRTSPStream *stream = NULL;
guint i;
+ GstElement *res = NULL;
g_mutex_lock (&priv->lock);
for (i = 0; i < priv->streams->len; i++) {
if (sessid == gst_rtsp_stream_get_index (stream))
break;
+
+ stream = NULL;
}
g_mutex_unlock (&priv->lock);
- return gst_rtsp_stream_request_aux_sender (stream, sessid);
+ if (stream)
+ res = gst_rtsp_stream_request_aux_sender (stream, sessid);
+
+ return res;
+}
+
+static GstElement *
+request_aux_receiver (GstElement * rtpbin, guint sessid, GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+ GstRTSPStream *stream = NULL;
+ guint i;
+ GstElement *res = NULL;
+
+ g_mutex_lock (&priv->lock);
+ for (i = 0; i < priv->streams->len; i++) {
+ stream = g_ptr_array_index (priv->streams, i);
+
+ if (sessid == gst_rtsp_stream_get_index (stream))
+ break;
+
+ stream = NULL;
+ }
+ g_mutex_unlock (&priv->lock);
+
+ if (stream)
+ res = gst_rtsp_stream_request_aux_receiver (stream, sessid);
+
+ return res;
+}
+
+static GstElement *
+request_fec_decoder (GstElement * rtpbin, guint sessid, GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+ GstRTSPStream *stream = NULL;
+ guint i;
+ GstElement *res = NULL;
+
+ g_mutex_lock (&priv->lock);
+ for (i = 0; i < priv->streams->len; i++) {
+ stream = g_ptr_array_index (priv->streams, i);
+
+ if (sessid == gst_rtsp_stream_get_index (stream))
+ break;
+
+ stream = NULL;
+ }
+ g_mutex_unlock (&priv->lock);
+
+ if (stream) {
+ res = gst_rtsp_stream_request_ulpfec_decoder (stream, rtpbin, sessid);
+ }
+
+ return res;
+}
+
+static void
+new_storage_cb (GstElement * rtpbin, GObject * storage, guint sessid, GstRTSPMedia * media)
+{
+ g_object_set (storage, "size-time", (media->priv->latency + 50) * GST_MSECOND, NULL);
}
static gboolean
if (priv->status != GST_RTSP_MEDIA_STATUS_PREPARING)
goto no_longer_preparing;
+ g_signal_connect (priv->rtpbin, "new-storage", G_CALLBACK (new_storage_cb), media);
+ g_signal_connect (priv->rtpbin, "request-fec-decoder", G_CALLBACK (request_fec_decoder), media);
+
/* link streams we already have, other streams might appear when we have
* dynamic elements */
for (i = 0; i < priv->streams->len; i++) {
(GCallback) request_aux_sender, media);
}
+ if (priv->do_retransmission) {
+ g_signal_connect (priv->rtpbin, "request-aux-receiver",
+ (GCallback) request_aux_receiver, media);
+ }
+
if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
priv->rtpbin, GST_STATE_NULL)) {
goto join_bin_failed;
}
if (priv->rtpbin)
- g_object_set (priv->rtpbin, "do-retransmission", priv->rtx_time > 0, NULL);
+ g_object_set (priv->rtpbin, "do-retransmission", priv->do_retransmission,
+ "do-lost", TRUE, NULL);
for (walk = priv->dynamic; walk; walk = g_list_next (walk)) {
GstElement *elem = walk->data;
g_object_set_data (G_OBJECT (elem), "gst-rtsp-dynpay-handlers", handlers);
}
- if (!start_preroll (media))
+ if (priv->nb_dynamic_elements == 0 && is_receive_only (media)) {
+ /* If we are receive_only (RECORD), do not try to preroll, to avoid
+ * a second ASYNC state change failing */
+ priv->is_live = TRUE;
+ gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
+ } else if (!start_preroll (media)) {
goto preroll_failed;
+ }
g_rec_mutex_unlock (&priv->state_lock);
s = gst_caps_get_structure (caps, 0);
gst_structure_set_name (s, "application/x-rtp");
+ if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
+ gst_structure_set (s, "is-fec", G_TYPE_BOOLEAN, TRUE, NULL);
+
gst_rtsp_stream_set_pt_map (stream, pt, caps);
gst_caps_unref (caps);
}
do_set_seqnum (GstRTSPStream * stream)
{
guint16 seq_num;
- seq_num = gst_rtsp_stream_get_current_seqnum (stream);
- gst_rtsp_stream_set_seqnum_offset (stream, seq_num + 1);
+
+ if (gst_rtsp_stream_is_sender (stream)) {
+ seq_num = gst_rtsp_stream_get_current_seqnum (stream);
+ gst_rtsp_stream_set_seqnum_offset (stream, seq_num + 1);
+ }
}
/* call with state_lock */
switch (priv->suspend_mode) {
case GST_RTSP_SUSPEND_MODE_NONE:
- if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD))
+ if (is_receive_only (media))
break;
if (media_streams_blocking (media)) {
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);