rtspsrc: do not try to send EOS with invalid seqnum
[platform/upstream/gst-plugins-good.git] / gst / rtsp / gstrtspsrc.c
index 1fe834b..cc97bf7 100644 (file)
@@ -42,6 +42,7 @@
  */
 /**
  * SECTION:element-rtspsrc
+ * @title: rtspsrc
  *
  * Makes a connection to an RTSP server and read the data.
  * rtspsrc strictly follows RFC 2326 and therefore does not (yet) support
  * rtspsrc acts like a live source and will therefore only generate data in the
  * PLAYING state.
  *
- * <refsect2>
- * <title>Example launch line</title>
+ * If a RTP session times out then the rtspsrc will generate an element message
+ * named "GstRTSPSrcTimeout". Currently this is only supported for timeouts
+ * triggered by RTCP.
+ *
+ * The message's structure contains three fields:
+ *
+ *   #GstRTSPSrcTimeoutCause `cause`: the cause of the timeout.
+ *
+ *   #gint `stream-number`: an internal identifier of the stream that timed out.
+ *
+ *   #guint `ssrc`: the SSRC of the stream that timed out.
+ *
+ * ## Example launch line
  * |[
  * gst-launch-1.0 rtspsrc location=rtsp://some.server/url ! fakesink
  * ]| Establish a connection to an RTSP server and send the raw RTP packets to a
  * fakesink.
- * </refsect2>
+ *
  */
 
 #ifdef HAVE_CONFIG_H
@@ -693,7 +705,7 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
   /**
    * GstRTSPSrc:port-range:
    *
-   * Configure the client port numbers that can be used to recieve RTP and
+   * Configure the client port numbers that can be used to receive RTP and
    * RTCP.
    */
   g_object_class_install_property (gobject_class, PROP_PORT_RANGE,
@@ -1574,7 +1586,7 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value,
       const gchar *str;
 
       str = g_value_get_string (value);
-      if (sscanf (str, "%u-%u", &rtspsrc->client_port_range.min,
+      if (str == NULL || sscanf (str, "%u-%u", &rtspsrc->client_port_range.min,
               &rtspsrc->client_port_range.max) != 2) {
         rtspsrc->client_port_range.min = 0;
         rtspsrc->client_port_range.max = 0;
@@ -2269,7 +2281,9 @@ gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream)
   for (i = 0; i < 2; i++) {
     if (stream->udpsrc[i]) {
       gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
-      gst_bin_remove (GST_BIN_CAST (src), stream->udpsrc[i]);
+      if (gst_object_has_as_parent (GST_OBJECT (stream->udpsrc[i]),
+              GST_OBJECT (src)))
+        gst_bin_remove (GST_BIN_CAST (src), stream->udpsrc[i]);
       gst_object_unref (stream->udpsrc[i]);
     }
     if (stream->channelpad[i])
@@ -2277,7 +2291,9 @@ gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream)
 
     if (stream->udpsink[i]) {
       gst_element_set_state (stream->udpsink[i], GST_STATE_NULL);
-      gst_bin_remove (GST_BIN_CAST (src), stream->udpsink[i]);
+      if (gst_object_has_as_parent (GST_OBJECT (stream->udpsink[i]),
+              GST_OBJECT (src)))
+        gst_bin_remove (GST_BIN_CAST (src), stream->udpsink[i]);
       gst_object_unref (stream->udpsink[i]);
     }
   }
@@ -2566,7 +2582,8 @@ gst_rtspsrc_set_state (GstRTSPSrc * src, GstState state)
 }
 
 static void
-gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
+gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing,
+    guint32 seqnum)
 {
   GstEvent *event;
   gint cmd;
@@ -2574,11 +2591,13 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
 
   if (flush) {
     event = gst_event_new_flush_start ();
+    gst_event_set_seqnum (event, seqnum);
     GST_DEBUG_OBJECT (src, "start flush");
     cmd = CMD_WAIT;
     state = GST_STATE_PAUSED;
   } else {
     event = gst_event_new_flush_stop (FALSE);
+    gst_event_set_seqnum (event, seqnum);
     GST_DEBUG_OBJECT (src, "stop flush; playing %d", playing);
     cmd = CMD_LOOP;
     if (playing)
@@ -2707,7 +2726,7 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event)
    * blocking in preroll). */
   if (flush) {
     GST_DEBUG_OBJECT (src, "starting flush");
-    gst_rtspsrc_flush (src, TRUE, FALSE);
+    gst_rtspsrc_flush (src, TRUE, FALSE, gst_event_get_seqnum (event));
   } else {
     if (src->task) {
       gst_task_pause (src->task);
@@ -2756,7 +2775,7 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event)
   if (flush) {
     /* if we started flush, we stop now */
     GST_DEBUG_OBJECT (src, "stopping flush");
-    gst_rtspsrc_flush (src, FALSE, playing);
+    gst_rtspsrc_flush (src, FALSE, playing, gst_event_get_seqnum (event));
   }
 
   /* now we did the seek and can activate the new segment values */
@@ -3200,6 +3219,24 @@ was_ok:
   }
 }
 
+static GstPadProbeReturn
+udpsrc_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  guint32 *segment_seqnum = user_data;
+
+  switch (GST_EVENT_TYPE (info->data)) {
+    case GST_EVENT_SEGMENT:
+      if (!gst_event_is_writable (info->data))
+        info->data = gst_event_make_writable (info->data);
+
+      *segment_seqnum = gst_event_get_seqnum (info->data);
+    default:
+      break;
+  }
+
+  return GST_PAD_PROBE_OK;
+}
+
 static gboolean
 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
 {
@@ -3410,7 +3447,7 @@ on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
 }
 
 static void
-on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
+on_timeout_common (GObject * session, GObject * source, GstRTSPStream * stream)
 {
   GstRTSPSrc *src = stream->parent;
   guint ssrc;
@@ -3425,6 +3462,22 @@ on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
 }
 
 static void
+on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
+{
+  GstRTSPSrc *src = stream->parent;
+
+  /* timeout, post element message */
+  gst_element_post_message (GST_ELEMENT_CAST (src),
+      gst_message_new_element (GST_OBJECT_CAST (src),
+          gst_structure_new ("GstRTSPSrcTimeout",
+              "cause", G_TYPE_ENUM, GST_RTSP_SRC_TIMEOUT_CAUSE_RTCP,
+              "stream-number", G_TYPE_INT, stream->id, "ssrc", G_TYPE_UINT,
+              stream->ssrc, NULL)));
+
+  on_timeout_common (session, source, stream);
+}
+
+static void
 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc, GstRTSPSrc * src)
 {
   GstRTSPStream *stream;
@@ -3941,8 +3994,8 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
 
         g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc,
             stream);
-        g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout,
-            stream);
+        g_signal_connect (rtpsession, "on-bye-timeout",
+            (GCallback) on_timeout_common, stream);
         g_signal_connect (rtpsession, "on-timeout", (GCallback) on_timeout,
             stream);
         g_signal_connect (rtpsession, "on-ssrc-active",
@@ -4269,6 +4322,10 @@ gst_rtspsrc_stream_configure_udp (GstRTSPSrc * src, GstRTSPStream * stream,
         GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
         GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocked, src, NULL);
 
+    gst_pad_add_probe (stream->blockedpad,
+        GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, udpsrc_probe_cb,
+        &(stream->segment_seqnum[0]), NULL);
+
     if (stream->channelpad[0]) {
       GST_DEBUG_OBJECT (src, "connecting UDP source 0 to manager");
       /* configure for UDP delivery, we need to connect the UDP pads to
@@ -4304,6 +4361,9 @@ gst_rtspsrc_stream_configure_udp (GstRTSPSrc * src, GstRTSPStream * stream,
       GST_DEBUG_OBJECT (src, "connecting UDP source 1 to manager");
 
       pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
+      gst_pad_add_probe (pad,
+          GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, udpsrc_probe_cb,
+          &(stream->segment_seqnum[1]), NULL);
       gst_pad_link_full (pad, stream->channelpad[1],
           GST_PAD_LINK_CHECK_NOTHING);
       gst_object_unref (pad);
@@ -4803,8 +4863,16 @@ gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
     goto done;
 
   if (stream->udpsrc[0]) {
-    gst_event_ref (event);
-    res = gst_element_send_event (stream->udpsrc[0], event);
+    GstEvent *sent_event;
+
+    if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+      sent_event = gst_event_new_eos ();
+      gst_event_set_seqnum (sent_event, stream->segment_seqnum[0]);
+    } else {
+      sent_event = gst_event_ref (event);
+    }
+
+    res = gst_element_send_event (stream->udpsrc[0], sent_event);
   } else if (stream->channelpad[0]) {
     gst_event_ref (event);
     if (GST_PAD_IS_SRC (stream->channelpad[0]))
@@ -4814,8 +4882,18 @@ gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
   }
 
   if (stream->udpsrc[1]) {
-    gst_event_ref (event);
-    res &= gst_element_send_event (stream->udpsrc[1], event);
+    GstEvent *sent_event;
+
+    if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+      sent_event = gst_event_new_eos ();
+      if (stream->segment_seqnum[1] != GST_SEQNUM_INVALID) {
+        gst_event_set_seqnum (sent_event, stream->segment_seqnum[1]);
+      }
+    } else {
+      sent_event = gst_event_ref (event);
+    }
+
+    res &= gst_element_send_event (stream->udpsrc[1], sent_event);
   } else if (stream->channelpad[1]) {
     gst_event_ref (event);
     if (GST_PAD_IS_SRC (stream->channelpad[1]))