*/
/**
* SECTION:element-rtspsrc
+ * @title: rtspsrc
*
* Makes a connection to an RTSP server and read the data.
* rtspsrc strictly follows RFC 2326 and therefore does not (yet) support
* rtspsrc acts like a live source and will therefore only generate data in the
* PLAYING state.
*
- * <refsect2>
- * <title>Example launch line</title>
+ * If a RTP session times out then the rtspsrc will generate an element message
+ * named "GstRTSPSrcTimeout". Currently this is only supported for timeouts
+ * triggered by RTCP.
+ *
+ * The message's structure contains three fields:
+ *
+ * #GstRTSPSrcTimeoutCause `cause`: the cause of the timeout.
+ *
+ * #gint `stream-number`: an internal identifier of the stream that timed out.
+ *
+ * #guint `ssrc`: the SSRC of the stream that timed out.
+ *
+ * ## Example launch line
* |[
* gst-launch-1.0 rtspsrc location=rtsp://some.server/url ! fakesink
* ]| Establish a connection to an RTSP server and send the raw RTP packets to a
* fakesink.
- * </refsect2>
+ *
*/
#ifdef HAVE_CONFIG_H
SIGNAL_ACCEPT_CERTIFICATE,
SIGNAL_BEFORE_SEND,
SIGNAL_PUSH_BACKCHANNEL_BUFFER,
+ SIGNAL_GET_PARAMETER,
+ SIGNAL_GET_PARAMETERS,
+ SIGNAL_SET_PARAMETER,
LAST_SIGNAL
};
#define DEFAULT_MAX_TS_OFFSET G_GINT64_CONSTANT(3000000000)
#define DEFAULT_VERSION GST_RTSP_VERSION_1_0
#define DEFAULT_BACKCHANNEL GST_RTSP_BACKCHANNEL_NONE
+#define DEFAULT_TEARDOWN_TIMEOUT (100 * GST_MSECOND)
enum
{
PROP_MAX_TS_OFFSET,
PROP_DEFAULT_VERSION,
PROP_BACKCHANNEL,
+ PROP_TEARDOWN_TIMEOUT,
};
#define GST_TYPE_RTSP_NAT_METHOD (gst_rtsp_nat_method_get_type())
"rtsp-status-reason", G_TYPE_STRING, GST_STR_NULL((response_msg)->type_data.response.reason), NULL)); \
} while (0)
+typedef struct _ParameterRequest
+{
+ gint cmd;
+ gchar *content_type;
+ GString *body;
+ GstPromise *promise;
+} ParameterRequest;
+
static void gst_rtspsrc_finalize (GObject * object);
static void gst_rtspsrc_set_property (GObject * object, guint prop_id,
static void
gst_rtspsrc_print_sdp_message (GstRTSPSrc * src, const GstSDPMessage * msg);
+static GstRTSPResult
+gst_rtspsrc_get_parameter (GstRTSPSrc * src, ParameterRequest * req);
+
+static GstRTSPResult
+gst_rtspsrc_set_parameter (GstRTSPSrc * src, ParameterRequest * req);
+
+static gboolean get_parameter (GstRTSPSrc * src, const gchar * parameter,
+ const gchar * content_type, GstPromise * promise);
+
+static gboolean get_parameters (GstRTSPSrc * src, gchar ** parameters,
+ const gchar * content_type, GstPromise * promise);
+
+static gboolean set_parameter (GstRTSPSrc * src, const gchar * name,
+ const gchar * value, const gchar * content_type, GstPromise * promise);
+
static GstFlowReturn gst_rtspsrc_push_backchannel_buffer (GstRTSPSrc * src,
guint id, GstSample * sample);
} PtMapItem;
/* commands we send to out loop to notify it of events */
-#define CMD_OPEN (1 << 0)
-#define CMD_PLAY (1 << 1)
-#define CMD_PAUSE (1 << 2)
-#define CMD_CLOSE (1 << 3)
-#define CMD_WAIT (1 << 4)
-#define CMD_RECONNECT (1 << 5)
-#define CMD_LOOP (1 << 6)
+#define CMD_OPEN (1 << 0)
+#define CMD_PLAY (1 << 1)
+#define CMD_PAUSE (1 << 2)
+#define CMD_CLOSE (1 << 3)
+#define CMD_WAIT (1 << 4)
+#define CMD_RECONNECT (1 << 5)
+#define CMD_LOOP (1 << 6)
+#define CMD_GET_PARAMETER (1 << 7)
+#define CMD_SET_PARAMETER (1 << 8)
/* mask for all commands */
-#define CMD_ALL ((CMD_LOOP << 1) - 1)
+#define CMD_ALL ((CMD_SET_PARAMETER << 1) - 1)
#define GST_ELEMENT_PROGRESS(el, type, code, text) \
G_STMT_START { \
return "RECONNECT";
case CMD_LOOP:
return "LOOP";
+ case CMD_GET_PARAMETER:
+ return "GET_PARAMETER";
+ case CMD_SET_PARAMETER:
+ return "SET_PARAMETER";
}
return "unknown";
/**
* GstRTSPSrc:port-range:
*
- * Configure the client port numbers that can be used to recieve RTP and
+ * Configure the client port numbers that can be used to receive RTP and
* RTCP.
*/
g_object_class_install_property (gobject_class, PROP_PORT_RANGE,
G_PARAM_STATIC_STRINGS));
/**
- * GstRtpBin:max-ts-offset:
+ * GstRTSPSrc:max-ts-offset:
*
* Used to set an upper limit of how large a time offset may be. This
* is used to protect against unrealistic values as a result of either
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
- * GstRtpSrc:backchannel
+ * GstRTSPSrc:backchannel
*
* Select a type of backchannel to setup with the RTSP server.
* Default value is "none". Allowed values are "none" and "onvif".
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
+ * GstRtspSrc:teardown-timeout
+ *
+ * When transitioning PAUSED-READY, allow up to timeout (in nanoseconds)
+ * delay in order to send teardown (0 = disabled)
+ *
+ * Since: 1.14
+ */
+ g_object_class_install_property (gobject_class, PROP_TEARDOWN_TIMEOUT,
+ g_param_spec_uint64 ("teardown-timeout", "Teardown Timeout",
+ "When transitioning PAUSED-READY, allow up to timeout (in nanoseconds) "
+ "delay in order to send teardown (0 = disabled)",
+ 0, G_MAXUINT64, DEFAULT_TEARDOWN_TIMEOUT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
* GstRTSPSrc::handle-request:
* @rtspsrc: a #GstRTSPSrc
* @request: a #GstRTSPMessage
* @rtspsrc: a #GstRTSPSrc
* @sdp: a #GstSDPMessage
*
- * Emited when the client has retrieved the SDP and before it configures the
+ * Emitted when the client has retrieved the SDP and before it configures the
* streams in the SDP. @sdp can be inspected and modified.
*
* This signal is called from the streaming thread, you should therefore not
* @num: the stream number
* @caps: the stream caps
*
- * Emited before the client decides to configure the stream @num with
+ * Emitted before the client decides to configure the stream @num with
* @caps.
*
* Returns: %TRUE when the stream should be selected, %FALSE when the stream
* @rtspsrc: a #GstRTSPSrc
* @manager: a #GstElement
*
- * Emited after a new manager (like rtpbin) was created and the default
+ * Emitted after a new manager (like rtpbin) was created and the default
* properties were configured.
*
* Since: 1.4
* @rtspsrc: a #GstRTSPSrc
* @num: the stream number
*
- * Signal emited to get the crypto parameters relevant to the RTCP
+ * Signal emitted to get the crypto parameters relevant to the RTCP
* stream. User should provide the key and the RTCP encryption ciphers
* and authentication, and return them wrapped in a GstCaps.
*
push_backchannel_buffer), NULL, NULL, NULL, GST_TYPE_FLOW_RETURN, 2,
G_TYPE_UINT, GST_TYPE_BUFFER);
+ /**
+ * GstRTSPSrc::get-parameter:
+ * @rtspsrc: a #GstRTSPSrc
+ * @parameter: the parameter name
+ * @parameter: the content type
+ * @parameter: a pointer to #GstPromise
+ *
+ * Handle the GET_PARAMETER signal.
+ *
+ * Returns: %TRUE when the command could be issued, %FALSE otherwise
+ *
+ */
+ gst_rtspsrc_signals[SIGNAL_GET_PARAMETER] =
+ g_signal_new ("get-parameter", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTSPSrcClass,
+ get_parameter), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_BOOLEAN, 3, G_TYPE_STRING, G_TYPE_STRING, GST_TYPE_PROMISE);
+
+ /**
+ * GstRTSPSrc::get-parameters:
+ * @rtspsrc: a #GstRTSPSrc
+ * @parameter: a NULL-terminated array of parameters
+ * @parameter: the content type
+ * @parameter: a pointer to #GstPromise
+ *
+ * Handle the GET_PARAMETERS signal.
+ *
+ * Returns: %TRUE when the command could be issued, %FALSE otherwise
+ *
+ */
+ gst_rtspsrc_signals[SIGNAL_GET_PARAMETERS] =
+ g_signal_new ("get-parameters", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTSPSrcClass,
+ get_parameters), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_BOOLEAN, 3, G_TYPE_STRV, G_TYPE_STRING, GST_TYPE_PROMISE);
+
+ /**
+ * GstRTSPSrc::set-parameter:
+ * @rtspsrc: a #GstRTSPSrc
+ * @parameter: the parameter name
+ * @parameter: the parameter value
+ * @parameter: the content type
+ * @parameter: a pointer to #GstPromise
+ *
+ * Handle the SET_PARAMETER signal.
+ *
+ * Returns: %TRUE when the command could be issued, %FALSE otherwise
+ *
+ */
+ gst_rtspsrc_signals[SIGNAL_SET_PARAMETER] =
+ g_signal_new ("set-parameter", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTSPSrcClass,
+ set_parameter), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_BOOLEAN, 4, G_TYPE_STRING, G_TYPE_STRING, G_TYPE_STRING,
+ GST_TYPE_PROMISE);
+
gstelement_class->send_event = gst_rtspsrc_send_event;
gstelement_class->provide_clock = gst_rtspsrc_provide_clock;
gstelement_class->change_state = gst_rtspsrc_change_state;
gstbin_class->handle_message = gst_rtspsrc_handle_message;
klass->push_backchannel_buffer = gst_rtspsrc_push_backchannel_buffer;
+ klass->get_parameter = GST_DEBUG_FUNCPTR (get_parameter);
+ klass->get_parameters = GST_DEBUG_FUNCPTR (get_parameters);
+ klass->set_parameter = GST_DEBUG_FUNCPTR (set_parameter);
gst_rtsp_ext_list_init ();
}
+static gboolean
+validate_set_get_parameter_name (const gchar * parameter_name)
+{
+ gchar *ptr = (gchar *) parameter_name;
+
+ while (*ptr) {
+ /* Don't allow '\r', '\n', \'t', ' ' etc in the parameter name */
+ if (g_ascii_isspace (*ptr) || g_ascii_iscntrl (*ptr)) {
+ GST_DEBUG ("invalid parameter name '%s'", parameter_name);
+ return FALSE;
+ }
+ ptr++;
+ }
+ return TRUE;
+}
+
+static gboolean
+validate_set_get_parameters (gchar ** parameter_names)
+{
+ while (*parameter_names) {
+ if (!validate_set_get_parameter_name (*parameter_names)) {
+ return FALSE;
+ }
+ parameter_names++;
+ }
+ return TRUE;
+}
+
+static gboolean
+get_parameter (GstRTSPSrc * src, const gchar * parameter,
+ const gchar * content_type, GstPromise * promise)
+{
+ gchar *parameters[] = { (gchar *) parameter, NULL };
+
+ GST_LOG_OBJECT (src, "get_parameter: %s", GST_STR_NULL (parameter));
+
+ if (parameter == NULL || parameter[0] == '\0' || promise == NULL) {
+ GST_DEBUG ("invalid input");
+ return FALSE;
+ }
+
+ return get_parameters (src, parameters, content_type, promise);
+}
+
+static gboolean
+get_parameters (GstRTSPSrc * src, gchar ** parameters,
+ const gchar * content_type, GstPromise * promise)
+{
+ ParameterRequest *req;
+
+ GST_LOG_OBJECT (src, "get_parameters: %d", g_strv_length (parameters));
+
+ if (parameters == NULL || promise == NULL) {
+ GST_DEBUG ("invalid input");
+ return FALSE;
+ }
+
+ if (src->state == GST_RTSP_STATE_INVALID) {
+ GST_DEBUG ("invalid state");
+ return FALSE;
+ }
+
+ if (!validate_set_get_parameters (parameters)) {
+ return FALSE;
+ }
+
+ req = g_new0 (ParameterRequest, 1);
+ req->promise = gst_promise_ref (promise);
+ req->cmd = CMD_GET_PARAMETER;
+ /* Set the request body according to RFC 2326 or RFC 7826 */
+ req->body = g_string_new (NULL);
+ while (*parameters) {
+ g_string_append_printf (req->body, "%s:\r\n", *parameters);
+ parameters++;
+ }
+ if (content_type)
+ req->content_type = g_strdup (content_type);
+
+ GST_OBJECT_LOCK (src);
+ g_queue_push_tail (&src->set_get_param_q, req);
+ GST_OBJECT_UNLOCK (src);
+
+ gst_rtspsrc_loop_send_cmd (src, CMD_GET_PARAMETER, CMD_LOOP);
+
+ return TRUE;
+}
+
+static gboolean
+set_parameter (GstRTSPSrc * src, const gchar * name, const gchar * value,
+ const gchar * content_type, GstPromise * promise)
+{
+ ParameterRequest *req;
+
+ GST_LOG_OBJECT (src, "set_parameter: %s: %s", GST_STR_NULL (name),
+ GST_STR_NULL (value));
+
+ if (name == NULL || name[0] == '\0' || value == NULL || promise == NULL) {
+ GST_DEBUG ("invalid input");
+ return FALSE;
+ }
+
+ if (src->state == GST_RTSP_STATE_INVALID) {
+ GST_DEBUG ("invalid state");
+ return FALSE;
+ }
+
+ if (!validate_set_get_parameter_name (name)) {
+ return FALSE;
+ }
+
+ req = g_new0 (ParameterRequest, 1);
+ req->cmd = CMD_SET_PARAMETER;
+ req->promise = gst_promise_ref (promise);
+ req->body = g_string_new (NULL);
+ /* Set the request body according to RFC 2326 or RFC 7826 */
+ g_string_append_printf (req->body, "%s: %s\r\n", name, value);
+ if (content_type)
+ req->content_type = g_strdup (content_type);
+
+ GST_OBJECT_LOCK (src);
+ g_queue_push_tail (&src->set_get_param_q, req);
+ GST_OBJECT_UNLOCK (src);
+
+ gst_rtspsrc_loop_send_cmd (src, CMD_SET_PARAMETER, CMD_LOOP);
+
+ return TRUE;
+}
+
static void
gst_rtspsrc_init (GstRTSPSrc * src)
{
src->max_ts_offset_is_set = FALSE;
src->default_version = DEFAULT_VERSION;
src->version = GST_RTSP_VERSION_INVALID;
+ src->teardown_timeout = DEFAULT_TEARDOWN_TIMEOUT;
/* get a list of all extensions */
src->extensions = gst_rtsp_ext_list_get ();
/* protects our state changes from multiple invocations */
g_rec_mutex_init (&src->state_rec_lock);
+ g_queue_init (&src->set_get_param_q);
+
src->state = GST_RTSP_STATE_INVALID;
g_mutex_init (&src->conninfo.send_lock);
g_mutex_init (&src->conninfo.recv_lock);
+ g_cond_init (&src->cmd_cond);
GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE);
gst_bin_set_suppressed_flags (GST_BIN (src),
}
static void
+free_param_data (ParameterRequest * req)
+{
+ gst_promise_unref (req->promise);
+ if (req->body)
+ g_string_free (req->body, TRUE);
+ g_free (req->content_type);
+ g_free (req);
+}
+
+static void
+free_param_queue (gpointer data)
+{
+ ParameterRequest *req = data;
+
+ gst_promise_expire (req->promise);
+ free_param_data (req);
+}
+
+static void
gst_rtspsrc_finalize (GObject * object)
{
GstRTSPSrc *rtspsrc;
g_mutex_clear (&rtspsrc->conninfo.send_lock);
g_mutex_clear (&rtspsrc->conninfo.recv_lock);
+ g_cond_clear (&rtspsrc->cmd_cond);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
const gchar *str;
str = g_value_get_string (value);
- if (sscanf (str, "%u-%u", &rtspsrc->client_port_range.min,
+ if (str == NULL || sscanf (str, "%u-%u", &rtspsrc->client_port_range.min,
&rtspsrc->client_port_range.max) != 2) {
rtspsrc->client_port_range.min = 0;
rtspsrc->client_port_range.max = 0;
case PROP_BACKCHANNEL:
rtspsrc->backchannel = g_value_get_enum (value);
break;
+ case PROP_TEARDOWN_TIMEOUT:
+ rtspsrc->teardown_timeout = g_value_get_uint64 (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_BACKCHANNEL:
g_value_set_enum (value, rtspsrc->backchannel);
break;
+ case PROP_TEARDOWN_TIMEOUT:
+ g_value_set_uint64 (value, rtspsrc->teardown_timeout);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
else
goto unknown_proto;
- if (gst_sdp_media_get_attribute_val (media, "recvonly") != NULL &&
+ if (gst_sdp_media_get_attribute_val (media, "sendonly") != NULL &&
/* We want to setup caps for streams configured as backchannel */
!stream->is_backchannel && src->backchannel != BACKCHANNEL_NONE)
- goto recvonly_media;
+ goto sendonly_media;
/* Parse global SDP attributes once */
global_caps = gst_caps_new_empty_simple ("application/x-unknown");
GST_ERROR_OBJECT (src, "unknown proto in media: '%s'", proto);
return;
}
-recvonly_media:
+sendonly_media:
{
- GST_WARNING_OBJECT (src, "recvonly media ignored, no backchannel");
+ GST_DEBUG_OBJECT (src, "sendonly media ignored, no backchannel");
return;
}
}
g_mutex_init (&stream->conninfo.recv_lock);
g_array_set_clear_func (stream->ptmap, (GDestroyNotify) clear_ptmap_item);
- /* stream is recvonly and onvif backchannel is requested */
- if (gst_sdp_media_get_attribute_val (media, "recvonly") != NULL &&
+ /* stream is sendonly and onvif backchannel is requested */
+ if (gst_sdp_media_get_attribute_val (media, "sendonly") != NULL &&
src->backchannel != BACKCHANNEL_NONE)
stream->is_backchannel = TRUE;
for (i = 0; i < 2; i++) {
if (stream->udpsrc[i]) {
gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
- gst_bin_remove (GST_BIN_CAST (src), stream->udpsrc[i]);
+ if (gst_object_has_as_parent (GST_OBJECT (stream->udpsrc[i]),
+ GST_OBJECT (src)))
+ gst_bin_remove (GST_BIN_CAST (src), stream->udpsrc[i]);
gst_object_unref (stream->udpsrc[i]);
}
if (stream->channelpad[i])
if (stream->udpsink[i]) {
gst_element_set_state (stream->udpsink[i], GST_STATE_NULL);
- gst_bin_remove (GST_BIN_CAST (src), stream->udpsink[i]);
+ if (gst_object_has_as_parent (GST_OBJECT (stream->udpsink[i]),
+ GST_OBJECT (src)))
+ gst_bin_remove (GST_BIN_CAST (src), stream->udpsink[i]);
gst_object_unref (stream->udpsink[i]);
}
}
gst_object_unref (src->provided_clock);
src->provided_clock = NULL;
}
+
+ /* free parameter requests queue */
+ if (!g_queue_is_empty (&src->set_get_param_q))
+ g_queue_free_full (&src->set_get_param_q, free_param_queue);
+
}
static gboolean
}
static void
-gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
+gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing,
+ guint32 seqnum)
{
GstEvent *event;
gint cmd;
if (flush) {
event = gst_event_new_flush_start ();
+ gst_event_set_seqnum (event, seqnum);
GST_DEBUG_OBJECT (src, "start flush");
cmd = CMD_WAIT;
state = GST_STATE_PAUSED;
} else {
event = gst_event_new_flush_stop (FALSE);
+ gst_event_set_seqnum (event, seqnum);
GST_DEBUG_OBJECT (src, "stop flush; playing %d", playing);
cmd = CMD_LOOP;
if (playing)
GList *walk;
const gchar *seek_style = NULL;
- if (event) {
- GST_DEBUG_OBJECT (src, "doing seek with event %" GST_PTR_FORMAT, event);
+ GST_DEBUG_OBJECT (src, "doing seek with event %" GST_PTR_FORMAT, event);
- gst_event_parse_seek (event, &rate, &format, &flags,
- &cur_type, &cur, &stop_type, &stop);
+ gst_event_parse_seek (event, &rate, &format, &flags,
+ &cur_type, &cur, &stop_type, &stop);
- /* no negative rates yet */
- if (rate < 0.0)
- goto negative_rate;
+ /* no negative rates yet */
+ if (rate < 0.0)
+ goto negative_rate;
- /* we need TIME format */
- if (format != src->segment.format)
- goto no_format;
+ /* we need TIME format */
+ if (format != src->segment.format)
+ goto no_format;
- /* Check if we are not at all seekable */
- if (src->seekable == -1.0)
- goto not_seekable;
+ /* Check if we are not at all seekable */
+ if (src->seekable == -1.0)
+ goto not_seekable;
- /* Additional seeking-to-beginning-only check */
- if (src->seekable == 0.0 && cur != 0)
- goto not_seekable;
- } else {
- GST_DEBUG_OBJECT (src, "doing seek without event");
- flags = 0;
- cur_type = GST_SEEK_TYPE_SET;
- stop_type = GST_SEEK_TYPE_SET;
- }
+ /* Additional seeking-to-beginning-only check */
+ if (src->seekable == 0.0 && cur != 0)
+ goto not_seekable;
if (flags & GST_SEEK_FLAG_SEGMENT)
goto invalid_segment_flag;
* blocking in preroll). */
if (flush) {
GST_DEBUG_OBJECT (src, "starting flush");
- gst_rtspsrc_flush (src, TRUE, FALSE);
+ gst_rtspsrc_flush (src, TRUE, FALSE, gst_event_get_seqnum (event));
} else {
if (src->task) {
gst_task_pause (src->task);
/* configure the seek parameters in the seeksegment. We will then have the
* right values in the segment to perform the seek */
- if (event) {
- GST_DEBUG_OBJECT (src, "configuring seek");
- gst_segment_do_seek (&seeksegment, rate, format, flags,
- cur_type, cur, stop_type, stop, &update);
- }
+ GST_DEBUG_OBJECT (src, "configuring seek");
+ gst_segment_do_seek (&seeksegment, rate, format, flags,
+ cur_type, cur, stop_type, stop, &update);
/* figure out the last position we need to play. If it's configured (stop !=
* -1), use that, else we play until the total duration of the file */
if (flush) {
/* if we started flush, we stop now */
GST_DEBUG_OBJECT (src, "stopping flush");
- gst_rtspsrc_flush (src, FALSE, playing);
+ gst_rtspsrc_flush (src, FALSE, playing, gst_event_get_seqnum (event));
}
/* now we did the seek and can activate the new segment values */
}
}
+static GstPadProbeReturn
+udpsrc_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ guint32 *segment_seqnum = user_data;
+
+ switch (GST_EVENT_TYPE (info->data)) {
+ case GST_EVENT_SEGMENT:
+ if (!gst_event_is_writable (info->data))
+ info->data = gst_event_make_writable (info->data);
+
+ *segment_seqnum = gst_event_get_seqnum (info->data);
+ default:
+ break;
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
static gboolean
copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
{
gst_pad_set_active (stream->srcpad, TRUE);
gst_pad_sticky_events_foreach (pad, copy_sticky_events, stream->srcpad);
- /* don't add the srcpad if this is a recvonly stream */
+ /* don't add the srcpad if this is a sendonly stream */
if (stream->is_backchannel)
add_backchannel_fakesink (src, stream, stream->srcpad);
else
}
static void
-on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
+on_timeout_common (GObject * session, GObject * source, GstRTSPStream * stream)
{
GstRTSPSrc *src = stream->parent;
guint ssrc;
}
static void
+on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
+{
+ GstRTSPSrc *src = stream->parent;
+
+ /* timeout, post element message */
+ gst_element_post_message (GST_ELEMENT_CAST (src),
+ gst_message_new_element (GST_OBJECT_CAST (src),
+ gst_structure_new ("GstRTSPSrcTimeout",
+ "cause", G_TYPE_ENUM, GST_RTSP_SRC_TIMEOUT_CAUSE_RTCP,
+ "stream-number", G_TYPE_INT, stream->id, "ssrc", G_TYPE_UINT,
+ stream->ssrc, NULL)));
+
+ on_timeout_common (session, source, stream);
+}
+
+static void
on_npt_stop (GstElement * rtpbin, guint session, guint ssrc, GstRTSPSrc * src)
{
GstRTSPStream *stream;
g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc,
stream);
- g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout,
- stream);
+ g_signal_connect (rtpsession, "on-bye-timeout",
+ (GCallback) on_timeout_common, stream);
g_signal_connect (rtpsession, "on-timeout", (GCallback) on_timeout,
stream);
g_signal_connect (rtpsession, "on-ssrc-active",
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocked, src, NULL);
+ gst_pad_add_probe (stream->blockedpad,
+ GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, udpsrc_probe_cb,
+ &(stream->segment_seqnum[0]), NULL);
+
if (stream->channelpad[0]) {
GST_DEBUG_OBJECT (src, "connecting UDP source 0 to manager");
/* configure for UDP delivery, we need to connect the UDP pads to
GST_DEBUG_OBJECT (src, "connecting UDP source 1 to manager");
pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
+ gst_pad_add_probe (pad,
+ GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, udpsrc_probe_cb,
+ &(stream->segment_seqnum[1]), NULL);
gst_pad_link_full (pad, stream->channelpad[1],
GST_PAD_LINK_CHECK_NOTHING);
gst_object_unref (pad);
goto done;
if (stream->udpsrc[0]) {
- gst_event_ref (event);
- res = gst_element_send_event (stream->udpsrc[0], event);
+ GstEvent *sent_event;
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+ sent_event = gst_event_new_eos ();
+ gst_event_set_seqnum (sent_event, stream->segment_seqnum[0]);
+ } else {
+ sent_event = gst_event_ref (event);
+ }
+
+ res = gst_element_send_event (stream->udpsrc[0], sent_event);
} else if (stream->channelpad[0]) {
gst_event_ref (event);
if (GST_PAD_IS_SRC (stream->channelpad[0]))
}
if (stream->udpsrc[1]) {
- gst_event_ref (event);
- res &= gst_element_send_event (stream->udpsrc[1], event);
+ GstEvent *sent_event;
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+ sent_event = gst_event_new_eos ();
+ if (stream->segment_seqnum[1] != GST_SEQNUM_INVALID) {
+ gst_event_set_seqnum (sent_event, stream->segment_seqnum[1]);
+ }
+ } else {
+ sent_event = gst_event_ref (event);
+ }
+
+ res &= gst_element_send_event (stream->udpsrc[1], sent_event);
} else if (stream->channelpad[1]) {
gst_event_ref (event);
if (GST_PAD_IS_SRC (stream->channelpad[1]))
case CMD_PAUSE:
GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PAUSE request"));
break;
+ case CMD_GET_PARAMETER:
+ GST_ELEMENT_PROGRESS (src, START, "request",
+ ("Sending GET_PARAMETER request"));
+ break;
+ case CMD_SET_PARAMETER:
+ GST_ELEMENT_PROGRESS (src, START, "request",
+ ("Sending SET_PARAMETER request"));
+ break;
case CMD_CLOSE:
GST_ELEMENT_PROGRESS (src, START, "close", ("Closing Stream"));
break;
case CMD_PAUSE:
GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("Sent PAUSE request"));
break;
+ case CMD_GET_PARAMETER:
+ GST_ELEMENT_PROGRESS (src, COMPLETE, "request",
+ ("Sent GET_PARAMETER request"));
+ break;
+ case CMD_SET_PARAMETER:
+ GST_ELEMENT_PROGRESS (src, COMPLETE, "request",
+ ("Sent SET_PARAMETER request"));
+ break;
case CMD_CLOSE:
GST_ELEMENT_PROGRESS (src, COMPLETE, "close", ("Closed Stream"));
break;
case CMD_PAUSE:
GST_ELEMENT_PROGRESS (src, CANCELED, "request", ("PAUSE canceled"));
break;
+ case CMD_GET_PARAMETER:
+ GST_ELEMENT_PROGRESS (src, CANCELED, "request",
+ ("GET_PARAMETER canceled"));
+ break;
+ case CMD_SET_PARAMETER:
+ GST_ELEMENT_PROGRESS (src, CANCELED, "request",
+ ("SET_PARAMETER canceled"));
+ break;
case CMD_CLOSE:
GST_ELEMENT_PROGRESS (src, CANCELED, "close", ("Close canceled"));
break;
case CMD_PAUSE:
GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PAUSE failed"));
break;
+ case CMD_GET_PARAMETER:
+ GST_ELEMENT_PROGRESS (src, ERROR, "request", ("GET_PARAMETER failed"));
+ break;
+ case CMD_SET_PARAMETER:
+ GST_ELEMENT_PROGRESS (src, ERROR, "request", ("SET_PARAMETER failed"));
+ break;
case CMD_CLOSE:
GST_ELEMENT_PROGRESS (src, ERROR, "close", ("Close failed"));
break;
GST_OBJECT_LOCK (src);
old = src->pending_cmd;
+
if (old == CMD_RECONNECT) {
GST_DEBUG_OBJECT (src, "ignore, we were reconnecting");
cmd = CMD_RECONNECT;
* still the pending command. */
GST_DEBUG_OBJECT (src, "ignore, we were closing");
cmd = CMD_CLOSE;
+ } else if (old == CMD_SET_PARAMETER) {
+ GST_DEBUG_OBJECT (src, "ignore, we have a pending %s", cmd_to_string (old));
+ cmd = CMD_SET_PARAMETER;
+ } else if (old == CMD_GET_PARAMETER) {
+ GST_DEBUG_OBJECT (src, "ignore, we have a pending %s", cmd_to_string (old));
+ cmd = CMD_GET_PARAMETER;
} else if (old != CMD_WAIT) {
src->pending_cmd = CMD_WAIT;
GST_OBJECT_UNLOCK (src);
}
static gboolean
+gst_rtspsrc_loop_send_cmd_and_wait (GstRTSPSrc * src, gint cmd, gint mask,
+ GstClockTime timeout)
+{
+ gboolean flushed = gst_rtspsrc_loop_send_cmd (src, cmd, mask);
+
+ if (timeout > 0) {
+ gint64 end_time = g_get_monotonic_time () + (timeout / 1000);
+ GST_OBJECT_LOCK (src);
+ while (src->pending_cmd == cmd || src->busy_cmd == cmd) {
+ if (!g_cond_wait_until (&src->cmd_cond, GST_OBJECT_GET_LOCK (src),
+ end_time)) {
+ GST_WARNING_OBJECT (src,
+ "Timed out waiting for TEARDOWN to be processed.");
+ break; /* timeout passed */
+ }
+ }
+ GST_OBJECT_UNLOCK (src);
+ }
+ return flushed;
+}
+
+static gboolean
gst_rtspsrc_loop (GstRTSPSrc * src)
{
GstFlowReturn ret;
/* do TEARDOWN */
res =
gst_rtspsrc_init_request (src, &request, GST_RTSP_TEARDOWN, setup_url);
+ GST_LOG_OBJECT (src, "Teardown on %s", setup_url);
if (res < 0)
goto create_request_failed;
gst_rtspsrc_thread (GstRTSPSrc * src)
{
gint cmd;
+ ParameterRequest *req = NULL;
GST_OBJECT_LOCK (src);
cmd = src->pending_cmd;
if (cmd == CMD_RECONNECT || cmd == CMD_PLAY || cmd == CMD_PAUSE
- || cmd == CMD_LOOP || cmd == CMD_OPEN)
- src->pending_cmd = CMD_LOOP;
- else
+ || cmd == CMD_LOOP || cmd == CMD_OPEN || cmd == CMD_GET_PARAMETER
+ || cmd == CMD_SET_PARAMETER) {
+ if (g_queue_is_empty (&src->set_get_param_q)) {
+ src->pending_cmd = CMD_LOOP;
+ } else {
+ ParameterRequest *next_req;
+ req = g_queue_pop_head (&src->set_get_param_q);
+ next_req = g_queue_peek_head (&src->set_get_param_q);
+ src->pending_cmd = next_req ? next_req->cmd : CMD_LOOP;
+ }
+ } else
src->pending_cmd = CMD_WAIT;
GST_DEBUG_OBJECT (src, "got command %s", cmd_to_string (cmd));
case CMD_CLOSE:
gst_rtspsrc_close (src, TRUE, FALSE);
break;
+ case CMD_GET_PARAMETER:
+ gst_rtspsrc_get_parameter (src, req);
+ break;
+ case CMD_SET_PARAMETER:
+ gst_rtspsrc_set_parameter (src, req);
+ break;
case CMD_LOOP:
gst_rtspsrc_loop (src);
break;
}
GST_OBJECT_LOCK (src);
+ /* No more cmds, wake any waiters */
+ g_cond_broadcast (&src->cmd_cond);
/* and go back to sleep */
if (src->pending_cmd == CMD_WAIT) {
if (src->task)
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
- gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_CLOSE, CMD_ALL);
+ gst_rtspsrc_loop_send_cmd_and_wait (rtspsrc, CMD_CLOSE, CMD_ALL,
+ rtspsrc->teardown_timeout);
ret = GST_STATE_CHANGE_SUCCESS;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
iface->set_uri = gst_rtspsrc_uri_set_uri;
}
+
+/* send GET_PARAMETER */
+static GstRTSPResult
+gst_rtspsrc_get_parameter (GstRTSPSrc * src, ParameterRequest * req)
+{
+ GstRTSPMessage request = { 0 };
+ GstRTSPMessage response = { 0 };
+ GstRTSPResult res;
+ GstRTSPStatusCode code = GST_RTSP_STS_OK;
+ const gchar *control;
+ gchar *recv_body = NULL;
+ guint recv_body_len;
+
+ GST_DEBUG_OBJECT (src, "creating server get_parameter");
+
+ if ((res = gst_rtspsrc_ensure_open (src, FALSE)) < 0)
+ goto open_failed;
+
+ control = get_aggregate_control (src);
+ if (control == NULL)
+ goto no_control;
+
+ if (!(src->methods & GST_RTSP_GET_PARAMETER))
+ goto not_supported;
+
+ gst_rtspsrc_connection_flush (src, FALSE);
+
+ res = gst_rtsp_message_init_request (&request, GST_RTSP_GET_PARAMETER,
+ control);
+ if (res < 0)
+ goto create_request_failed;
+
+ res = gst_rtsp_message_add_header (&request, GST_RTSP_HDR_CONTENT_TYPE,
+ req->content_type == NULL ? "text/parameters" : req->content_type);
+ if (res < 0)
+ goto add_content_hdr_failed;
+
+ if (req->body && req->body->len) {
+ res =
+ gst_rtsp_message_set_body (&request, (guint8 *) req->body->str,
+ req->body->len);
+ if (res < 0)
+ goto set_body_failed;
+ }
+
+ if ((res = gst_rtspsrc_send (src, &src->conninfo,
+ &request, &response, &code, NULL)) < 0)
+ goto send_error;
+
+ res = gst_rtsp_message_get_body (&response, (guint8 **) & recv_body,
+ &recv_body_len);
+ if (res < 0)
+ goto get_body_failed;
+
+done:
+ {
+ gst_promise_reply (req->promise,
+ gst_structure_new ("get-parameter-reply",
+ "rtsp-result", G_TYPE_INT, res,
+ "rtsp-code", G_TYPE_INT, code,
+ "rtsp-reason", G_TYPE_STRING, gst_rtsp_status_as_text (code),
+ "body", G_TYPE_STRING, GST_STR_NULL (recv_body), NULL));
+ free_param_data (req);
+
+
+ gst_rtsp_message_unset (&request);
+ gst_rtsp_message_unset (&response);
+
+ return res;
+ }
+
+ /* ERRORS */
+open_failed:
+ {
+ GST_DEBUG_OBJECT (src, "failed to open stream");
+ goto done;
+ }
+no_control:
+ {
+ GST_DEBUG_OBJECT (src, "no control url to send GET_PARAMETER");
+ res = GST_RTSP_ERROR;
+ goto done;
+ }
+not_supported:
+ {
+ GST_DEBUG_OBJECT (src, "GET_PARAMETER is not supported");
+ res = GST_RTSP_ERROR;
+ goto done;
+ }
+create_request_failed:
+ {
+ GST_DEBUG_OBJECT (src, "could not create GET_PARAMETER request");
+ goto done;
+ }
+add_content_hdr_failed:
+ {
+ GST_DEBUG_OBJECT (src, "could not add content header");
+ goto done;
+ }
+set_body_failed:
+ {
+ GST_DEBUG_OBJECT (src, "could not set body");
+ goto done;
+ }
+send_error:
+ {
+ gchar *str = gst_rtsp_strresult (res);
+
+ GST_ELEMENT_WARNING (src, RESOURCE, WRITE, (NULL),
+ ("Could not send get-parameter. (%s)", str));
+ g_free (str);
+ goto done;
+ }
+get_body_failed:
+ {
+ GST_DEBUG_OBJECT (src, "could not get body");
+ goto done;
+ }
+}
+
+/* send SET_PARAMETER */
+static GstRTSPResult
+gst_rtspsrc_set_parameter (GstRTSPSrc * src, ParameterRequest * req)
+{
+ GstRTSPMessage request = { 0 };
+ GstRTSPMessage response = { 0 };
+ GstRTSPResult res = GST_RTSP_OK;
+ GstRTSPStatusCode code = GST_RTSP_STS_OK;
+ const gchar *control;
+
+ GST_DEBUG_OBJECT (src, "creating server set_parameter");
+
+ if ((res = gst_rtspsrc_ensure_open (src, FALSE)) < 0)
+ goto open_failed;
+
+ control = get_aggregate_control (src);
+ if (control == NULL)
+ goto no_control;
+
+ if (!(src->methods & GST_RTSP_SET_PARAMETER))
+ goto not_supported;
+
+ gst_rtspsrc_connection_flush (src, FALSE);
+
+ res =
+ gst_rtsp_message_init_request (&request, GST_RTSP_SET_PARAMETER, control);
+ if (res < 0)
+ goto send_error;
+
+ res = gst_rtsp_message_add_header (&request, GST_RTSP_HDR_CONTENT_TYPE,
+ req->content_type == NULL ? "text/parameters" : req->content_type);
+ if (res < 0)
+ goto add_content_hdr_failed;
+
+ if (req->body && req->body->len) {
+ res =
+ gst_rtsp_message_set_body (&request, (guint8 *) req->body->str,
+ req->body->len);
+
+ if (res < 0)
+ goto set_body_failed;
+ }
+
+ if ((res = gst_rtspsrc_send (src, &src->conninfo,
+ &request, &response, &code, NULL)) < 0)
+ goto send_error;
+
+done:
+ {
+ gst_promise_reply (req->promise, gst_structure_new ("set-parameter-reply",
+ "rtsp-result", G_TYPE_INT, res,
+ "rtsp-code", G_TYPE_INT, code,
+ "rtsp-reason", G_TYPE_STRING, gst_rtsp_status_as_text (code),
+ NULL));
+ free_param_data (req);
+
+ gst_rtsp_message_unset (&request);
+ gst_rtsp_message_unset (&response);
+
+ return res;
+ }
+
+ /* ERRORS */
+open_failed:
+ {
+ GST_DEBUG_OBJECT (src, "failed to open stream");
+ goto done;
+ }
+no_control:
+ {
+ GST_DEBUG_OBJECT (src, "no control url to send SET_PARAMETER");
+ res = GST_RTSP_ERROR;
+ goto done;
+ }
+not_supported:
+ {
+ GST_DEBUG_OBJECT (src, "SET_PARAMETER is not supported");
+ res = GST_RTSP_ERROR;
+ goto done;
+ }
+add_content_hdr_failed:
+ {
+ GST_DEBUG_OBJECT (src, "could not add content header");
+ goto done;
+ }
+set_body_failed:
+ {
+ GST_DEBUG_OBJECT (src, "could not set body");
+ goto done;
+ }
+send_error:
+ {
+ gchar *str = gst_rtsp_strresult (res);
+
+ GST_ELEMENT_WARNING (src, RESOURCE, WRITE, (NULL),
+ ("Could not send set-parameter. (%s)", str));
+ g_free (str);
+ goto done;
+ }
+}
+
typedef struct _RTSPKeyValue
{
GstRTSPHeaderField field;