X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtsp%2Fgstrtspsrc.c;h=05f532d54bcf6d73d8267c92924eaaed621ea174;hb=a858bf46db860965089611674bfc43289830b646;hp=9293baa1f88953eb78ef0e636149db5d76daec9a;hpb=bca1463a666bfc149a5fa2cf2acaa480f15b906a;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 9293baa..05f532d 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -14,8 +14,8 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /* * Unless otherwise indicated, Source Code is licensed under MIT license. @@ -69,7 +69,7 @@ * * Example launch line * |[ - * gst-launch rtspsrc location=rtsp://some.server/url ! fakesink + * gst-launch-1.0 rtspsrc location=rtsp://some.server/url ! fakesink * ]| Establish a connection to an RTSP server and send the raw RTP packets to a * fakesink. * @@ -86,7 +86,6 @@ #endif /* HAVE_UNISTD_H */ #include #include -#include #include #include @@ -167,6 +166,7 @@ gst_rtsp_src_buffer_mode_get_type (void) #define DEFAULT_UDP_BUFFER_SIZE 0x80000 #define DEFAULT_TCP_TIMEOUT 20000000 #define DEFAULT_LATENCY_MS 2000 +#define DEFAULT_DROP_ON_LATENCY FALSE #define DEFAULT_CONNECTION_SPEED 0 #define DEFAULT_NAT_METHOD GST_RTSP_NAT_DUMMY #define DEFAULT_DO_RTCP TRUE @@ -178,6 +178,8 @@ gst_rtsp_src_buffer_mode_get_type (void) #define DEFAULT_BUFFER_MODE BUFFER_MODE_AUTO #define DEFAULT_PORT_RANGE NULL #define DEFAULT_SHORT_HEADER FALSE +#define DEFAULT_PROBATION 2 +#define DEFAULT_UDP_RECONNECT TRUE enum { @@ -189,6 +191,7 @@ enum PROP_TIMEOUT, PROP_TCP_TIMEOUT, PROP_LATENCY, + PROP_DROP_ON_LATENCY, PROP_CONNECTION_SPEED, PROP_NAT_METHOD, PROP_DO_RTCP, @@ -201,6 +204,8 @@ enum PROP_PORT_RANGE, PROP_UDP_BUFFER_SIZE, PROP_SHORT_HEADER, + PROP_PROBATION, + PROP_UDP_RECONNECT, PROP_LAST }; @@ -248,36 +253,35 @@ static void gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message); static gboolean gst_rtspsrc_setup_auth (GstRTSPSrc * src, GstRTSPMessage * response); -static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd); +static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gint mask); static GstRTSPResult gst_rtspsrc_send_cb (GstRTSPExtension * ext, GstRTSPMessage * request, GstRTSPMessage * response, GstRTSPSrc * src); static GstRTSPResult gst_rtspsrc_open (GstRTSPSrc * src, gboolean async); static GstRTSPResult gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async); -static GstRTSPResult gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, - gboolean async); +static GstRTSPResult gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async); static GstRTSPResult gst_rtspsrc_close (GstRTSPSrc * src, gboolean async, gboolean only_close); static gboolean gst_rtspsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri, GError ** error); +static gchar *gst_rtspsrc_uri_get_uri (GstURIHandler * handler); static gboolean gst_rtspsrc_activate_streams (GstRTSPSrc * src); static gboolean gst_rtspsrc_loop (GstRTSPSrc * src); static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src, - GstRTSPStream * stream, GstEvent * event, gboolean source); -static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, - gboolean source); + GstRTSPStream * stream, GstEvent * event); +static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event); /* commands we send to out loop to notify it of events */ -#define CMD_OPEN 0 -#define CMD_PLAY 1 -#define CMD_PAUSE 2 -#define CMD_CLOSE 3 -#define CMD_WAIT 4 -#define CMD_RECONNECT 5 -#define CMD_LOOP 6 +#define CMD_OPEN (1 << 0) +#define CMD_PLAY (1 << 1) +#define CMD_PAUSE (1 << 2) +#define CMD_CLOSE (1 << 3) +#define CMD_WAIT (1 << 4) +#define CMD_RECONNECT (1 << 5) +#define CMD_LOOP (1 << 6) #define GST_ELEMENT_PROGRESS(el, type, code, text) \ G_STMT_START { \ @@ -349,6 +353,12 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass) "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY, + g_param_spec_boolean ("drop-on-latency", + "Drop buffers when maximum latency is reached", + "Tells the jitterbuffer to never exceed the given latency in size", + DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED, g_param_spec_uint64 ("connection-speed", "Connection Speed", "Network connection speed in kbps (0 = unknown)", @@ -476,13 +486,24 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass) "Only send the basic RTSP headers for broken encoders", DEFAULT_SHORT_HEADER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PROBATION, + g_param_spec_uint ("probation", "Number of probations", + "Consecutive packet sequence numbers to accept the source", + 0, G_MAXUINT, DEFAULT_PROBATION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_UDP_RECONNECT, + g_param_spec_boolean ("udp-reconnect", "Reconnect to the server", + "Reconnect to the server if RTSP connection is closed when doing UDP", + DEFAULT_UDP_RECONNECT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->send_event = gst_rtspsrc_send_event; gstelement_class->change_state = gst_rtspsrc_change_state; gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&rtptemplate)); - gst_element_class_set_details_simple (gstelement_class, + gst_element_class_set_static_metadata (gstelement_class, "RTSP packet receiver", "Source/Network", "Receive data over the network via RTSP (RFC 2326)", "Wim Taymans , " @@ -505,6 +526,7 @@ gst_rtspsrc_init (GstRTSPSrc * src) src->udp_timeout = DEFAULT_TIMEOUT; gst_rtspsrc_set_tcp_timeout (src, DEFAULT_TCP_TIMEOUT); src->latency = DEFAULT_LATENCY_MS; + src->drop_on_latency = DEFAULT_DROP_ON_LATENCY; src->connection_speed = DEFAULT_CONNECTION_SPEED; src->nat_method = DEFAULT_NAT_METHOD; src->do_rtcp = DEFAULT_DO_RTCP; @@ -518,6 +540,8 @@ gst_rtspsrc_init (GstRTSPSrc * src) src->client_port_range.max = 0; src->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE; src->short_header = DEFAULT_SHORT_HEADER; + src->probation = DEFAULT_PROBATION; + src->udp_reconnect = DEFAULT_UDP_RECONNECT; /* get a list of all extensions */ src->extensions = gst_rtsp_ext_list_get (); @@ -658,6 +682,9 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value, case PROP_LATENCY: rtspsrc->latency = g_value_get_uint (value); break; + case PROP_DROP_ON_LATENCY: + rtspsrc->drop_on_latency = g_value_get_boolean (value); + break; case PROP_CONNECTION_SPEED: rtspsrc->connection_speed = g_value_get_uint64 (value); break; @@ -709,6 +736,12 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value, case PROP_SHORT_HEADER: rtspsrc->short_header = g_value_get_boolean (value); break; + case PROP_PROBATION: + rtspsrc->probation = g_value_get_uint (value); + break; + case PROP_UDP_RECONNECT: + rtspsrc->udp_reconnect = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -751,6 +784,9 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value, case PROP_LATENCY: g_value_set_uint (value, rtspsrc->latency); break; + case PROP_DROP_ON_LATENCY: + g_value_set_boolean (value, rtspsrc->drop_on_latency); + break; case PROP_CONNECTION_SPEED: g_value_set_uint64 (value, rtspsrc->connection_speed); break; @@ -807,6 +843,12 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value, case PROP_SHORT_HEADER: g_value_set_boolean (value, rtspsrc->short_header); break; + case PROP_PROBATION: + g_value_set_uint (value, rtspsrc->probation); + break; + case PROP_UDP_RECONNECT: + g_value_set_boolean (value, rtspsrc->udp_reconnect); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1205,6 +1247,10 @@ gst_rtspsrc_cleanup (GstRTSPSrc * src) gst_sdp_message_free (src->sdp); src->sdp = NULL; } + if (src->start_segment) { + gst_event_unref (src->start_segment); + src->start_segment = NULL; + } } #define PARSE_INT(p, del, res) \ @@ -1519,7 +1565,7 @@ again: tmp_rtp >= src->client_port_range.max) goto no_ports; - udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL); + udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL); if (udpsrc0 == NULL) goto no_udp_protocol; g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, "reuse", FALSE, NULL); @@ -1540,6 +1586,7 @@ again: GST_DEBUG_OBJECT (src, "free RTP udpsrc"); gst_element_set_state (udpsrc0, GST_STATE_NULL); gst_object_unref (udpsrc0); + udpsrc0 = NULL; GST_DEBUG_OBJECT (src, "retry %d", count); goto again; @@ -1561,6 +1608,7 @@ again: GST_DEBUG_OBJECT (src, "free RTP udpsrc"); gst_element_set_state (udpsrc0, GST_STATE_NULL); gst_object_unref (udpsrc0); + udpsrc0 = NULL; GST_DEBUG_OBJECT (src, "retry %d", count); tmp_rtp++; @@ -1568,13 +1616,13 @@ again: } /* allocate port+1 for RTCP now */ - udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL); + udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL); if (udpsrc1 == NULL) goto no_udp_rtcp_protocol; /* set port */ tmp_rtcp = tmp_rtp + 1; - if (src->client_port_range.max > 0 && tmp_rtcp >= src->client_port_range.max) + if (src->client_port_range.max > 0 && tmp_rtcp > src->client_port_range.max) goto no_ports; g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, "reuse", FALSE, NULL); @@ -1591,6 +1639,7 @@ again: GST_DEBUG_OBJECT (src, "free RTP udpsrc"); gst_element_set_state (udpsrc0, GST_STATE_NULL); gst_object_unref (udpsrc0); + udpsrc0 = NULL; GST_DEBUG_OBJECT (src, "free RTCP udpsrc"); gst_element_set_state (udpsrc1, GST_STATE_NULL); @@ -1666,8 +1715,6 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing) gint cmd, i; GstState state; GList *walk; - GstClock *clock; - GstClockTime base_time = GST_CLOCK_TIME_NONE; if (flush) { event = gst_event_new_flush_start (); @@ -1675,31 +1722,20 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing) cmd = CMD_WAIT; state = GST_STATE_PAUSED; } else { - event = gst_event_new_flush_stop (TRUE); + event = gst_event_new_flush_stop (FALSE); GST_DEBUG_OBJECT (src, "stop flush; playing %d", playing); cmd = CMD_LOOP; if (playing) state = GST_STATE_PLAYING; else state = GST_STATE_PAUSED; - clock = gst_element_get_clock (GST_ELEMENT_CAST (src)); - if (clock) { - base_time = gst_clock_get_time (clock); - gst_object_unref (clock); - } } - gst_rtspsrc_push_event (src, event, FALSE); - gst_rtspsrc_loop_send_cmd (src, cmd); + gst_rtspsrc_push_event (src, event); + gst_rtspsrc_loop_send_cmd (src, cmd, CMD_LOOP); - /* set up manager before data-flow resumes */ /* to manage jitterbuffer buffer mode */ - if (src->manager) { - gst_element_set_base_time (GST_ELEMENT_CAST (src->manager), base_time); - /* and to have base_time trickle further down, - * e.g. to jitterbuffer for its timeout handling */ - if (base_time != -1) - gst_element_set_state (GST_ELEMENT_CAST (src->manager), state); - } + if (src->manager) + gst_element_set_state (GST_ELEMENT_CAST (src->manager), state); /* make running time start start at 0 again */ for (walk = src->streams; walk; walk = g_list_next (walk)) { @@ -1708,15 +1744,10 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing) for (i = 0; i < 2; i++) { /* for udp case */ if (stream->udpsrc[i]) { - if (base_time != -1) - gst_element_set_base_time (stream->udpsrc[i], base_time); gst_element_set_state (stream->udpsrc[i], state); } } } - /* for tcp interleaved case */ - if (base_time != -1) - gst_element_set_base_time (GST_ELEMENT_CAST (src), base_time); } static GstRTSPResult @@ -1866,8 +1897,9 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event) if (playing) { /* obtain current position in case seek fails */ gst_rtspsrc_get_position (src); - gst_rtspsrc_pause (src, FALSE, FALSE); + gst_rtspsrc_pause (src, FALSE); } + src->skip = skip; gst_rtspsrc_do_seek (src, &seeksegment); @@ -1896,18 +1928,12 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event) GST_DEBUG_OBJECT (src, "Creating newsegment from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT, src->segment.position, stop); - /* store the newsegment event so it can be sent from the streaming thread. */ - if (src->start_segment) - gst_event_unref (src->start_segment); - src->start_segment = gst_event_new_segment (&src->segment); - /* mark discont */ GST_DEBUG_OBJECT (src, "mark DISCONT, we did a seek to another position"); for (walk = src->streams; walk; walk = g_list_next (walk)) { GstRTSPStream *stream = (GstRTSPStream *) walk->data; stream->discont = TRUE; } - src->skip = skip; GST_RTSP_STREAM_UNLOCK (src); @@ -2090,6 +2116,18 @@ gst_rtspsrc_handle_src_query (GstPad * pad, GstObject * parent, } break; } + case GST_QUERY_URI: + { + gchar *uri; + + uri = gst_rtspsrc_uri_get_uri (GST_URI_HANDLER (src)); + if (uri != NULL) { + gst_query_set_uri (query, uri); + g_free (uri); + res = TRUE; + } + break; + } default: { GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD_CAST (pad)); @@ -2188,7 +2226,7 @@ new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src) gchar *name; GstPadTemplate *template; gint id, ssrc, pt; - GList *lstream; + GList *ostreams; GstRTSPStream *stream; gboolean all_added; @@ -2200,41 +2238,43 @@ new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src) if (sscanf (name, "recv_rtp_src_%u_%u_%u", &id, &ssrc, &pt) != 3) goto unknown_stream; - GST_DEBUG_OBJECT (src, "stream: %u, SSRC %d, PT %d", id, ssrc, pt); + GST_DEBUG_OBJECT (src, "stream: %u, SSRC %08x, PT %d", id, ssrc, pt); stream = find_stream (src, &id, (gpointer) find_stream_by_id); if (stream == NULL) goto unknown_stream; - /* create a new pad we will use to stream to */ - template = gst_static_pad_template_get (&rtptemplate); - stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template); - gst_object_unref (template); - g_free (name); - + /* we'll add it later see below */ stream->added = TRUE; - gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event); - gst_pad_set_query_function (stream->srcpad, gst_rtspsrc_handle_src_query); - gst_pad_set_active (stream->srcpad, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad); /* check if we added all streams */ all_added = TRUE; - for (lstream = src->streams; lstream; lstream = g_list_next (lstream)) { - stream = (GstRTSPStream *) lstream->data; + for (ostreams = src->streams; ostreams; ostreams = g_list_next (ostreams)) { + GstRTSPStream *ostream = (GstRTSPStream *) ostreams->data; GST_DEBUG_OBJECT (src, "stream %p, container %d, disabled %d, added %d", - stream, stream->container, stream->disabled, stream->added); + ostream, ostream->container, ostream->disabled, ostream->added); /* a container stream only needs one pad added. Also disabled streams don't * count */ - if (!stream->container && !stream->disabled && !stream->added) { + if (!ostream->container && !ostream->disabled && !ostream->added) { all_added = FALSE; break; } } GST_RTSP_STATE_UNLOCK (src); + /* create a new pad we will use to stream to */ + template = gst_static_pad_template_get (&rtptemplate); + stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template); + gst_object_unref (template); + g_free (name); + + gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event); + gst_pad_set_query_function (stream->srcpad, gst_rtspsrc_handle_src_query); + gst_pad_set_active (stream->srcpad, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad); + if (all_added) { GST_DEBUG_OBJECT (src, "We added all streams"); /* when we get here, all stream are added and we can fire the no-more-pads @@ -2291,7 +2331,7 @@ gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, GstRTSPStream * stream) goto was_eos; stream->eos = TRUE; - gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos (), TRUE); + gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos ()); return; /* ERRORS */ @@ -2390,6 +2430,11 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, g_object_set (src->manager, "latency", src->latency, NULL); klass = G_OBJECT_GET_CLASS (G_OBJECT (src->manager)); + if (g_object_class_find_property (klass, "drop-on-latency")) { + g_object_set (src->manager, "drop-on-latency", src->drop_on_latency, + NULL); + } + if (g_object_class_find_property (klass, "buffer-mode")) { if (src->buffer_mode != BUFFER_MODE_AUTO) { g_object_set (src->manager, "buffer-mode", src->buffer_mode, NULL); @@ -2475,6 +2520,9 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, g_object_set (rtpsession, "rtcp-rs-bandwidth", stream->rs_bandwidth, NULL); } + + g_object_set (rtpsession, "probation", src->probation, NULL); + g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc, stream); g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout, @@ -2518,6 +2566,7 @@ gst_rtspsrc_stream_free_udp (GstRTSPStream * stream) for (i = 0; i < 2; i++) { if (stream->udpsrc[i]) { + GST_DEBUG ("free UDP source %d for stream %p", i, stream); gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL); gst_object_unref (stream->udpsrc[i]); stream->udpsrc[i] = NULL; @@ -2695,7 +2744,8 @@ gst_rtspsrc_stream_configure_mcast (GstRTSPSrc * src, GstRTSPStream * stream, /* creating UDP source for RTP */ if (min != -1) { uri = g_strdup_printf ("udp://%s:%d", destination, min); - stream->udpsrc[0] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL); + stream->udpsrc[0] = + gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL); g_free (uri); if (stream->udpsrc[0] == NULL) goto no_element; @@ -2703,6 +2753,10 @@ gst_rtspsrc_stream_configure_mcast (GstRTSPSrc * src, GstRTSPStream * stream, /* take ownership */ gst_object_ref_sink (stream->udpsrc[0]); + if (src->udp_buffer_size != 0) + g_object_set (G_OBJECT (stream->udpsrc[0]), "buffer-size", + src->udp_buffer_size, NULL); + /* change state */ gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED); } @@ -2710,7 +2764,8 @@ gst_rtspsrc_stream_configure_mcast (GstRTSPSrc * src, GstRTSPStream * stream, /* creating another UDP source for RTCP */ if (max != -1) { uri = g_strdup_printf ("udp://%s:%d", destination, max); - stream->udpsrc[1] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL); + stream->udpsrc[1] = + gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL); g_free (uri); if (stream->udpsrc[1] == NULL) goto no_element; @@ -2755,8 +2810,8 @@ gst_rtspsrc_stream_configure_udp (GstRTSPSrc * src, GstRTSPStream * stream, /* configure a timeout on the UDP port. When the timeout message is * posted, we assume UDP transport is not possible. We reconnect using TCP * if we can. */ - g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", src->udp_timeout, - NULL); + g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", + src->udp_timeout * 1000, NULL); /* get output pad of the UDP source. */ *outpad = gst_element_get_static_pad (stream->udpsrc[0], "src"); @@ -2837,7 +2892,8 @@ gst_rtspsrc_stream_configure_udp_sinks (GstRTSPSrc * src, rtp_port); uri = g_strdup_printf ("udp://%s:%d", destination, rtp_port); - stream->udpsink[0] = gst_element_make_from_uri (GST_URI_SINK, uri, NULL); + stream->udpsink[0] = + gst_element_make_from_uri (GST_URI_SINK, uri, NULL, NULL); g_free (uri); if (stream->udpsink[0] == NULL) goto no_sink_element; @@ -2889,7 +2945,8 @@ gst_rtspsrc_stream_configure_udp_sinks (GstRTSPSrc * src, rtcp_port); uri = g_strdup_printf ("udp://%s:%d", destination, rtcp_port); - stream->udpsink[1] = gst_element_make_from_uri (GST_URI_SINK, uri, NULL); + stream->udpsink[1] = + gst_element_make_from_uri (GST_URI_SINK, uri, NULL, NULL); g_free (uri); if (stream->udpsink[1] == NULL) goto no_sink_element; @@ -3103,15 +3160,15 @@ gst_rtspsrc_activate_streams (GstRTSPSrc * src) g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL); } if (stream->srcpad) { + GST_DEBUG_OBJECT (src, "activating stream pad %p", stream); + gst_pad_set_active (stream->srcpad, TRUE); + /* if we don't have a session manager, set the caps now. If we have a * session, we will get a notification of the pad and the caps. */ if (!src->manager) { GST_DEBUG_OBJECT (src, "setting pad caps for stream %p", stream); gst_pad_set_caps (stream->srcpad, stream->caps); } - - GST_DEBUG_OBJECT (src, "activating stream pad %p", stream); - gst_pad_set_active (stream->srcpad, TRUE); /* add the pad */ if (!stream->added) { GST_DEBUG_OBJECT (src, "adding stream pad %p", stream); @@ -3136,7 +3193,8 @@ gst_rtspsrc_activate_streams (GstRTSPSrc * src) } static void -gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment) +gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment, + gboolean reset_manager) { GList *walk; guint64 start, stop; @@ -3172,7 +3230,7 @@ gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment) } GST_DEBUG_OBJECT (src, "stream %p, caps %" GST_PTR_FORMAT, stream, caps); } - if (src->manager) { + if (reset_manager && src->manager) { GST_DEBUG_OBJECT (src, "clear session"); g_signal_emit_by_name (src->manager, "clear-pt-map", NULL); } @@ -3214,15 +3272,15 @@ done: static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream, - GstEvent * event, gboolean source) + GstEvent * event) { gboolean res = TRUE; /* only streams that have a connection to the outside world */ - if (stream->srcpad == NULL) + if (stream->container || stream->disabled) goto done; - if (source && stream->udpsrc[0]) { + if (stream->udpsrc[0]) { gst_event_ref (event); res = gst_element_send_event (stream->udpsrc[0], event); } else if (stream->channelpad[0]) { @@ -3233,7 +3291,7 @@ gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream, res = gst_pad_send_event (stream->channelpad[0], event); } - if (source && stream->udpsrc[1]) { + if (stream->udpsrc[1]) { gst_event_ref (event); res &= gst_element_send_event (stream->udpsrc[1], event); } else if (stream->channelpad[1]) { @@ -3251,7 +3309,7 @@ done: } static gboolean -gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source) +gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event) { GList *streams; gboolean res = TRUE; @@ -3260,7 +3318,7 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source) GstRTSPStream *ostream = (GstRTSPStream *) streams->data; gst_event_ref (event); - res &= gst_rtspsrc_stream_push_event (src, ostream, event, source); + res &= gst_rtspsrc_stream_push_event (src, ostream, event); } gst_event_unref (event); @@ -3343,6 +3401,7 @@ static GstRTSPResult gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info, gboolean free) { + GST_RTSP_STATE_LOCK (src); if (info->connected) { GST_DEBUG_OBJECT (src, "closing connection..."); gst_rtsp_connection_close (info->connection); @@ -3354,6 +3413,7 @@ gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info, gst_rtsp_connection_free (info->connection); info->connection = NULL; } + GST_RTSP_STATE_UNLOCK (src); return GST_RTSP_OK; } @@ -3376,6 +3436,7 @@ gst_rtspsrc_connection_flush (GstRTSPSrc * src, gboolean flush) GList *walk; GST_DEBUG_OBJECT (src, "set flushing %d", flush); + GST_RTSP_STATE_LOCK (src); if (src->conninfo.connection) { GST_DEBUG_OBJECT (src, "connection flush"); gst_rtsp_connection_flush (src->conninfo.connection, flush); @@ -3386,6 +3447,7 @@ gst_rtspsrc_connection_flush (GstRTSPSrc * src, gboolean flush) if (stream->conninfo.connection) gst_rtsp_connection_flush (stream->conninfo.connection, flush); } + GST_RTSP_STATE_UNLOCK (src); } /* FIXME, handle server request, reply with OK, for now */ @@ -3514,6 +3576,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) GstFlowReturn ret = GST_FLOW_OK; GstBuffer *buf; gboolean is_rtcp, have_data; + GstEvent *event; /* here we are only interested in data messages */ have_data = FALSE; @@ -3644,6 +3707,10 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) gst_rtspsrc_activate_streams (src); src->need_activate = FALSE; } + if ((event = src->start_segment) != NULL) { + src->start_segment = NULL; + gst_rtspsrc_push_event (src, event); + } if (src->base_time == -1) { /* Take current running_time. This timestamp will be put on @@ -3788,10 +3855,13 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) * see what happens. */ GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), ("The server closed the connection.")); - if ((res = - gst_rtsp_conninfo_reconnect (src, &src->conninfo, FALSE)) < 0) - goto connect_error; - + if (src->udp_reconnect) { + if ((res = + gst_rtsp_conninfo_reconnect (src, &src->conninfo, FALSE)) < 0) + goto connect_error; + } else { + goto server_eof; + } continue; case GST_RTSP_ENET: GST_DEBUG_OBJECT (src, "An ethernet problem occured."); @@ -4066,7 +4136,7 @@ gst_rtspsrc_loop_end_cmd (GstRTSPSrc * src, gint cmd, GstRTSPResult ret) } static void -gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd) +gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gint mask) { gint old; @@ -4076,19 +4146,25 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd) GST_DEBUG_OBJECT (src, "sending cmd %d", cmd); GST_OBJECT_LOCK (src); - old = src->loop_cmd; + old = src->pending_cmd; + if (old == CMD_RECONNECT) { + GST_DEBUG_OBJECT (src, "ignore, we were reconnecting"); + cmd = CMD_RECONNECT; + } if (old != CMD_WAIT) { - src->loop_cmd = CMD_WAIT; + src->pending_cmd = CMD_WAIT; GST_OBJECT_UNLOCK (src); /* cancel previous request */ gst_rtspsrc_loop_cancel_cmd (src, old); GST_OBJECT_LOCK (src); } - src->loop_cmd = cmd; + src->pending_cmd = cmd; /* interrupt if allowed */ - if (src->waiting) { - GST_DEBUG_OBJECT (src, "start connection flush"); + if (src->busy_cmd & mask) { + GST_DEBUG_OBJECT (src, "connection flush busy %d", src->busy_cmd); gst_rtspsrc_connection_flush (src, TRUE); + } else { + GST_DEBUG_OBJECT (src, "not interrupting busy cmd %d", src->busy_cmd); } if (src->task) gst_task_start (src->task); @@ -4132,8 +4208,11 @@ pause: gst_element_post_message (GST_ELEMENT_CAST (src), gst_message_new_segment_done (GST_OBJECT_CAST (src), src->segment.format, src->segment.position)); + gst_rtspsrc_push_event (src, + gst_event_new_segment_done (src->segment.format, + src->segment.position)); } else { - gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE); + gst_rtspsrc_push_event (src, gst_event_new_eos ()); } } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) { /* for fatal errors we post an error message, post the error before the @@ -4141,8 +4220,9 @@ pause: GST_ELEMENT_ERROR (src, STREAM, FAILED, ("Internal data flow error."), ("streaming task paused, reason %s (%d)", reason, ret)); - gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE); + gst_rtspsrc_push_event (src, gst_event_new_eos ()); } + gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, CMD_LOOP); return FALSE; } } @@ -4529,8 +4609,8 @@ receive_error: { switch (res) { case GST_RTSP_EEOF: - GST_WARNING_OBJECT (src, "server closed connection, doing reconnect"); - if (try == 0) { + GST_WARNING_OBJECT (src, "server closed connection"); + if ((try == 0) && !src->interleaved && src->udp_reconnect) { try++; /* if reconnect succeeds, try again */ if ((res = @@ -4731,9 +4811,7 @@ gst_rtspsrc_parse_methods (GstRTSPSrc * src, GstRTSPMessage * response) { GstRTSPHeaderField field; gchar *respoptions; - gchar **options; gint indx = 0; - gint i; /* reset supported methods */ src->methods = 0; @@ -4751,25 +4829,7 @@ gst_rtspsrc_parse_methods (GstRTSPSrc * src, GstRTSPMessage * response) if (!respoptions) break; - /* If we get here, the server gave a list of supported methods, parse - * them here. The string is like: - * - * OPTIONS, DESCRIBE, ANNOUNCE, PLAY, SETUP, ... - */ - options = g_strsplit (respoptions, ",", 0); - - for (i = 0; options[i]; i++) { - gchar *stripped; - gint method; - - stripped = g_strstrip (options[i]); - method = gst_rtsp_find_method (stripped); - - /* keep bitfield of supported methods */ - if (method != GST_RTSP_INVALID) - src->methods |= method; - } - g_strfreev (options); + src->methods |= gst_rtsp_options_from_text (respoptions); indx++; } @@ -4865,6 +4925,14 @@ gst_rtspsrc_create_transports_string (GstRTSPSrc * src, if (add_udp_str) g_string_append (result, "/UDP"); g_string_append (result, ";multicast"); + if (src->next_port_num != 0) { + if (src->client_port_range.max > 0 && + src->next_port_num >= src->client_port_range.max) + goto no_ports; + + g_string_append_printf (result, ";client_port=%d-%d", + src->next_port_num, src->next_port_num + 1); + } } else if (protocols & GST_RTSP_LOWER_TRANS_TCP) { GST_DEBUG_OBJECT (src, "adding TCP"); @@ -4881,8 +4949,14 @@ gst_rtspsrc_create_transports_string (GstRTSPSrc * src, /* ERRORS */ failed: { + GST_ERROR ("extension gave error %d", res); return res; } +no_ports: + { + GST_ERROR ("no more ports available"); + return GST_RTSP_ERROR; + } } static GstRTSPResult @@ -4956,6 +5030,7 @@ done: /* ERRORS */ failed: { + GST_ERROR ("failed to allocate udp ports"); return GST_RTSP_ERROR; } } @@ -5225,6 +5300,12 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async) /* only allow multicast for other streams */ GST_DEBUG_OBJECT (src, "stream %p as UDP multicast", stream); protocols = GST_RTSP_LOWER_TRANS_UDP_MCAST; + /* if the server selected our ports, increment our counters so that + * we select a new port later */ + if (src->next_port_num == transport.port.min && + src->next_port_num + 1 == transport.port.max) { + src->next_port_num += 2; + } break; case GST_RTSP_LOWER_TRANS_UDP: /* only allow unicast for other streams */ @@ -5512,6 +5593,7 @@ gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp, setup_failed: { GST_ERROR_OBJECT (src, "setup failed"); + gst_rtspsrc_cleanup (src); return res; } } @@ -6066,8 +6148,23 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) * only in async case, since receive elements may not have been affected * by overall state change (e.g. not around yet), * do not mess with state in sync case (e.g. seeking) */ - if (async) - gst_element_set_state (GST_ELEMENT_CAST (src), GST_STATE_PLAYING); + if (async) { + /* state change might be happening in the application thread. A + * specific case is when chaging state to NULL where we will wait + * for this task to finish (gst_rtspsrc_stop). However this task + * will try to change the state to PLAYING causing a deadlock. */ + + /* make sure we are not in the middle of a state change. The + * state lock is a recursive lock so it's safe to lock twice from + * the same thread */ + if (GST_STATE_TRYLOCK (src)) { + gst_element_set_state (GST_ELEMENT_CAST (src), GST_STATE_PLAYING); + GST_STATE_UNLOCK (src); + } else { + res = GST_RTSP_ERROR; + goto changing_state; + } + } /* construct a control url */ if (src->control) @@ -6104,6 +6201,11 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) gst_rtsp_message_add_header (&request, GST_RTSP_HDR_RANGE, hval); g_free (hval); + + /* store the newsegment event so it can be sent from the streaming thread. */ + if (src->start_segment) + gst_event_unref (src->start_segment); + src->start_segment = gst_event_new_segment (&src->segment); } if (segment->rate != 1.0) { @@ -6185,12 +6287,13 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) if (control) break; } + /* configure the caps of the streams after we parsed all headers. Only reset + * the manager object when we set a new Range header (we did a seek) */ + gst_rtspsrc_configure_caps (src, segment, src->need_range); + /* set again when needed */ src->need_range = FALSE; - /* configure the caps of the streams after we parsed all headers. */ - gst_rtspsrc_configure_caps (src, segment); - src->running = TRUE; src->base_time = -1; src->state = GST_RTSP_STATE_PLAYING; @@ -6224,6 +6327,11 @@ was_playing: GST_DEBUG_OBJECT (src, "we were already PLAYING"); goto done; } +changing_state: + { + GST_DEBUG_OBJECT (src, "failed going to PLAYING, already changing state"); + goto done; + } create_request_failed: { gchar *str = gst_rtsp_strresult (res); @@ -6250,7 +6358,7 @@ send_error: } static GstRTSPResult -gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async) +gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async) { GstRTSPResult res = GST_RTSP_OK; GstRTSPMessage request = { 0 }; @@ -6397,7 +6505,7 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message) /* we only act on the first udp timeout message, others are irrelevant * and can be ignored. */ if (!ignore_timeout) - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT, CMD_LOOP); /* eat and free */ gst_message_unref (message); return; @@ -6454,46 +6562,39 @@ static void gst_rtspsrc_thread (GstRTSPSrc * src) { gint cmd; - GstRTSPResult ret; - gboolean running = FALSE; GST_OBJECT_LOCK (src); - cmd = src->loop_cmd; - src->loop_cmd = CMD_WAIT; + cmd = src->pending_cmd; + if (cmd == CMD_RECONNECT || CMD_PLAY || cmd == CMD_LOOP) + src->pending_cmd = CMD_LOOP; + else + src->pending_cmd = CMD_WAIT; GST_DEBUG_OBJECT (src, "got command %d", cmd); /* we got the message command, so ensure communication is possible again */ gst_rtspsrc_connection_flush (src, FALSE); - /* we allow these to be interrupted */ - if (cmd == CMD_LOOP || cmd == CMD_CLOSE || cmd == CMD_PAUSE) - src->waiting = TRUE; + src->busy_cmd = cmd; GST_OBJECT_UNLOCK (src); switch (cmd) { case CMD_OPEN: - ret = gst_rtspsrc_open (src, TRUE); + gst_rtspsrc_open (src, TRUE); break; case CMD_PLAY: - ret = gst_rtspsrc_play (src, &src->segment, TRUE); - if (ret == GST_RTSP_OK) - running = TRUE; + gst_rtspsrc_play (src, &src->segment, TRUE); break; case CMD_PAUSE: - ret = gst_rtspsrc_pause (src, TRUE, TRUE); - if (ret == GST_RTSP_OK) - running = TRUE; + gst_rtspsrc_pause (src, TRUE); break; case CMD_CLOSE: - ret = gst_rtspsrc_close (src, TRUE, FALSE); + gst_rtspsrc_close (src, TRUE, FALSE); break; case CMD_LOOP: - running = gst_rtspsrc_loop (src); + gst_rtspsrc_loop (src); break; case CMD_RECONNECT: - ret = gst_rtspsrc_reconnect (src, FALSE); - if (ret == GST_RTSP_OK) - running = TRUE; + gst_rtspsrc_reconnect (src, FALSE); break; default: break; @@ -6501,14 +6602,12 @@ gst_rtspsrc_thread (GstRTSPSrc * src) GST_OBJECT_LOCK (src); /* and go back to sleep */ - if (src->loop_cmd == CMD_WAIT) { - if (running) - src->loop_cmd = CMD_LOOP; - else if (src->task) + if (src->pending_cmd == CMD_WAIT) { + if (src->task) gst_task_pause (src->task); } /* reset waiting */ - src->waiting = FALSE; + src->busy_cmd = CMD_WAIT; GST_OBJECT_UNLOCK (src); } @@ -6519,10 +6618,10 @@ gst_rtspsrc_start (GstRTSPSrc * src) GST_OBJECT_LOCK (src); - src->loop_cmd = CMD_WAIT; + src->pending_cmd = CMD_WAIT; if (src->task == NULL) { - src->task = gst_task_new ((GstTaskFunction) gst_rtspsrc_thread, src); + src->task = gst_task_new ((GstTaskFunction) gst_rtspsrc_thread, src, NULL); if (src->task == NULL) goto task_error; @@ -6548,7 +6647,7 @@ gst_rtspsrc_stop (GstRTSPSrc * src) GST_DEBUG_OBJECT (src, "stopping"); /* also cancels pending task */ - gst_rtspsrc_loop_send_cmd (src, CMD_WAIT); + gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, CMD_CLOSE); GST_OBJECT_LOCK (src); if ((task = src->task)) { @@ -6596,12 +6695,12 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) /* first attempt, don't ignore timeouts */ rtspsrc->ignore_timeout = FALSE; rtspsrc->open_error = FALSE; - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_OPEN); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_OPEN, 0); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* unblock the tcp tasks and make the loop waiting */ - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_WAIT); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_WAIT, CMD_LOOP); break; case GST_STATE_CHANGE_PAUSED_TO_READY: break; @@ -6615,18 +6714,18 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PLAY); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PLAY, 0); break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* send pause request and keep the idle task around */ - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PAUSE); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PAUSE, CMD_LOOP); ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_READY_TO_PAUSED: ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_CLOSE); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_CLOSE, CMD_PAUSE); break; case GST_STATE_CHANGE_READY_TO_NULL: gst_rtspsrc_stop (rtspsrc); @@ -6654,7 +6753,7 @@ gst_rtspsrc_send_event (GstElement * element, GstEvent * event) rtspsrc = GST_RTSPSRC (element); if (GST_EVENT_IS_DOWNSTREAM (event)) { - res = gst_rtspsrc_push_event (rtspsrc, event, TRUE); + res = gst_rtspsrc_push_event (rtspsrc, event); } else { res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event); }