* #GstRTSPSession and #GstRTSPSessionMedia.
*
* The state of the media can be controlled with gst_rtsp_media_set_state ().
- * Seeking can be done with gst_rtsp_media_seek().
+ * Seeking can be done with gst_rtsp_media_seek(), or gst_rtsp_media_seek_full()
+ * or gst_rtsp_media_seek_trickmode() for finer control of the seek.
*
* With gst_rtsp_media_unprepare() the pipeline is stopped and shut down. When
* gst_rtsp_media_set_eos_shutdown() an EOS will be sent to the pipeline to
*
* Last reviewed on 2013-07-11 (1.0.0)
*/
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
#include <stdio.h>
#include <string.h>
#include "rtsp-media.h"
-#define GST_RTSP_MEDIA_GET_PRIVATE(obj) \
- (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_MEDIA, GstRTSPMediaPrivate))
-
struct _GstRTSPMediaPrivate
{
GMutex lock;
guint buffer_size;
GstRTSPAddressPool *pool;
gchar *multicast_iface;
+ guint max_mcast_ttl;
+ gboolean bind_mcast_address;
gboolean blocked;
GstRTSPTransportMode transport_mode;
gboolean stop_on_disconnect;
GstRTSPMediaStatus status; /* protected by lock */
gint prepare_count;
gint n_active;
- gboolean adding;
+ gboolean complete;
+ gboolean finishing_unprepare;
/* the pipeline for the media */
GstElement *pipeline;
- GstElement *fakesink; /* protected by lock */
GSource *source;
guint id;
GstRTSPThread *thread;
+ GList *pending_pipeline_elements;
gboolean time_provider;
GstNetTimeProvider *nettime;
GList *payloads; /* protected by lock */
GstClockTime rtx_time; /* protected by lock */
+ gboolean do_retransmission; /* protected by lock */
guint latency; /* protected by lock */
GstClock *clock; /* protected by lock */
+ gboolean do_rate_control; /* protected by lock */
GstRTSPPublishClockMode publish_clock_mode;
+
+ /* Dynamic element handling */
+ guint nb_dynamic_elements;
+ guint no_more_pads_pending;
+ gboolean expected_async_done;
};
#define DEFAULT_SHARED FALSE
#define DEFAULT_LATENCY 200
#define DEFAULT_TRANSPORT_MODE GST_RTSP_TRANSPORT_MODE_PLAY
#define DEFAULT_STOP_ON_DISCONNECT TRUE
+#define DEFAULT_MAX_MCAST_TTL 255
+#define DEFAULT_BIND_MCAST_ADDRESS FALSE
+#define DEFAULT_DO_RATE_CONTROL TRUE
+
+#define DEFAULT_DO_RETRANSMISSION FALSE
/* define to dump received RTCP packets */
#undef DUMP_STATS
PROP_TRANSPORT_MODE,
PROP_STOP_ON_DISCONNECT,
PROP_CLOCK,
+ PROP_MAX_MCAST_TTL,
+ PROP_BIND_MCAST_ADDRESS,
PROP_LAST
};
static gboolean wait_preroll (GstRTSPMedia * media);
-static GstElement *find_payload_element (GstElement * payloader);
+static GstElement *find_payload_element (GstElement * payloader, GstPad * pad);
static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
+static gboolean check_complete (GstRTSPMedia * media);
+
#define C_ENUM(v) ((gint) v)
+#define TRICKMODE_FLAGS (GST_SEEK_FLAG_TRICKMODE | GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | GST_SEEK_FLAG_TRICKMODE_FORWARD_PREDICTED)
+
GType
gst_rtsp_suspend_mode_get_type (void)
{
return (GType) id;
}
-G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
+G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
static void
gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
{
GObjectClass *gobject_class;
- g_type_class_add_private (klass, sizeof (GstRTSPMediaPrivate));
-
gobject_class = G_OBJECT_CLASS (klass);
gobject_class->get_property = gst_rtsp_media_get_property;
"Clock to be used by the media pipeline",
GST_TYPE_CLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MAX_MCAST_TTL,
+ g_param_spec_uint ("max-mcast-ttl", "Maximum multicast ttl",
+ "The maximum time-to-live value of outgoing multicast packets", 1,
+ 255, DEFAULT_MAX_MCAST_TTL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_BIND_MCAST_ADDRESS,
+ g_param_spec_boolean ("bind-mcast-address", "Bind mcast address",
+ "Whether the multicast sockets should be bound to multicast addresses "
+ "or INADDR_ANY",
+ DEFAULT_BIND_MCAST_ADDRESS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
gst_rtsp_media_signals[SIGNAL_NEW_STREAM] =
g_signal_new ("new-stream", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstRTSPMediaClass, new_stream), NULL, NULL,
static void
gst_rtsp_media_init (GstRTSPMedia * media)
{
- GstRTSPMediaPrivate *priv = GST_RTSP_MEDIA_GET_PRIVATE (media);
+ GstRTSPMediaPrivate *priv = gst_rtsp_media_get_instance_private (media);
media->priv = priv;
priv->transport_mode = DEFAULT_TRANSPORT_MODE;
priv->stop_on_disconnect = DEFAULT_STOP_ON_DISCONNECT;
priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
+ priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
+ priv->max_mcast_ttl = DEFAULT_MAX_MCAST_TTL;
+ priv->bind_mcast_address = DEFAULT_BIND_MCAST_ADDRESS;
+ priv->do_rate_control = DEFAULT_DO_RATE_CONTROL;
+ priv->expected_async_done = FALSE;
}
static void
g_ptr_array_unref (priv->streams);
g_list_free_full (priv->dynamic, gst_object_unref);
+ g_list_free_full (priv->pending_pipeline_elements, gst_object_unref);
if (priv->pipeline)
gst_object_unref (priv->pipeline);
g_object_unref (priv->pool);
if (priv->payloads)
g_list_free (priv->payloads);
+ if (priv->clock)
+ gst_object_unref (priv->clock);
g_free (priv->multicast_iface);
g_mutex_clear (&priv->lock);
g_cond_clear (&priv->cond);
case PROP_CLOCK:
g_value_take_object (value, gst_rtsp_media_get_clock (media));
break;
+ case PROP_MAX_MCAST_TTL:
+ g_value_set_uint (value, gst_rtsp_media_get_max_mcast_ttl (media));
+ break;
+ case PROP_BIND_MCAST_ADDRESS:
+ g_value_set_boolean (value, gst_rtsp_media_is_bind_mcast_address (media));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
}
case PROP_CLOCK:
gst_rtsp_media_set_clock (media, g_value_get_object (value));
break;
+ case PROP_MAX_MCAST_TTL:
+ gst_rtsp_media_set_max_mcast_ttl (media, g_value_get_uint (value));
+ break;
+ case PROP_BIND_MCAST_ADDRESS:
+ gst_rtsp_media_set_bind_mcast_address (media,
+ g_value_get_boolean (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
}
typedef struct
{
gint64 position;
+ gboolean complete_streams_only;
gboolean ret;
} DoQueryPositionData;
{
gint64 tmp;
+ if (!gst_rtsp_stream_is_sender (stream))
+ return;
+
+ if (data->complete_streams_only && !gst_rtsp_stream_is_complete (stream)) {
+ GST_DEBUG_OBJECT (stream, "stream not complete, do not query position");
+ return;
+ }
+
if (gst_rtsp_stream_query_position (stream, &tmp)) {
- data->position = MAX (data->position, tmp);
+ data->position = MIN (data->position, tmp);
data->ret = TRUE;
}
+
+ GST_INFO_OBJECT (stream, "media position: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (data->position));
}
static gboolean
priv = media->priv;
- data.position = -1;
+ data.position = G_MAXINT64;
data.ret = FALSE;
+ /* if the media is complete, i.e. one or more streams have been configured
+ * with sinks, then we want to query the position on those streams only.
+ * a query on an incmplete stream may return a position that originates from
+ * an earlier preroll */
+ if (check_complete (media))
+ data.complete_streams_only = TRUE;
+ else
+ data.complete_streams_only = FALSE;
+
g_ptr_array_foreach (priv->streams, (GFunc) do_query_position, &data);
- *position = data.position;
+ if (!data.ret)
+ *position = GST_CLOCK_TIME_NONE;
+ else
+ *position = data.position;
return data.ret;
}
static void
do_query_stop (GstRTSPStream * stream, DoQueryStopData * data)
{
- gint64 tmp;
+ gint64 tmp = 0;
if (gst_rtsp_stream_query_stop (stream, &tmp)) {
data->stop = MAX (data->stop, tmp);
GstRTSPMediaPrivate *priv = media->priv;
/* Update the seekable state of the pipeline in case it changed */
- if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) {
- /* TODO: Seeking for RECORD? */
+ if (gst_rtsp_media_is_receive_only (media)) {
+ /* TODO: Seeking for "receive-only"? */
priv->seekable = -1;
} else {
guint i, n = priv->streams->len;
gint64 start, end;
gst_query_parse_seeking (query, &format, &seekable, &start, &end);
- priv->seekable = seekable ? G_MAXINT64 : 0.0;
+ priv->seekable = seekable ? G_MAXINT64 : 0;
+ } else if (priv->streams->len) {
+ gboolean seekable = TRUE;
+ guint i, n = priv->streams->len;
+
+ GST_DEBUG_OBJECT (media, "Checking %d streams", n);
+ for (i = 0; i < n; i++) {
+ GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+ seekable &= gst_rtsp_stream_seekable (stream);
+ }
+ priv->seekable = seekable ? G_MAXINT64 : -1;
}
+ GST_DEBUG_OBJECT (media, "seekable:%" G_GINT64_FORMAT, priv->seekable);
+
gst_query_unref (query);
}
+/* must be called with state lock */
+static gboolean
+check_complete (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+
+ guint i, n = priv->streams->len;
+
+ for (i = 0; i < n; i++) {
+ GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+
+ if (gst_rtsp_stream_is_complete (stream))
+ return TRUE;
+ }
+
+ return FALSE;
+}
/* must be called with state lock */
static void
GstRTSPMediaPrivate *priv;
GstElement *old;
GstNetTimeProvider *nettime;
+ GList *l;
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_return_if_fail (GST_IS_PIPELINE (pipeline));
gst_object_unref (nettime);
gst_bin_add (GST_BIN_CAST (pipeline), priv->element);
+
+ for (l = priv->pending_pipeline_elements; l; l = l->next) {
+ gst_bin_add (GST_BIN_CAST (pipeline), l->data);
+ }
+ g_list_free (priv->pending_pipeline_elements);
+ priv->pending_pipeline_elements = NULL;
}
/**
* gst_rtsp_media_set_permissions:
* @media: a #GstRTSPMedia
- * @permissions: (transfer none): a #GstRTSPPermissions
+ * @permissions: (transfer none) (nullable): a #GstRTSPPermissions
*
* Set @permissions on @media.
*/
*
* Get the permissions object from @media.
*
- * Returns: (transfer full): a #GstRTSPPermissions object, unref after usage.
+ * Returns: (transfer full) (nullable): a #GstRTSPPermissions object, unref after usage.
*/
GstRTSPPermissions *
gst_rtsp_media_get_permissions (GstRTSPMedia * media)
gst_rtsp_stream_set_retransmission_time (stream, time);
}
-
- if (priv->rtpbin)
- g_object_set (priv->rtpbin, "do-retransmission", time > 0, NULL);
g_mutex_unlock (&priv->lock);
}
}
/**
+ * gst_rtsp_media_set_do_retransmission:
+ *
+ * Set whether retransmission requests will be sent
+ *
+ * Since: 1.16
+ */
+void
+gst_rtsp_media_set_do_retransmission (GstRTSPMedia * media,
+ gboolean do_retransmission)
+{
+ GstRTSPMediaPrivate *priv;
+
+ g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ priv->do_retransmission = do_retransmission;
+
+ if (priv->rtpbin)
+ g_object_set (priv->rtpbin, "do-retransmission", do_retransmission, NULL);
+ g_mutex_unlock (&priv->lock);
+}
+
+/**
+ * gst_rtsp_media_get_do_retransmission:
+ *
+ * Returns: Whether retransmission requests will be sent
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_media_get_do_retransmission (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv;
+ gboolean res;
+
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ res = priv->do_retransmission;
+ g_mutex_unlock (&priv->lock);
+
+ return res;
+}
+
+/**
* gst_rtsp_media_set_latency:
* @media: a #GstRTSPMedia
* @latency: latency in milliseconds
gst_rtsp_media_set_latency (GstRTSPMedia * media, guint latency)
{
GstRTSPMediaPrivate *priv;
+ guint i;
g_return_if_fail (GST_IS_RTSP_MEDIA (media));
g_mutex_lock (&priv->lock);
priv->latency = latency;
- if (priv->rtpbin)
+ if (priv->rtpbin) {
g_object_set (priv->rtpbin, "latency", latency, NULL);
+
+ for (i = 0; i < media->priv->streams->len; i++) {
+ GObject *storage = NULL;
+
+ g_signal_emit_by_name (G_OBJECT (media->priv->rtpbin), "get-storage",
+ i, &storage);
+ if (storage)
+ g_object_set (storage, "size-time",
+ (media->priv->latency + 50) * GST_MSECOND, NULL);
+ }
+ }
+
g_mutex_unlock (&priv->lock);
}
/**
* gst_rtsp_media_set_clock:
* @media: a #GstRTSPMedia
- * @clock: #GstClock to be used
+ * @clock: (nullable): #GstClock to be used
*
* Configure the clock used for the media.
*/
/**
* gst_rtsp_media_set_address_pool:
* @media: a #GstRTSPMedia
- * @pool: (transfer none): a #GstRTSPAddressPool
+ * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
*
* configure @pool to be used as the address pool of @media.
*/
*
* Get the #GstRTSPAddressPool used as the address pool of @media.
*
- * Returns: (transfer full): the #GstRTSPAddressPool of @media. g_object_unref() after
- * usage.
+ * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @media.
+ * g_object_unref() after usage.
*/
GstRTSPAddressPool *
gst_rtsp_media_get_address_pool (GstRTSPMedia * media)
/**
* gst_rtsp_media_set_multicast_iface:
* @media: a #GstRTSPMedia
- * @multicast_iface: (transfer none): a multicast interface name
+ * @multicast_iface: (transfer none) (nullable): a multicast interface name
*
* configure @multicast_iface to be used for @media.
*/
*
* Get the multicast interface used for @media.
*
- * Returns: (transfer full): the multicast interface for @media. g_free() after
- * usage.
+ * Returns: (transfer full) (nullable): the multicast interface for @media.
+ * g_free() after usage.
*/
gchar *
gst_rtsp_media_get_multicast_iface (GstRTSPMedia * media)
return result;
}
+/**
+ * gst_rtsp_media_set_max_mcast_ttl:
+ * @media: a #GstRTSPMedia
+ * @ttl: the new multicast ttl value
+ *
+ * Set the maximum time-to-live value of outgoing multicast packets.
+ *
+ * Returns: %TRUE if the requested ttl has been set successfully.
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_media_set_max_mcast_ttl (GstRTSPMedia * media, guint ttl)
+{
+ GstRTSPMediaPrivate *priv;
+ guint i;
+
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+
+ GST_LOG_OBJECT (media, "set max mcast ttl %u", ttl);
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+
+ if (ttl == 0 || ttl > DEFAULT_MAX_MCAST_TTL) {
+ GST_WARNING_OBJECT (media, "The reqested mcast TTL value is not valid.");
+ g_mutex_unlock (&priv->lock);
+ return FALSE;
+ }
+ priv->max_mcast_ttl = ttl;
+
+ for (i = 0; i < priv->streams->len; i++) {
+ GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+ gst_rtsp_stream_set_max_mcast_ttl (stream, ttl);
+ }
+ g_mutex_unlock (&priv->lock);
+
+ return TRUE;
+}
+
+/**
+ * gst_rtsp_media_get_max_mcast_ttl:
+ * @media: a #GstRTSPMedia
+ *
+ * Get the the maximum time-to-live value of outgoing multicast packets.
+ *
+ * Returns: the maximum time-to-live value of outgoing multicast packets.
+ *
+ * Since: 1.16
+ */
+guint
+gst_rtsp_media_get_max_mcast_ttl (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv;
+ guint res;
+
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ res = priv->max_mcast_ttl;
+ g_mutex_unlock (&priv->lock);
+
+ return res;
+}
+
+/**
+ * gst_rtsp_media_set_bind_mcast_address:
+ * @media: a #GstRTSPMedia
+ * @bind_mcast_addr: the new value
+ *
+ * Decide whether the multicast socket should be bound to a multicast address or
+ * INADDR_ANY.
+ *
+ * Since: 1.16
+ */
+void
+gst_rtsp_media_set_bind_mcast_address (GstRTSPMedia * media,
+ gboolean bind_mcast_addr)
+{
+ GstRTSPMediaPrivate *priv;
+ guint i;
+
+ g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ priv->bind_mcast_address = bind_mcast_addr;
+ for (i = 0; i < priv->streams->len; i++) {
+ GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+ gst_rtsp_stream_set_bind_mcast_address (stream, bind_mcast_addr);
+ }
+ g_mutex_unlock (&priv->lock);
+}
+
+/**
+ * gst_rtsp_media_is_bind_mcast_address:
+ * @media: a #GstRTSPMedia
+ *
+ * Check if multicast sockets are configured to be bound to multicast addresses.
+ *
+ * Returns: %TRUE if multicast sockets are configured to be bound to multicast addresses.
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_media_is_bind_mcast_address (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv;
+ gboolean result;
+
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ result = priv->bind_mcast_address;
+ g_mutex_unlock (&priv->lock);
+
+ return result;
+}
+
static GList *
_find_payload_types (GstRTSPMedia * media)
{
pad = gst_element_get_static_pad (elem, "src");
/* find the real payload element in case elem is a GstBin */
- pay = find_payload_element (elem);
+ pay = find_payload_element (elem, pad);
/* create the stream */
if (pay == NULL) {
priv->dynamic = g_list_prepend (priv->dynamic, elem);
g_mutex_unlock (&priv->lock);
+ priv->nb_dynamic_elements++;
+
have_elem = TRUE;
more_elem_remaining = TRUE;
mode |= GST_RTSP_TRANSPORT_MODE_PLAY;
}
}
+typedef struct
+{
+ GstElement *appsink, *appsrc;
+ GstRTSPStream *stream;
+} AppSinkSrcData;
+
+static GstFlowReturn
+appsink_new_sample (GstAppSink * appsink, gpointer user_data)
+{
+ AppSinkSrcData *data = user_data;
+ GstSample *sample;
+ GstFlowReturn ret;
+
+ sample = gst_app_sink_pull_sample (appsink);
+ if (!sample)
+ return GST_FLOW_FLUSHING;
+
+
+ ret = gst_app_src_push_sample (GST_APP_SRC (data->appsrc), sample);
+ gst_sample_unref (sample);
+ return ret;
+}
+
+static GstAppSinkCallbacks appsink_callbacks = {
+ NULL,
+ NULL,
+ appsink_new_sample,
+};
+
+static GstPadProbeReturn
+appsink_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ AppSinkSrcData *data = user_data;
+
+ if (GST_IS_EVENT (info->data)
+ && GST_EVENT_TYPE (info->data) == GST_EVENT_LATENCY) {
+ GstClockTime min, max;
+
+ if (gst_base_sink_query_latency (GST_BASE_SINK (data->appsink), NULL, NULL,
+ &min, &max)) {
+ g_object_set (data->appsrc, "min-latency", min, "max-latency", max, NULL);
+ GST_DEBUG ("setting latency to min %" GST_TIME_FORMAT " max %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
+ }
+ } else if (GST_IS_QUERY (info->data)) {
+ GstPad *srcpad = gst_element_get_static_pad (data->appsrc, "src");
+ if (gst_pad_peer_query (srcpad, GST_QUERY_CAST (info->data))) {
+ gst_object_unref (srcpad);
+ return GST_PAD_PROBE_HANDLED;
+ }
+ gst_object_unref (srcpad);
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
+static GstPadProbeReturn
+appsrc_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ AppSinkSrcData *data = user_data;
+
+ if (GST_IS_QUERY (info->data)) {
+ GstPad *sinkpad = gst_element_get_static_pad (data->appsink, "sink");
+ if (gst_pad_peer_query (sinkpad, GST_QUERY_CAST (info->data))) {
+ gst_object_unref (sinkpad);
+ return GST_PAD_PROBE_HANDLED;
+ }
+ gst_object_unref (sinkpad);
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
/**
* gst_rtsp_media_create_stream:
* @media: a #GstRTSPMedia
{
GstRTSPMediaPrivate *priv;
GstRTSPStream *stream;
- GstPad *ghostpad;
+ GstPad *streampad;
gchar *name;
gint idx;
+ AppSinkSrcData *data = NULL;
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
g_mutex_lock (&priv->lock);
idx = priv->streams->len;
- GST_DEBUG ("media %p: creating stream with index %d", media, idx);
+ GST_DEBUG ("media %p: creating stream with index %d and payloader %"
+ GST_PTR_FORMAT, media, idx, payloader);
if (GST_PAD_IS_SRC (pad))
name = g_strdup_printf ("src_%u", idx);
else
name = g_strdup_printf ("sink_%u", idx);
- ghostpad = gst_ghost_pad_new (name, pad);
- gst_pad_set_active (ghostpad, TRUE);
- gst_element_add_pad (priv->element, ghostpad);
+ if ((GST_PAD_IS_SRC (pad) && priv->element->numsinkpads > 0) ||
+ (GST_PAD_IS_SINK (pad) && priv->element->numsrcpads > 0)) {
+ GstElement *appsink, *appsrc;
+ GstPad *sinkpad, *srcpad;
+
+ appsink = gst_element_factory_make ("appsink", NULL);
+ appsrc = gst_element_factory_make ("appsrc", NULL);
+
+ if (GST_PAD_IS_SINK (pad)) {
+ srcpad = gst_element_get_static_pad (appsrc, "src");
+
+ gst_bin_add (GST_BIN (priv->element), appsrc);
+
+ gst_pad_link (srcpad, pad);
+ gst_object_unref (srcpad);
+
+ streampad = gst_element_get_static_pad (appsink, "sink");
+
+ priv->pending_pipeline_elements =
+ g_list_prepend (priv->pending_pipeline_elements, appsink);
+ } else {
+ sinkpad = gst_element_get_static_pad (appsink, "sink");
+
+ gst_pad_link (pad, sinkpad);
+ gst_object_unref (sinkpad);
+
+ streampad = gst_element_get_static_pad (appsrc, "src");
+
+ priv->pending_pipeline_elements =
+ g_list_prepend (priv->pending_pipeline_elements, appsrc);
+ }
+
+ g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live",
+ TRUE, "emit-signals", FALSE, NULL);
+ g_object_set (appsink, "sync", FALSE, "async", FALSE, "emit-signals",
+ FALSE, "buffer-list", TRUE, NULL);
+
+ data = g_new0 (AppSinkSrcData, 1);
+ data->appsink = appsink;
+ data->appsrc = appsrc;
+
+ sinkpad = gst_element_get_static_pad (appsink, "sink");
+ gst_pad_add_probe (sinkpad,
+ GST_PAD_PROBE_TYPE_EVENT_UPSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
+ appsink_pad_probe, data, NULL);
+ gst_object_unref (sinkpad);
+
+ srcpad = gst_element_get_static_pad (appsrc, "src");
+ gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
+ appsrc_pad_probe, data, NULL);
+ gst_object_unref (srcpad);
+
+ gst_app_sink_set_callbacks (GST_APP_SINK (appsink), &appsink_callbacks,
+ data, NULL);
+ g_object_set_data_full (G_OBJECT (streampad), "media-appsink-appsrc", data,
+ g_free);
+ } else {
+ streampad = gst_ghost_pad_new (name, pad);
+ gst_pad_set_active (streampad, TRUE);
+ gst_element_add_pad (priv->element, streampad);
+ }
g_free (name);
- stream = gst_rtsp_stream_new (idx, payloader, ghostpad);
+ stream = gst_rtsp_stream_new (idx, payloader, streampad);
+ if (data)
+ data->stream = stream;
if (priv->pool)
gst_rtsp_stream_set_address_pool (stream, priv->pool);
gst_rtsp_stream_set_multicast_iface (stream, priv->multicast_iface);
+ gst_rtsp_stream_set_max_mcast_ttl (stream, priv->max_mcast_ttl);
+ gst_rtsp_stream_set_bind_mcast_address (stream, priv->bind_mcast_address);
gst_rtsp_stream_set_profiles (stream, priv->profiles);
gst_rtsp_stream_set_protocols (stream, priv->protocols);
gst_rtsp_stream_set_retransmission_time (stream, priv->rtx_time);
gst_rtsp_stream_set_buffer_size (stream, priv->buffer_size);
gst_rtsp_stream_set_publish_clock_mode (stream, priv->publish_clock_mode);
+ gst_rtsp_stream_set_rate_control (stream, priv->do_rate_control);
g_ptr_array_add (priv->streams, stream);
{
GstRTSPMediaPrivate *priv;
GstPad *srcpad;
+ AppSinkSrcData *data;
priv = media->priv;
g_mutex_lock (&priv->lock);
/* remove the ghostpad */
srcpad = gst_rtsp_stream_get_srcpad (stream);
- gst_element_remove_pad (priv->element, srcpad);
+ data = g_object_get_data (G_OBJECT (srcpad), "media-appsink-appsrc");
+ if (data) {
+ if (GST_OBJECT_PARENT (data->appsrc) == GST_OBJECT_CAST (priv->pipeline))
+ gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsrc);
+ else if (GST_OBJECT_PARENT (data->appsrc) ==
+ GST_OBJECT_CAST (priv->element))
+ gst_bin_remove (GST_BIN_CAST (priv->element), data->appsrc);
+ if (GST_OBJECT_PARENT (data->appsink) == GST_OBJECT_CAST (priv->pipeline))
+ gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsink);
+ else if (GST_OBJECT_PARENT (data->appsink) ==
+ GST_OBJECT_CAST (priv->element))
+ gst_bin_remove (GST_BIN_CAST (priv->element), data->appsink);
+ } else {
+ gst_element_remove_pad (priv->element, srcpad);
+ }
gst_object_unref (srcpad);
/* now remove the stream */
g_object_ref (stream);
* Get the current range as a string. @media must be prepared with
* gst_rtsp_media_prepare ().
*
- * Returns: (transfer full): The range as a string, g_free() after usage.
+ * Returns: (transfer full) (nullable): The range as a string, g_free() after usage.
*/
gchar *
gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play,
}
}
-static void
-stream_update_blocked (GstRTSPStream * stream, GstRTSPMedia * media)
+/**
+ * gst_rtsp_media_get_rates:
+ * @media: a #GstRTSPMedia
+ * @rate (allow-none): the rate of the current segment
+ * @applied_rate (allow-none): the applied_rate of the current segment
+ *
+ * Get the rate and applied_rate of the current segment.
+ *
+ * Returns: %FALSE if looking up the rate and applied rate failed. Otherwise
+ * %TRUE is returned and @rate and @applied_rate are set to the rate and
+ * applied_rate of the current segment.
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_get_rates (GstRTSPMedia * media, gdouble * rate,
+ gdouble * applied_rate)
{
- gst_rtsp_stream_set_blocked (stream, media->priv->blocked);
-}
+ GstRTSPMediaPrivate *priv;
+ GstRTSPStream *stream;
+ gdouble save_rate, save_applied_rate;
+ gboolean result = TRUE;
+ gboolean first_stream = TRUE;
+ gint i;
-static void
-media_streams_set_blocked (GstRTSPMedia * media, gboolean blocked)
-{
- GstRTSPMediaPrivate *priv = media->priv;
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
- GST_DEBUG ("media %p set blocked %d", media, blocked);
- priv->blocked = blocked;
- g_ptr_array_foreach (priv->streams, (GFunc) stream_update_blocked, media);
-}
+ if (!rate && !applied_rate) {
+ GST_WARNING_OBJECT (media, "rate and applied_rate are both NULL");
+ return FALSE;
+ }
-static void
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+
+ g_assert (priv->streams->len > 0);
+ for (i = 0; i < priv->streams->len; i++) {
+ stream = g_ptr_array_index (priv->streams, i);
+ if (gst_rtsp_stream_is_complete (stream)) {
+ if (gst_rtsp_stream_get_rates (stream, rate, applied_rate)) {
+ if (first_stream) {
+ save_rate = *rate;
+ save_applied_rate = *applied_rate;
+ first_stream = FALSE;
+ } else {
+ if (save_rate != *rate || save_applied_rate != *applied_rate) {
+ /* diffrent rate or applied_rate, weird */
+ g_assert (FALSE);
+ result = FALSE;
+ break;
+ }
+ }
+ } else {
+ /* complete stream withot rate and applied_rate, weird */
+ g_assert (FALSE);
+ result = FALSE;
+ break;
+ }
+ }
+ }
+
+ if (!result) {
+ GST_WARNING_OBJECT (media,
+ "failed to obtain consistent rate and applied_rate");
+ }
+
+ g_mutex_unlock (&priv->lock);
+
+ return result;
+}
+
+static void
+stream_update_blocked (GstRTSPStream * stream, GstRTSPMedia * media)
+{
+ gst_rtsp_stream_set_blocked (stream, media->priv->blocked);
+}
+
+static void
+media_streams_set_blocked (GstRTSPMedia * media, gboolean blocked)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+
+ GST_DEBUG ("media %p set blocked %d", media, blocked);
+ priv->blocked = blocked;
+ g_ptr_array_foreach (priv->streams, (GFunc) stream_update_blocked, media);
+}
+
+static void
+stream_unblock (GstRTSPStream * stream, GstRTSPMedia * media)
+{
+ gst_rtsp_stream_set_blocked (stream, FALSE);
+}
+
+static void
+media_unblock (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+
+ GST_DEBUG ("media %p unblocking streams", media);
+ /* media is not blocked any longer, as it contains active streams,
+ * streams that are complete */
+ priv->blocked = FALSE;
+ g_ptr_array_foreach (priv->streams, (GFunc) stream_unblock, media);
+}
+
+static void
gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
{
GstRTSPMediaPrivate *priv = media->priv;
}
/**
- * gst_rtsp_media_seek_full:
+ * gst_rtsp_media_seek_trickmode:
* @media: a #GstRTSPMedia
* @range: (transfer none): a #GstRTSPTimeRange
* @flags: The minimal set of #GstSeekFlags to use
+ * @rate: the rate to use in the seek
+ * @trickmode_interval: The trickmode interval to use for KEY_UNITS trick mode
*
- * Seek the pipeline of @media to @range. @media must be prepared with
- * gst_rtsp_media_prepare().
+ * Seek the pipeline of @media to @range with the given @flags and @rate,
+ * and @trickmode_interval.
+ * @media must be prepared with gst_rtsp_media_prepare().
+ * In order to perform the seek operation, the pipeline must contain all
+ * needed transport parts (transport sinks).
*
* Returns: %TRUE on success.
+ *
+ * Since: 1.18
*/
gboolean
-gst_rtsp_media_seek_full (GstRTSPMedia * media, GstRTSPTimeRange * range,
- GstSeekFlags flags)
+gst_rtsp_media_seek_trickmode (GstRTSPMedia * media,
+ GstRTSPTimeRange * range, GstSeekFlags flags, gdouble rate,
+ GstClockTime trickmode_interval)
{
GstRTSPMediaClass *klass;
GstRTSPMediaPrivate *priv;
GstClockTime start, stop;
GstSeekType start_type, stop_type;
gint64 current_position;
+ gboolean force_seek;
klass = GST_RTSP_MEDIA_GET_CLASS (media);
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
- g_return_val_if_fail (range != NULL, FALSE);
- g_return_val_if_fail (klass->convert_range != NULL, FALSE);
+ /* if there's a range then klass->convert_range must be set */
+ g_return_val_if_fail (range == NULL || klass->convert_range != NULL, FALSE);
+
+ GST_DEBUG ("flags=%x rate=%f", flags, rate);
priv = media->priv;
if (priv->status != GST_RTSP_MEDIA_STATUS_PREPARED)
goto not_prepared;
+ /* check if the media pipeline is complete in order to perform a
+ * seek operation on it */
+ if (!check_complete (media))
+ goto not_complete;
+
/* Update the seekable state of the pipeline in case it changed */
check_seekable (media);
}
start_type = stop_type = GST_SEEK_TYPE_NONE;
+ start = stop = GST_CLOCK_TIME_NONE;
- if (!klass->convert_range (media, range, GST_RTSP_RANGE_NPT))
- goto not_supported;
- gst_rtsp_range_get_times (range, &start, &stop);
+ /* if caller provided a range convert it to NPT format
+ * if no range provided the seek is assumed to be the same position but with
+ * e.g. the rate changed */
+ if (range != NULL) {
+ if (!klass->convert_range (media, range, GST_RTSP_RANGE_NPT))
+ goto not_supported;
+ gst_rtsp_range_get_times (range, &start, &stop);
- GST_INFO ("got %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
- GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
- GST_INFO ("current %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
- GST_TIME_ARGS (priv->range_start), GST_TIME_ARGS (priv->range_stop));
+ GST_INFO ("got %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
+ GST_INFO ("current %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (priv->range_start), GST_TIME_ARGS (priv->range_stop));
+ }
current_position = -1;
if (klass->query_position)
if (start != GST_CLOCK_TIME_NONE)
start_type = GST_SEEK_TYPE_SET;
- if (priv->range_stop == stop)
- stop = GST_CLOCK_TIME_NONE;
- else if (stop != GST_CLOCK_TIME_NONE)
+ if (stop != GST_CLOCK_TIME_NONE)
stop_type = GST_SEEK_TYPE_SET;
- if (start != GST_CLOCK_TIME_NONE || stop != GST_CLOCK_TIME_NONE) {
- gboolean had_flags = flags != 0;
+ /* we force a seek if any trickmode flag is set, or if the rate
+ * is non-standard, i.e. not 1.0 */
+ force_seek = (flags & TRICKMODE_FLAGS) || rate != 1.0;
+ if (start != GST_CLOCK_TIME_NONE || stop != GST_CLOCK_TIME_NONE || force_seek) {
GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
/* depends on the current playing state of the pipeline. We might need to
* queue this until we get EOS. */
- if (had_flags)
- flags |= GST_SEEK_FLAG_FLUSH;
- else
- flags = GST_SEEK_FLAG_FLUSH;
-
+ flags |= GST_SEEK_FLAG_FLUSH;
/* if range start was not supplied we must continue from current position.
* but since we're doing a flushing seek, let us query the current position
* so we end up at exactly the same position after the seek. */
- if (range->min.type == GST_RTSP_TIME_END) { /* Yepp, that's right! */
+ if (range == NULL || range->min.type == GST_RTSP_TIME_END) {
if (current_position == -1) {
GST_WARNING ("current position unknown");
} else {
GST_TIME_ARGS (current_position));
start = current_position;
start_type = GST_SEEK_TYPE_SET;
- if (!had_flags)
- flags |= GST_SEEK_FLAG_ACCURATE;
}
- } else {
- /* only set keyframe flag when modifying start */
- if (start_type != GST_SEEK_TYPE_NONE)
- if (!had_flags)
- flags |= GST_SEEK_FLAG_KEY_UNIT;
}
- if (start == current_position && stop_type == GST_SEEK_TYPE_NONE) {
- GST_DEBUG ("not seeking because no position change");
+ if (start == current_position && stop_type == GST_SEEK_TYPE_NONE &&
+ !force_seek) {
+ GST_DEBUG ("no position change, no flags set by caller, so not seeking");
res = TRUE;
} else {
+ GstEvent *seek_event;
+ gboolean unblock = FALSE;
+
+ /* Handle expected async-done before waiting on next async-done.
+ *
+ * Since the seek further down in code will cause a preroll and
+ * a async-done will be generated it's important to wait on async-done
+ * if that is expected. Otherwise there is the risk that the waiting
+ * for async-done after the seek is detecting the expected async-done
+ * instead of the one that corresponds to the seek. Then execution
+ * continue and act as if the pipeline is prerolled, but it's not.
+ *
+ * During wait_preroll message GST_MESSAGE_ASYNC_DONE will come
+ * and then the state will change from preparing to prepared */
+ if (priv->expected_async_done) {
+ GST_DEBUG (" expected to get async-done, waiting ");
+ gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
+ g_rec_mutex_unlock (&priv->state_lock);
+
+ /* wait until pipeline is prerolled */
+ if (!wait_preroll (media))
+ goto preroll_failed_expected_async_done;
+
+ g_rec_mutex_lock (&priv->state_lock);
+ GST_DEBUG (" got expected async-done");
+ }
+
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
- if (priv->blocked)
+
+ if (rate < 0.0) {
+ GstClockTime temp_time = start;
+ GstSeekType temp_type = start_type;
+
+ start = stop;
+ start_type = stop_type;
+ stop = temp_time;
+ stop_type = temp_type;
+ }
+
+ seek_event = gst_event_new_seek (rate, GST_FORMAT_TIME, flags, start_type,
+ start, stop_type, stop);
+
+ gst_event_set_seek_trickmode_interval (seek_event, trickmode_interval);
+
+ if (!media->priv->blocked) {
+ /* Prevent a race condition with multiple streams,
+ * where one stream may have time to preroll before others
+ * have even started flushing, causing async-done to be
+ * posted too early.
+ */
media_streams_set_blocked (media, TRUE);
+ unblock = TRUE;
+ }
- /* FIXME, we only do forwards playback, no trick modes yet */
- res = gst_element_seek (priv->pipeline, 1.0, GST_FORMAT_TIME,
- flags, start_type, start, stop_type, stop);
+ res = gst_element_send_event (priv->pipeline, seek_event);
+
+ if (unblock)
+ media_streams_set_blocked (media, FALSE);
/* and block for the seek to complete */
GST_INFO ("done seeking %d", res);
GST_INFO ("media %p is not prepared", media);
return FALSE;
}
+not_complete:
+ {
+ g_rec_mutex_unlock (&priv->state_lock);
+ GST_INFO ("pipeline is not complete");
+ return FALSE;
+ }
not_seekable:
{
g_rec_mutex_unlock (&priv->state_lock);
GST_WARNING ("failed to preroll after seek");
return FALSE;
}
+preroll_failed_expected_async_done:
+ {
+ GST_WARNING ("failed to preroll");
+ return FALSE;
+ }
}
+/**
+ * gst_rtsp_media_seek_full:
+ * @media: a #GstRTSPMedia
+ * @range: (transfer none): a #GstRTSPTimeRange
+ * @flags: The minimal set of #GstSeekFlags to use
+ *
+ * Seek the pipeline of @media to @range with the given @flags.
+ * @media must be prepared with gst_rtsp_media_prepare().
+ *
+ * Returns: %TRUE on success.
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_seek_full (GstRTSPMedia * media, GstRTSPTimeRange * range,
+ GstSeekFlags flags)
+{
+ return gst_rtsp_media_seek_trickmode (media, range, flags, 1.0, 0);
+}
/**
* gst_rtsp_media_seek:
gboolean
gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
{
- return gst_rtsp_media_seek_full (media, range, 0);
+ return gst_rtsp_media_seek_trickmode (media, range, GST_SEEK_FLAG_NONE,
+ 1.0, 0);
}
-
static void
stream_collect_blocking (GstRTSPStream * stream, gboolean * blocked)
{
GST_DEBUG ("%p: went from %s to %s (pending %s)", media,
gst_element_state_get_name (old), gst_element_state_get_name (new),
gst_element_state_get_name (pending));
- if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)
- && old == GST_STATE_READY && new == GST_STATE_PAUSED) {
+ if (priv->no_more_pads_pending == 0
+ && gst_rtsp_media_is_receive_only (media) && old == GST_STATE_READY
+ && new == GST_STATE_PAUSED) {
GST_INFO ("%p: went to PAUSED, prepared now", media);
collect_media_stats (media);
s = gst_message_get_structure (message);
if (gst_structure_has_name (s, "GstRTSPStreamBlocking")) {
GST_DEBUG ("media received blocking message");
- if (priv->blocked && media_streams_blocking (media)) {
- GST_DEBUG ("media is blocking");
+ if (priv->blocked && media_streams_blocking (media) &&
+ priv->no_more_pads_pending == 0) {
+ GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message), "media is blocking");
collect_media_stats (media);
if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING)
case GST_MESSAGE_STREAM_STATUS:
break;
case GST_MESSAGE_ASYNC_DONE:
- if (priv->adding) {
- /* when we are dynamically adding pads, the addition of the udpsrc will
- * temporarily produce ASYNC_DONE messages. We have to ignore them and
- * wait for the final ASYNC_DONE after everything prerolled */
- GST_INFO ("%p: ignoring ASYNC_DONE", media);
- } else {
- GST_INFO ("%p: got ASYNC_DONE", media);
- collect_media_stats (media);
-
+ if (priv->expected_async_done)
+ priv->expected_async_done = FALSE;
+ if (priv->complete) {
+ /* receive the final ASYNC_DONE, that is posted by the media pipeline
+ * after all the transport parts have been successfully added to
+ * the media streams. */
+ GST_DEBUG_OBJECT (media, "got async-done");
if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING)
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
}
g_object_unref (media);
}
+static gboolean
+is_payloader (GstElement * element)
+{
+ GstElementClass *eclass = GST_ELEMENT_GET_CLASS (element);
+ const gchar *klass;
+
+ klass = gst_element_class_get_metadata (eclass, GST_ELEMENT_METADATA_KLASS);
+ if (klass == NULL)
+ return FALSE;
+
+ if (strstr (klass, "Payloader") && strstr (klass, "RTP")) {
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
static GstElement *
-find_payload_element (GstElement * payloader)
+find_payload_element (GstElement * payloader, GstPad * pad)
{
GstElement *pay = NULL;
if (GST_IS_BIN (payloader)) {
GstIterator *iter;
GValue item = { 0 };
+ gchar *pad_name, *payloader_name;
+ GstElement *element;
+
+ if ((element = gst_bin_get_by_name (GST_BIN (payloader), "pay"))) {
+ if (is_payloader (element))
+ return element;
+ gst_object_unref (element);
+ }
+
+ pad_name = gst_object_get_name (GST_OBJECT (pad));
+ payloader_name = g_strdup_printf ("pay_%s", pad_name);
+ g_free (pad_name);
+ if ((element = gst_bin_get_by_name (GST_BIN (payloader), payloader_name))) {
+ g_free (payloader_name);
+ if (is_payloader (element))
+ return element;
+ gst_object_unref (element);
+ } else {
+ g_free (payloader_name);
+ }
iter = gst_bin_iterate_recurse (GST_BIN (payloader));
while (gst_iterator_next (iter, &item) == GST_ITERATOR_OK) {
- GstElement *element = (GstElement *) g_value_get_object (&item);
- GstElementClass *eclass = GST_ELEMENT_GET_CLASS (element);
- const gchar *klass;
+ element = (GstElement *) g_value_get_object (&item);
- klass =
- gst_element_class_get_metadata (eclass, GST_ELEMENT_METADATA_KLASS);
- if (klass == NULL)
- continue;
-
- if (strstr (klass, "Payloader") && strstr (klass, "RTP")) {
+ if (is_payloader (element)) {
pay = gst_object_ref (element);
g_value_unset (&item);
break;
GstElement *pay;
/* find the real payload element */
- pay = find_payload_element (element);
+ pay = find_payload_element (element, pad);
stream = gst_rtsp_media_create_stream (media, pay, pad);
gst_object_unref (pay);
g_object_set_data (G_OBJECT (pad), "gst-rtsp-dynpad-stream", stream);
- /* we will be adding elements below that will cause ASYNC_DONE to be
- * posted in the bus. We want to ignore those messages until the
- * pipeline really prerolled. */
- priv->adding = TRUE;
-
/* join the element in the PAUSED state because this callback is
* called from the streaming thread and it is PAUSED */
if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
GST_WARNING ("failed to join bin element");
}
- priv->adding = FALSE;
+ if (priv->blocked)
+ gst_rtsp_stream_set_blocked (stream, TRUE);
+
g_rec_mutex_unlock (&priv->state_lock);
return;
}
static void
-remove_fakesink (GstRTSPMediaPrivate * priv)
-{
- GstElement *fakesink;
-
- g_mutex_lock (&priv->lock);
- if ((fakesink = priv->fakesink))
- gst_object_ref (fakesink);
- priv->fakesink = NULL;
- g_mutex_unlock (&priv->lock);
-
- if (fakesink) {
- gst_bin_remove (GST_BIN (priv->pipeline), fakesink);
- gst_element_set_state (fakesink, GST_STATE_NULL);
- gst_object_unref (fakesink);
- GST_INFO ("removed fakesink");
- }
-}
-
-static void
no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
{
GstRTSPMediaPrivate *priv = media->priv;
- GST_INFO ("no more pads");
- remove_fakesink (priv);
+ GST_INFO_OBJECT (element, "no more pads");
+ g_mutex_lock (&priv->lock);
+ priv->no_more_pads_pending--;
+ g_mutex_unlock (&priv->lock);
}
typedef struct _DynPaySignalHandlers DynPaySignalHandlers;
GstStateChangeReturn ret;
GST_INFO ("setting pipeline to PAUSED for media %p", media);
- /* first go to PAUSED */
+
+ /* start blocked since it is possible that there are no sink elements yet */
+ media_streams_set_blocked (media, TRUE);
ret = set_target_state (media, GST_STATE_PAUSED, TRUE);
switch (ret) {
* seeking query in preroll instead */
priv->seekable = -1;
priv->is_live = TRUE;
- if (!(priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) {
- /* start blocked to make sure nothing goes to the sink */
- media_streams_set_blocked (media, TRUE);
- }
+
ret = set_state (media, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE)
goto state_failed;
GstRTSPMediaPrivate *priv = media->priv;
GstRTSPStream *stream = NULL;
guint i;
+ GstElement *res = NULL;
+
+ g_mutex_lock (&priv->lock);
+ for (i = 0; i < priv->streams->len; i++) {
+ stream = g_ptr_array_index (priv->streams, i);
+
+ if (sessid == gst_rtsp_stream_get_index (stream))
+ break;
+
+ stream = NULL;
+ }
+ g_mutex_unlock (&priv->lock);
+
+ if (stream)
+ res = gst_rtsp_stream_request_aux_sender (stream, sessid);
+
+ return res;
+}
+
+static GstElement *
+request_aux_receiver (GstElement * rtpbin, guint sessid, GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+ GstRTSPStream *stream = NULL;
+ guint i;
+ GstElement *res = NULL;
+
+ g_mutex_lock (&priv->lock);
+ for (i = 0; i < priv->streams->len; i++) {
+ stream = g_ptr_array_index (priv->streams, i);
+
+ if (sessid == gst_rtsp_stream_get_index (stream))
+ break;
+
+ stream = NULL;
+ }
+ g_mutex_unlock (&priv->lock);
+
+ if (stream)
+ res = gst_rtsp_stream_request_aux_receiver (stream, sessid);
+
+ return res;
+}
+
+static GstElement *
+request_fec_decoder (GstElement * rtpbin, guint sessid, GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+ GstRTSPStream *stream = NULL;
+ guint i;
+ GstElement *res = NULL;
g_mutex_lock (&priv->lock);
for (i = 0; i < priv->streams->len; i++) {
if (sessid == gst_rtsp_stream_get_index (stream))
break;
+
+ stream = NULL;
}
g_mutex_unlock (&priv->lock);
- return gst_rtsp_stream_request_aux_sender (stream, sessid);
+ if (stream) {
+ res = gst_rtsp_stream_request_ulpfec_decoder (stream, rtpbin, sessid);
+ }
+
+ return res;
+}
+
+static void
+new_storage_cb (GstElement * rtpbin, GObject * storage, guint sessid,
+ GstRTSPMedia * media)
+{
+ g_object_set (storage, "size-time", (media->priv->latency + 50) * GST_MSECOND,
+ NULL);
}
static gboolean
if (priv->status != GST_RTSP_MEDIA_STATUS_PREPARING)
goto no_longer_preparing;
+ g_signal_connect (priv->rtpbin, "new-storage", G_CALLBACK (new_storage_cb),
+ media);
+ g_signal_connect (priv->rtpbin, "request-fec-decoder",
+ G_CALLBACK (request_fec_decoder), media);
+
/* link streams we already have, other streams might appear when we have
* dynamic elements */
for (i = 0; i < priv->streams->len; i++) {
(GCallback) request_aux_sender, media);
}
+ if (priv->do_retransmission) {
+ g_signal_connect (priv->rtpbin, "request-aux-receiver",
+ (GCallback) request_aux_receiver, media);
+ }
+
if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
priv->rtpbin, GST_STATE_NULL)) {
goto join_bin_failed;
}
if (priv->rtpbin)
- g_object_set (priv->rtpbin, "do-retransmission", priv->rtx_time > 0, NULL);
+ g_object_set (priv->rtpbin, "do-retransmission", priv->do_retransmission,
+ "do-lost", TRUE, NULL);
for (walk = priv->dynamic; walk; walk = g_list_next (walk)) {
GstElement *elem = walk->data;
(GCallback) no_more_pads_cb, media);
g_object_set_data (G_OBJECT (elem), "gst-rtsp-dynpay-handlers", handlers);
-
- if (!priv->fakesink) {
- /* we add a fakesink here in order to make the state change async. We remove
- * the fakesink again in the no-more-pads callback. */
- priv->fakesink = gst_element_factory_make ("fakesink", "fakesink");
- gst_bin_add (GST_BIN (priv->pipeline), priv->fakesink);
- }
}
- if (!start_preroll (media))
+ if (priv->nb_dynamic_elements == 0 && gst_rtsp_media_is_receive_only (media)) {
+ /* If we are receive_only (RECORD), do not try to preroll, to avoid
+ * a second ASYNC state change failing */
+ priv->is_live = TRUE;
+ gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
+ } else if (!start_preroll (media)) {
goto preroll_failed;
+ }
g_rec_mutex_unlock (&priv->state_lock);
priv->is_live = FALSE;
priv->seekable = -1;
priv->buffering = FALSE;
+ priv->no_more_pads_pending = priv->nb_dynamic_elements;
/* we're preparing now */
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
gint i;
GList *walk;
+ if (priv->finishing_unprepare)
+ return;
+ priv->finishing_unprepare = TRUE;
+
GST_DEBUG ("shutting down");
/* release the lock on shutdown, otherwise pad_added_cb might try to
set_state (media, GST_STATE_NULL);
g_rec_mutex_lock (&priv->state_lock);
- if (priv->status != GST_RTSP_MEDIA_STATUS_UNPREPARING)
- return;
-
- remove_fakesink (priv);
+ media_streams_set_blocked (media, FALSE);
for (i = 0; i < priv->streams->len; i++) {
GstRTSPStream *stream;
GST_DEBUG ("stop thread");
gst_rtsp_thread_stop (priv->thread);
}
+
+ priv->finishing_unprepare = FALSE;
}
/* called with state-lock */
goto is_busy;
GST_INFO ("unprepare media %p", media);
- if (priv->blocked)
- media_streams_set_blocked (media, FALSE);
set_target_state (media, GST_STATE_NULL, FALSE);
success = TRUE;
*
* @media must be prepared before this method returns a valid clock object.
*
- * Returns: (transfer full): the #GstClock used by @media. unref after usage.
+ * Returns: (transfer full) (nullable): the #GstClock used by @media. unref after usage.
*/
GstClock *
gst_rtsp_media_get_clock (GstRTSPMedia * media)
s = gst_caps_get_structure (caps, 0);
gst_structure_set_name (s, "application/x-rtp");
+ if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
+ gst_structure_set (s, "is-fec", G_TYPE_BOOLEAN, TRUE, NULL);
+
gst_rtsp_stream_set_pt_map (stream, pt, caps);
gst_caps_unref (caps);
}
do_set_seqnum (GstRTSPStream * stream)
{
guint16 seq_num;
- seq_num = gst_rtsp_stream_get_current_seqnum (stream);
- gst_rtsp_stream_set_seqnum_offset (stream, seq_num + 1);
+
+ if (gst_rtsp_stream_is_sender (stream)) {
+ seq_num = gst_rtsp_stream_get_current_seqnum (stream);
+ gst_rtsp_stream_set_seqnum_offset (stream, seq_num + 1);
+ }
}
/* call with state_lock */
{
GstRTSPMediaPrivate *priv = media->priv;
GstStateChangeReturn ret;
- gboolean unblock = FALSE;
switch (priv->suspend_mode) {
case GST_RTSP_SUSPEND_MODE_NONE:
ret = set_target_state (media, GST_STATE_PAUSED, TRUE);
if (ret == GST_STATE_CHANGE_FAILURE)
goto state_failed;
- unblock = TRUE;
break;
case GST_RTSP_SUSPEND_MODE_RESET:
GST_DEBUG ("media %p suspend to NULL", media);
* is actually from NULL to PLAY will create a new sequence
* number. */
g_ptr_array_foreach (priv->streams, (GFunc) do_set_seqnum, NULL);
- unblock = TRUE;
break;
default:
break;
}
- /* let the streams do the state changes freely, if any */
- if (unblock)
- media_streams_set_blocked (media, FALSE);
-
return TRUE;
/* ERRORS */
switch (priv->suspend_mode) {
case GST_RTSP_SUSPEND_MODE_NONE:
- gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
+ if (gst_rtsp_media_is_receive_only (media))
+ break;
+ if (media_streams_blocking (media)) {
+ gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
+ /* at this point the media pipeline has been updated and contain all
+ * specific transport parts: all active streams contain at least one sink
+ * element and it's safe to unblock all blocked streams */
+ media_unblock (media);
+ } else {
+ /* streams are not blocked and media is suspended from PAUSED */
+ gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
+ }
+ g_rec_mutex_unlock (&priv->state_lock);
+ if (gst_rtsp_media_get_status (media) == GST_RTSP_MEDIA_STATUS_ERROR) {
+ g_rec_mutex_lock (&priv->state_lock);
+ goto preroll_failed;
+ }
+ g_rec_mutex_lock (&priv->state_lock);
break;
case GST_RTSP_SUSPEND_MODE_PAUSE:
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
case GST_RTSP_SUSPEND_MODE_RESET:
{
gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
+ /* at this point the media pipeline has been updated and contain all
+ * specific transport parts: all active streams contain at least one sink
+ * element and it's safe to unblock all blocked streams */
+ media_unblock (media);
if (!start_preroll (media))
goto start_failed;
media_set_pipeline_state_locked (GstRTSPMedia * media, GstState state)
{
GstRTSPMediaPrivate *priv = media->priv;
+ GstStateChangeReturn set_state_ret;
+ priv->expected_async_done = FALSE;
if (state == GST_STATE_NULL) {
gst_rtsp_media_unprepare (media);
} else {
if (state == GST_STATE_PLAYING)
/* make sure pads are not blocking anymore when going to PLAYING */
- media_streams_set_blocked (media, FALSE);
+ media_unblock (media);
- set_state (media, state);
-
- /* and suspend after pause */
- if (state == GST_STATE_PAUSED)
+ if (state == GST_STATE_PAUSED) {
+ set_state_ret = set_state (media, state);
+ if (set_state_ret == GST_STATE_CHANGE_ASYNC)
+ priv->expected_async_done = TRUE;
+ /* and suspend after pause */
gst_rtsp_media_suspend (media);
+ } else {
+ set_state (media, state);
+ }
}
}
}
priv = media->priv;
g_rec_mutex_lock (&priv->state_lock);
+
+ if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING
+ && gst_rtsp_media_is_shared (media)) {
+ g_rec_mutex_unlock (&priv->state_lock);
+ gst_rtsp_media_get_status (media);
+ g_rec_mutex_lock (&priv->state_lock);
+ }
if (priv->status == GST_RTSP_MEDIA_STATUS_ERROR)
goto error_status;
if (priv->status != GST_RTSP_MEDIA_STATUS_PREPARED &&
/* we just activated the first media, do the playing state change */
if (old_active == 0 && activate)
do_state = TRUE;
- /* if we have no more active media, do the downward state changes */
- else if (priv->n_active == 0)
+ /* if we have no more active media and prepare count is not indicate
+ * that there are new session/sessions ongoing,
+ * do the downward state changes */
+ else if (priv->n_active == 0 && priv->prepare_count <= 1)
do_state = TRUE;
else
do_state = FALSE;
}
/**
- * gst_rtsp_media_get_seekbale:
+ * gst_rtsp_media_seekable:
* @media: a #GstRTSPMedia
*
* Check if the pipeline for @media seek and up to what point in time,
*
* Returns: -1 if the stream is not seekable, 0 if seekable only to the beginning
* and > 0 to indicate the longest duration between any two random access points.
- * G_MAXINT64 means any value is possible.
+ * %G_MAXINT64 means any value is possible.
+ *
+ * Since: 1.14
*/
GstClockTimeDiff
gst_rtsp_media_seekable (GstRTSPMedia * media)
{
+ GstRTSPMediaPrivate *priv;
+ GstClockTimeDiff res;
+
g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+ priv = media->priv;
+
/* Currently we are not able to seek on live streams,
* and no stream is seekable only to the beginning */
- return media->priv->seekable;
+ g_mutex_lock (&priv->lock);
+ res = priv->seekable;
+ g_mutex_unlock (&priv->lock);
+
+ return res;
+}
+
+/**
+ * gst_rtsp_media_complete_pipeline:
+ * @media: a #GstRTSPMedia
+ * @transports: (element-type GstRTSPTransport): a list of #GstRTSPTransport
+ *
+ * Add a receiver and sender parts to the pipeline based on the transport from
+ * SETUP.
+ *
+ * Returns: %TRUE if the media pipeline has been sucessfully updated.
+ *
+ * Since: 1.14
+ */
+gboolean
+gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports)
+{
+ GstRTSPMediaPrivate *priv;
+ guint i;
+
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+ g_return_val_if_fail (transports, FALSE);
+
+ GST_DEBUG_OBJECT (media, "complete pipeline");
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ for (i = 0; i < priv->streams->len; i++) {
+ GstRTSPStreamTransport *transport;
+ GstRTSPStream *stream;
+ const GstRTSPTransport *rtsp_transport;
+
+ transport = g_ptr_array_index (transports, i);
+ if (!transport)
+ continue;
+
+ stream = gst_rtsp_stream_transport_get_stream (transport);
+ if (!stream)
+ continue;
+
+ rtsp_transport = gst_rtsp_stream_transport_get_transport (transport);
+
+ if (!gst_rtsp_stream_complete_stream (stream, rtsp_transport)) {
+ g_mutex_unlock (&priv->lock);
+ return FALSE;
+ }
+ }
+
+ priv->complete = TRUE;
+ g_mutex_unlock (&priv->lock);
+
+ return TRUE;
+}
+
+/**
+ * gst_rtsp_media_is_receive_only:
+ *
+ * Returns: %TRUE if @media is receive-only, %FALSE otherwise.
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_is_receive_only (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+ gboolean receive_only = TRUE;
+ guint i;
+
+ for (i = 0; i < priv->streams->len; i++) {
+ GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+ if (gst_rtsp_stream_is_sender (stream) ||
+ !gst_rtsp_stream_is_receiver (stream)) {
+ receive_only = FALSE;
+ break;
+ }
+ }
+
+ return receive_only;
+}
+
+/**
+ * gst_rtsp_media_has_completed_sender:
+ *
+ * See gst_rtsp_stream_is_complete(), gst_rtsp_stream_is_sender().
+ *
+ * Returns: whether @media has at least one complete sender stream.
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_has_completed_sender (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv = media->priv;
+ gboolean sender = FALSE;
+ guint i;
+
+ g_mutex_lock (&priv->lock);
+ for (i = 0; i < priv->streams->len; i++) {
+ GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+ if (gst_rtsp_stream_is_complete (stream))
+ if (gst_rtsp_stream_is_sender (stream) ||
+ !gst_rtsp_stream_is_receiver (stream)) {
+ sender = TRUE;
+ break;
+ }
+ }
+ g_mutex_unlock (&priv->lock);
+
+ return sender;
+}
+
+/**
+ * gst_rtsp_media_set_rate_control:
+ *
+ * Define whether @media will follow the Rate-Control=no behaviour as specified
+ * in the ONVIF replay spec.
+ *
+ * Since: 1.18
+ */
+void
+gst_rtsp_media_set_rate_control (GstRTSPMedia * media, gboolean enabled)
+{
+ GstRTSPMediaPrivate *priv;
+ guint i;
+
+ g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+ GST_LOG_OBJECT (media, "%s rate control", enabled ? "Enabling" : "Disabling");
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ priv->do_rate_control = enabled;
+ for (i = 0; i < priv->streams->len; i++) {
+ GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+
+ gst_rtsp_stream_set_rate_control (stream, enabled);
+
+ }
+ g_mutex_unlock (&priv->lock);
+}
+
+/**
+ * gst_rtsp_media_get_rate_control:
+ *
+ * Returns: whether @media will follow the Rate-Control=no behaviour as specified
+ * in the ONVIF replay spec.
+ *
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_get_rate_control (GstRTSPMedia * media)
+{
+ GstRTSPMediaPrivate *priv;
+ gboolean res;
+
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+
+ priv = media->priv;
+
+ g_mutex_lock (&priv->lock);
+ res = priv->do_rate_control;
+ g_mutex_unlock (&priv->lock);
+
+ return res;
}