static void gst_rtspsrc_base_init (gpointer g_class);
static void gst_rtspsrc_class_init (GstRTSPSrc * klass);
static void gst_rtspsrc_init (GstRTSPSrc * rtspsrc);
+static void gst_rtspsrc_finalize (GObject * object);
static void gst_rtspsrc_uri_handler_init (gpointer g_iface,
gpointer iface_data);
gobject_class->set_property = gst_rtspsrc_set_property;
gobject_class->get_property = gst_rtspsrc_get_property;
+ gobject_class->finalize = gst_rtspsrc_finalize;
+
g_object_class_install_property (gobject_class, PROP_LOCATION,
g_param_spec_string ("location", "RTSP Location",
"Location of the RTSP url to read",
static void
gst_rtspsrc_init (GstRTSPSrc * src)
{
+ src->stream_rec_lock = g_new (GStaticRecMutex, 1);
+ g_static_rec_mutex_init (src->stream_rec_lock);
+}
+
+static void
+gst_rtspsrc_finalize (GObject * object)
+{
+ GstRTSPSrc *rtspsrc;
+
+ rtspsrc = GST_RTSPSRC (object);
+
+ g_static_rec_mutex_free (rtspsrc->stream_rec_lock);
+ g_free (rtspsrc->stream_rec_lock);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
return s;
}
+#if 0
+static void
+gst_rtspsrc_free_stream (GstRTSPSrc * src, GstRTSPStream * stream)
+{
+ if (stream->caps) {
+ gst_caps_unref (stream->caps);
+ stream->caps = NULL;
+ }
+
+ src->streams = g_list_remove (src->streams, stream);
+ src->numstreams--;
+
+ g_free (stream);
+}
+#endif
+
static gboolean
gst_rtspsrc_add_element (GstRTSPSrc * src, GstElement * element)
{
*
* m=<media> <udp port> RTP/AVP <payload>
* a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>]
- * a=fmtp:<payload> <param>=<value>;...
+ * a=fmtp:<payload> <param>[=<value>];...
*/
static GstCaps *
gst_rtspsrc_media_to_caps (SDPMedia * media)
p = fmtp;
+ /* p is now of the format <payload> <param>[=<value>];... */
PARSE_INT (p, " ", payload);
if (payload != -1 && payload == pt) {
gchar **pairs;
gint i;
+ /* <param>[=<value>] are separated with ';' */
pairs = g_strsplit (p, ";", 0);
for (i = 0; pairs[i]; i++) {
- gchar **keyval;
-
- keyval = g_strsplit (pairs[i], "=", 0);
- if (keyval[0]) {
- gchar *val, *key;
-
- if (keyval[1])
- val = g_strstrip (keyval[1]);
- else
- val = "1";
-
- key = g_strstrip (keyval[0]);
-
- gst_structure_set (s, key, G_TYPE_STRING, val, NULL);
+ gchar *valpos;
+ gchar *val, *key;
+
+ /* the key may not have a '=', the value can have other '='s */
+ valpos = strstr (pairs[i], "=");
+ if (valpos) {
+ /* we have a '=' and thus a value, remove the '=' with \0 */
+ *valpos = '\0';
+ /* value is everything between '=' and ';' */
+ val = g_strstrip (valpos + 1);
+ } else {
+ /* simple <param>;.. is translated into <param>=1;... */
+ val = "1";
}
- g_strfreev (keyval);
+ /* strip the key of spaces */
+ key = g_strstrip (pairs[i]);
+
+ gst_structure_set (s, key, G_TYPE_STRING, val, NULL);
}
g_strfreev (pairs);
}
}
-
return caps;
}
/* ERRORS */
no_udp_rtp_protocol:
{
- GST_DEBUG ("could not get UDP source for RTP");
+ GST_DEBUG_OBJECT (src, "could not get UDP source for RTP");
goto cleanup;
}
start_rtp_failure:
{
- GST_DEBUG ("could not start UDP source for RTP");
+ GST_DEBUG_OBJECT (src, "could not start UDP source for RTP");
goto cleanup;
}
no_ports:
{
- GST_DEBUG ("could not allocate UDP port pair after %d retries", count);
+ GST_DEBUG_OBJECT (src, "could not allocate UDP port pair after %d retries",
+ count);
goto cleanup;
}
no_udp_rtcp_protocol:
{
- GST_DEBUG ("could not get UDP source for RTCP");
+ GST_DEBUG_OBJECT (src, "could not get UDP source for RTCP");
goto cleanup;
}
start_rtcp_failure:
{
- GST_DEBUG ("could not start UDP source for RTCP");
+ GST_DEBUG_OBJECT (src, "could not start UDP source for RTCP");
goto cleanup;
}
port_error:
{
- GST_DEBUG ("ports don't match rtp: %d<->%d, rtcp: %d<->%d",
+ GST_DEBUG_OBJECT (src, "ports don't match rtp: %d<->%d, rtcp: %d<->%d",
tmp_rtp, *rtpport, tmp_rtcp, *rtcpport);
goto cleanup;
}
static gboolean
gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream,
- RTSPTransport * transport)
+ SDPMedia * media, RTSPTransport * transport)
{
GstRTSPSrc *src;
GstPad *pad;
src = stream->parent;
+ GST_DEBUG ("configuring RTP transport for stream %p", stream);
+
if (!(stream->rtpdec = gst_element_factory_make ("rtpdec", NULL)))
goto no_element;
/* configure for interleaved delivery, nothing needs to be done
* here, the loop function will call the chain functions of the
* rtp session manager. */
+ stream->rtpchannel = transport->interleaved.min;
+ stream->rtcpchannel = transport->interleaved.max;
+ GST_DEBUG ("stream %p on channels %d-%d", stream,
+ stream->rtpchannel, stream->rtcpchannel);
+
+ /* also store the caps in the stream */
+ stream->caps = gst_rtspsrc_media_to_caps (media);
} else {
/* configure for UDP delivery, we need to connect the udp pads to
* the rtp session plugin. */
}
pad = gst_element_get_pad (stream->rtpdec, "srcrtp");
+ if (stream->caps) {
+ gst_pad_use_fixed_caps (pad);
+ gst_pad_set_caps (pad, stream->caps);
+ }
name = g_strdup_printf ("rtp_stream%d", stream->id);
gst_element_add_pad (GST_ELEMENT_CAST (src), gst_ghost_pad_new (name, pad));
g_free (name);
return TRUE;
+ /* ERRORS */
no_element:
{
- GST_DEBUG ("no rtpdec element found");
+ GST_DEBUG_OBJECT (src, "no rtpdec element found");
return FALSE;
}
start_rtpdec_failure:
{
- GST_DEBUG ("could not start RTP session");
+ GST_DEBUG_OBJECT (src, "could not start RTP session");
return FALSE;
}
}
return -1;
}
+static GstFlowReturn
+gst_rtspsrc_combine_flows (GstRTSPSrc * src, GstRTSPStream * stream,
+ GstFlowReturn ret)
+{
+ GList *streams;
+
+ /* store the value */
+ stream->last_ret = ret;
+
+ /* if it's success we can return the value right away */
+ if (GST_FLOW_IS_SUCCESS (ret))
+ goto done;
+
+ /* any other error that is not-linked can be returned right
+ * away */
+ if (ret != GST_FLOW_NOT_LINKED)
+ goto done;
+
+ /* only return NOT_LINKED if all other pads returned NOT_LINKED */
+ for (streams = src->streams; streams; streams = g_list_next (streams)) {
+ GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
+
+ ret = ostream->last_ret;
+ /* some other return value (must be SUCCESS but we can return
+ * other values as well) */
+ if (ret != GST_FLOW_NOT_LINKED)
+ goto done;
+ }
+ /* if we get here, all other pads were unlinked and we return
+ * NOT_LINKED then */
+done:
+ return ret;
+}
+
static void
gst_rtspsrc_loop (GstRTSPSrc * src)
{
GstPad *outpad = NULL;
guint8 *data;
guint size;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstCaps *caps = NULL;
do {
- GST_DEBUG ("doing reveive");
+ GST_DEBUG_OBJECT (src, "doing receive");
if ((res = rtsp_connection_receive (src->connection, &response)) < 0)
goto receive_error;
- GST_DEBUG ("got packet");
+ GST_DEBUG_OBJECT (src, "got packet type %d", response.type);
}
while (response.type != RTSP_MESSAGE_DATA);
goto unknown_stream;
stream = (GstRTSPStream *) lstream->data;
- if (channel == stream->rtpchannel)
+ if (channel == stream->rtpchannel) {
outpad = stream->rtpdecrtp;
- else if (channel == stream->rtcpchannel)
+ caps = stream->caps;
+ } else if (channel == stream->rtcpchannel) {
outpad = stream->rtpdecrtcp;
+ }
rtsp_message_get_body (&response, &data, &size);
{
GstBuffer *buf;
+ /* strip the trailing \0 */
+ size -= 1;
+
buf = gst_buffer_new_and_alloc (size);
memcpy (GST_BUFFER_DATA (buf), data, size);
- if (gst_pad_chain (outpad, buf) != GST_FLOW_OK)
- goto need_pause;
- }
+ if (caps)
+ gst_buffer_set_caps (buf, caps);
-unknown_stream:
+ GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size,
+ channel);
+
+ /* chain to the peer pad */
+ ret = gst_pad_chain (outpad, buf);
+ /* combine all streams */
+ ret = gst_rtspsrc_combine_flows (src, stream, ret);
+ if (ret != GST_FLOW_OK)
+ goto need_pause;
+ }
return;
+ /* ERRORS */
+unknown_stream:
+ {
+ GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel);
+ return;
+ }
receive_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, WRITE,
("Could not receive message."), (NULL));
+ ret = GST_FLOW_UNEXPECTED;
/*
gst_pad_push_event (src->srcpad, gst_event_new (GST_EVENT_EOS));
*/
}
need_pause:
{
+ GST_DEBUG_OBJECT (src, "pausing task, reason %d (%s)", ret,
+ gst_flow_get_name (ret));
gst_task_pause (src->task);
return;
}
GstRTSPProto protocols;
/* parse url */
- GST_DEBUG ("parsing url...");
+ GST_DEBUG_OBJECT (src, "parsing url...");
if ((res = rtsp_url_parse (src->location, &url)) < 0)
goto invalid_url;
/* open connection */
- GST_DEBUG ("opening connection...");
+ GST_DEBUG_OBJECT (src, "opening connection...");
if ((res = rtsp_connection_open (url, &src->connection)) < 0)
goto could_not_open;
/* create OPTIONS */
- GST_DEBUG ("create options...");
+ GST_DEBUG_OBJECT (src, "create options...");
if ((res =
rtsp_message_init_request (RTSP_OPTIONS, src->location,
&request)) < 0)
goto create_request_failed;
/* send OPTIONS */
- GST_DEBUG ("send options...");
+ GST_DEBUG_OBJECT (src, "send options...");
if (!gst_rtspsrc_send (src, &request, &response, NULL))
goto send_error;
}
/* create DESCRIBE */
- GST_DEBUG ("create describe...");
+ GST_DEBUG_OBJECT (src, "create describe...");
if ((res =
rtsp_message_init_request (RTSP_DESCRIBE, src->location,
&request)) < 0)
rtsp_message_add_header (&request, RTSP_HDR_ACCEPT, "application/sdp");
/* send DESCRIBE */
- GST_DEBUG ("send describe...");
+ GST_DEBUG_OBJECT (src, "send describe...");
if (!gst_rtspsrc_send (src, &request, &response, NULL))
goto send_error;
/* parse SDP */
rtsp_message_get_body (&response, &data, &size);
- GST_DEBUG ("parse sdp...");
+ GST_DEBUG_OBJECT (src, "parse sdp...");
sdp_message_init (&sdp);
sdp_message_parse_buffer (data, size, &sdp);
stream = gst_rtspsrc_create_stream (src);
- GST_DEBUG ("setup media %d", i);
+ GST_DEBUG_OBJECT (src, "setup media %d", i);
control_url = sdp_media_get_attribute_val (media, "control");
if (control_url == NULL) {
- GST_DEBUG ("no control url found, skipping stream");
+ GST_DEBUG_OBJECT (src, "no control url found, skipping stream");
continue;
}
setup_url = g_strdup_printf ("%s/%s", src->location, control_url);
}
- GST_DEBUG ("setup %s", setup_url);
+ GST_DEBUG_OBJECT (src, "setup %s", setup_url);
/* create SETUP request */
if ((res =
rtsp_message_init_request (RTSP_SETUP, setup_url,
if (!gst_rtspsrc_stream_setup_rtp (stream, media, &rtpport, &rtcpport))
goto setup_rtp_failed;
+ GST_DEBUG_OBJECT (src, "setting up RTP ports %d-%d", rtpport, rtcpport);
+
trxparams = g_strdup_printf ("client_port=%d-%d", rtpport, rtcpport);
new = g_strconcat (transports, "RTP/AVP/UDP;unicast;", trxparams, NULL);
g_free (trxparams);
if (protocols & GST_RTSP_PROTO_UDP_MULTICAST) {
gchar *new;
+ GST_DEBUG_OBJECT (src, "setting up MULTICAST");
+
new =
g_strconcat (transports, transports[0] ? "," : "",
"RTP/AVP/UDP;multicast", NULL);
if (protocols & GST_RTSP_PROTO_TCP) {
gchar *new;
+ GST_DEBUG_OBJECT (src, "setting up TCP");
+
new =
g_strconcat (transports, transports[0] ? "," : "", "RTP/AVP/TCP",
NULL);
rtsp_transport_parse (resptrans, &transport);
/* update allowed transports for other streams */
if (transport.lower_transport == RTSP_LOWER_TRANS_TCP) {
+ GST_DEBUG_OBJECT (src, "stream %d as TCP", i);
protocols = GST_RTSP_PROTO_TCP;
src->interleaved = TRUE;
} else {
if (transport.multicast) {
/* disable unicast */
+ GST_DEBUG_OBJECT (src, "stream %d as MULTICAST", i);
protocols = GST_RTSP_PROTO_UDP_MULTICAST;
} else {
/* disable multicast */
+ GST_DEBUG_OBJECT (src, "stream %d as UNICAST", i);
protocols = GST_RTSP_PROTO_UDP_UNICAST;
}
}
/* now configure the stream with the transport */
- if (!gst_rtspsrc_stream_configure_transport (stream, &transport)) {
- GST_DEBUG ("could not configure stream transport, skipping stream");
+ if (!gst_rtspsrc_stream_configure_transport (stream, media, &transport)) {
+ GST_DEBUG_OBJECT (src,
+ "could not configure stream transport, skipping stream");
}
/* clean up our transport struct */
rtsp_transport_init (&transport);
RTSPMessage response = { 0 };
RTSPResult res;
- GST_DEBUG ("TEARDOWN...");
+ GST_DEBUG_OBJECT (src, "TEARDOWN...");
/* stop task if any */
if (src->task) {
gst_task_stop (src->task);
+
+ /* make sure it is not running */
+ g_static_rec_mutex_lock (src->stream_rec_lock);
+ g_static_rec_mutex_unlock (src->stream_rec_lock);
+
+ /* no wait for the task to finish */
+ gst_task_join (src->task);
+
+ /* and free the task */
gst_object_unref (GST_OBJECT (src->task));
src->task = NULL;
}
}
/* close connection */
- GST_DEBUG ("closing connection...");
+ GST_DEBUG_OBJECT (src, "closing connection...");
if ((res = rtsp_connection_close (src->connection)) < 0)
goto close_failed;
if (!(src->options & RTSP_PLAY))
return TRUE;
- GST_DEBUG ("PLAY...");
+ GST_DEBUG_OBJECT (src, "PLAY...");
/* do play */
if ((res =
if (src->interleaved) {
src->task = gst_task_create ((GstTaskFunction) gst_rtspsrc_loop, src);
+ gst_task_set_lock (src->task, src->stream_rec_lock);
gst_task_start (src->task);
}
if (!(src->options & RTSP_PAUSE))
return TRUE;
- GST_DEBUG ("PAUSE...");
+ GST_DEBUG_OBJECT (src, "PAUSE...");
/* do pause */
if ((res =
rtsp_message_init_request (RTSP_PAUSE, src->location, &request)) < 0)