stream: improve join and leave of the pipeline
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 26 Oct 2012 15:28:10 +0000 (17:28 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Fri, 26 Oct 2012 15:28:10 +0000 (17:28 +0200)
simplify code
Do the cleanup properly
Add some docs

gst/rtsp-server/rtsp-media.c
gst/rtsp-server/rtsp-stream.c
gst/rtsp-server/rtsp-stream.h

index d00a51b..3fc6483 100644 (file)
@@ -72,7 +72,6 @@ static gboolean default_handle_message (GstRTSPMedia * media,
     GstMessage * message);
 static void finish_unprepare (GstRTSPMedia * media);
 static gboolean default_unprepare (GstRTSPMedia * media);
-static void unlock_streams (GstRTSPMedia * media);
 
 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
 
@@ -896,22 +895,6 @@ weird_type:
 }
 
 static void
-unlock_streams (GstRTSPMedia * media)
-{
-  guint i;
-
-  /* unlock the udp src elements */
-  for (i = 0; i < media->streams->len; i++) {
-    GstRTSPStream *stream;
-
-    stream = g_ptr_array_index (media->streams, i);
-
-    gst_element_set_locked_state (stream->udpsrc[0], FALSE);
-    gst_element_set_locked_state (stream->udpsrc[1], FALSE);
-  }
-}
-
-static void
 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
 {
   g_mutex_lock (&media->lock);
@@ -1075,24 +1058,21 @@ static void
 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
 {
   GstRTSPStream *stream;
-  gint i;
 
   stream = gst_rtsp_media_create_stream (media, element, pad);
+
   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad),
       stream->idx);
 
+  /* we will be adding elements below that will cause ASYNC_DONE to be
+   * posted in the bus. We want to ignore those messages until the
+   * pipeline really prerolled. */
   media->adding = TRUE;
 
-  gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline), media->rtpbin);
-
-  for (i = 0; i < 2; i++) {
-    gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
-    gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
-    gst_element_set_state (stream->appqueue[i], GST_STATE_PAUSED);
-    gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
-    gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
-    gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
-  }
+  /* join the element in the PAUSED state because this callback is
+   * called from the streaming thread and it is PAUSED */
+  gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline),
+      media->rtpbin, GST_STATE_PAUSED);
 
   media->adding = FALSE;
 }
@@ -1173,7 +1153,8 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
 
     stream = g_ptr_array_index (media->streams, i);
 
-    gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline), media->rtpbin);
+    gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline),
+        media->rtpbin, GST_STATE_NULL);
   }
 
   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
@@ -1208,7 +1189,7 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
       /* we need to go to PLAYING */
       GST_INFO ("NO_PREROLL state change: live media %p", media);
       /* FIXME we disable seeking for live streams for now. We should perform a
-       * seeking query in preroll instead and do a seeking query. */
+       * seeking query in preroll instead */
       media->seekable = FALSE;
       media->is_live = TRUE;
       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
@@ -1219,7 +1200,8 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
       goto state_failed;
   }
 
-  /* now wait for all pads to be prerolled */
+  /* now wait for all pads to be prerolled, FIXME, we should somehow be
+   * able to do this async so that we don't block the server thread. */
   status = gst_rtsp_media_get_status (media);
   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
     goto state_failed;
@@ -1262,7 +1244,6 @@ finish_unprepare (GstRTSPMedia * media)
 
   GST_DEBUG ("shutting down");
 
-  unlock_streams (media);
   gst_element_set_state (media->pipeline, GST_STATE_NULL);
 
   for (i = 0; i < media->streams->len; i++) {
@@ -1278,6 +1259,7 @@ finish_unprepare (GstRTSPMedia * media)
   g_ptr_array_set_size (media->streams, 0);
 
   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
+  media->rtpbin = NULL;
 
   gst_object_unref (media->pipeline);
   media->pipeline = NULL;
index 13360bf..cd0d967 100644 (file)
@@ -68,28 +68,12 @@ gst_rtsp_stream_finalize (GObject * obj)
 
   stream = GST_RTSP_STREAM (obj);
 
-  g_assert (!stream->is_joined);
+  /* we really need to be unjoined now */
+  g_return_if_fail (!stream->is_joined);
 
   gst_object_unref (stream->payloader);
   gst_object_unref (stream->srcpad);
 
-  if (stream->session)
-    g_object_unref (stream->session);
-
-  if (stream->caps)
-    gst_caps_unref (stream->caps);
-
-  if (stream->send_rtp_sink)
-    gst_object_unref (stream->send_rtp_sink);
-  if (stream->send_rtp_src)
-    gst_object_unref (stream->send_rtp_src);
-  if (stream->send_rtcp_src)
-    gst_object_unref (stream->send_rtcp_src);
-  if (stream->recv_rtcp_sink)
-    gst_object_unref (stream->recv_rtcp_sink);
-  if (stream->recv_rtp_sink)
-    gst_object_unref (stream->recv_rtp_sink);
-
   g_list_free (stream->transports);
 
   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
@@ -565,16 +549,18 @@ static GstAppSinkCallbacks sink_cb = {
  * @stream: a #GstRTSPStream
  * @bin: a #GstBin to join
  * @rtpbin: a rtpbin element in @bin
+ * @state: the target state of the new elements
  *
  * Join the #Gstbin @bin that contains the element @rtpbin.
  *
- * @stream will link to @rtpbin, which must be inside @bin.
+ * @stream will link to @rtpbin, which must be inside @bin. The elements
+ * added to @bin will be set to the state given in @state.
  *
  * Returns: %TRUE on success.
  */
 gboolean
 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
-    GstElement * rtpbin)
+    GstElement * rtpbin, GstState state)
 {
   gint i, idx;
   gchar *name;
@@ -585,52 +571,42 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
 
-  idx = stream->idx;
-
   if (stream->is_joined)
     return TRUE;
 
-  GST_INFO ("stream %p joining bin", stream);
+  /* create a session with the same index as the stream */
+  idx = stream->idx;
+
+  GST_INFO ("stream %p joining bin as session %d", stream, idx);
 
   if (!alloc_ports (stream))
     goto no_ports;
 
-  /* add the ports to the pipeline */
-  for (i = 0; i < 2; i++) {
-    gst_bin_add (bin, stream->udpsink[i]);
-    gst_bin_add (bin, stream->udpsrc[i]);
-  }
-
-  /* create elements for the TCP transfer */
-  for (i = 0; i < 2; i++) {
-    stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
-    stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
-    stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
-    g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
-    g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
-    gst_bin_add (bin, stream->appqueue[i]);
-    gst_bin_add (bin, stream->appsink[i]);
-    gst_bin_add (bin, stream->appsrc[i]);
-    gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
-        &sink_cb, stream, NULL);
-  }
-
-  /* hook up the stream to the RTP session elements. */
+  /* get a pad for sending RTP */
   name = g_strdup_printf ("send_rtp_sink_%u", idx);
   stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
   g_free (name);
+  /* link the RTP pad to the session manager, it should not really fail unless
+   * this is not really an RTP pad */
+  ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
+  if (ret != GST_PAD_LINK_OK)
+    goto link_failed;
+
+  /* get pads from the RTP session element for sending and receiving
+   * RTP/RTCP*/
   name = g_strdup_printf ("send_rtp_src_%u", idx);
-  stream->send_rtp_src = gst_element_get_static_pad (rtpbin, name);
+  stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
   g_free (name);
   name = g_strdup_printf ("send_rtcp_src_%u", idx);
-  stream->send_rtcp_src = gst_element_get_request_pad (rtpbin, name);
-  g_free (name);
-  name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
-  stream->recv_rtcp_sink = gst_element_get_request_pad (rtpbin, name);
+  stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
   g_free (name);
   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
-  stream->recv_rtp_sink = gst_element_get_request_pad (rtpbin, name);
+  stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
+  g_free (name);
+  name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
+  stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
   g_free (name);
+
   /* get the session */
   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
 
@@ -647,110 +623,119 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
       stream);
 
-  /* link the RTP pad to the session manager */
-  ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
-  if (ret != GST_PAD_LINK_OK)
-    goto link_failed;
+  for (i = 0; i < 2; i++) {
+    /* For the sender we create this bit of pipeline for both
+     * RTP and RTCP. Sync and preroll are enabled on udpsink so
+     * we need to add a queue before appsink to make the pipeline
+     * not block. For the TCP case, we want to pump data to the
+     * client as fast as possible anyway.
+     *
+     * .--------.      .-----.    .---------.
+     * | rtpbin |      | tee |    | udpsink |
+     * |       send->sink   src->sink       |
+     * '--------'      |     |    '---------'
+     *                 |     |    .---------.    .---------.
+     *                 |     |    |  queue  |    | appsink |
+     *                 |    src->sink      src->sink       |
+     *                 '-----'    '---------'    '---------'
+     */
+    /* make tee for RTP/RTCP */
+    stream->tee[i] = gst_element_factory_make ("tee", NULL);
+    gst_bin_add (bin, stream->tee[i]);
+
+    /* and link to rtpbin send pad */
+    pad = gst_element_get_static_pad (stream->tee[i], "sink");
+    gst_pad_link (stream->send_src[i], pad);
+    gst_object_unref (pad);
+
+    /* add udpsink */
+    gst_bin_add (bin, stream->udpsink[i]);
 
-  /* make tee for RTP and link to stream */
-  stream->tee[0] = gst_element_factory_make ("tee", NULL);
-  gst_bin_add (bin, stream->tee[0]);
-
-  pad = gst_element_get_static_pad (stream->tee[0], "sink");
-  gst_pad_link (stream->send_rtp_src, pad);
-  gst_object_unref (pad);
-
-  /* link RTP sink, we're pretty sure this will work. */
-  teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
-  pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
-  gst_pad_link (teepad, pad);
-  gst_object_unref (pad);
-  gst_object_unref (teepad);
-
-  teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
-  pad = gst_element_get_static_pad (stream->appqueue[0], "sink");
-  gst_pad_link (teepad, pad);
-  gst_object_unref (pad);
-  gst_object_unref (teepad);
-
-  queuepad = gst_element_get_static_pad (stream->appqueue[0], "src");
-  pad = gst_element_get_static_pad (stream->appsink[0], "sink");
-  gst_pad_link (queuepad, pad);
-  gst_object_unref (pad);
-  gst_object_unref (queuepad);
-
-  /* make tee for RTCP */
-  stream->tee[1] = gst_element_factory_make ("tee", NULL);
-  gst_bin_add (bin, stream->tee[1]);
-
-  pad = gst_element_get_static_pad (stream->tee[1], "sink");
-  gst_pad_link (stream->send_rtcp_src, pad);
-  gst_object_unref (pad);
-
-  /* link RTCP elements */
-  teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
-  pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
-  gst_pad_link (teepad, pad);
-  gst_object_unref (pad);
-  gst_object_unref (teepad);
-
-  teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
-  pad = gst_element_get_static_pad (stream->appqueue[1], "sink");
-  gst_pad_link (teepad, pad);
-  gst_object_unref (pad);
-  gst_object_unref (teepad);
-
-  queuepad = gst_element_get_static_pad (stream->appqueue[1], "src");
-  pad = gst_element_get_static_pad (stream->appsink[1], "sink");
-  gst_pad_link (queuepad, pad);
-  gst_object_unref (pad);
-  gst_object_unref (queuepad);
-
-  /* make selector for the RTP receivers */
-  stream->selector[0] = gst_element_factory_make ("funnel", NULL);
-  gst_bin_add (bin, stream->selector[0]);
-
-  pad = gst_element_get_static_pad (stream->selector[0], "src");
-  gst_pad_link (pad, stream->recv_rtp_sink);
-  gst_object_unref (pad);
-
-  selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
-  pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
-  gst_pad_link (pad, selpad);
-  gst_object_unref (pad);
-  gst_object_unref (selpad);
-  selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
-  pad = gst_element_get_static_pad (stream->appsrc[0], "src");
-  gst_pad_link (pad, selpad);
-  gst_object_unref (pad);
-  gst_object_unref (selpad);
-
-  /* make selector for the RTCP receivers */
-  stream->selector[1] = gst_element_factory_make ("funnel", NULL);
-  gst_bin_add (bin, stream->selector[1]);
-
-  pad = gst_element_get_static_pad (stream->selector[1], "src");
-  gst_pad_link (pad, stream->recv_rtcp_sink);
-  gst_object_unref (pad);
-
-  selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
-  pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
-  gst_pad_link (pad, selpad);
-  gst_object_unref (pad);
-  gst_object_unref (selpad);
-
-  selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
-  pad = gst_element_get_static_pad (stream->appsrc[1], "src");
-  gst_pad_link (pad, selpad);
-  gst_object_unref (pad);
-  gst_object_unref (selpad);
-
-  /* we set and keep these to playing so that they don't cause NO_PREROLL return
-   * values */
-  gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
-  gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
-  gst_element_set_locked_state (stream->udpsrc[0], TRUE);
-  gst_element_set_locked_state (stream->udpsrc[1], TRUE);
+    /* link tee to udpsink */
+    teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
+    pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
+    gst_pad_link (teepad, pad);
+    gst_object_unref (pad);
+    gst_object_unref (teepad);
+
+    /* make queue */
+    stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
+    gst_bin_add (bin, stream->appqueue[i]);
+    /* and link to tee */
+    teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
+    pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
+    gst_pad_link (teepad, pad);
+    gst_object_unref (pad);
+    gst_object_unref (teepad);
+
+    /* make appsink */
+    stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
+    g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
+    g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
+    gst_bin_add (bin, stream->appsink[i]);
+    gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
+        &sink_cb, stream, NULL);
+    /* and link to queue */
+    queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
+    pad = gst_element_get_static_pad (stream->appsink[i], "sink");
+    gst_pad_link (queuepad, pad);
+    gst_object_unref (pad);
+    gst_object_unref (queuepad);
+
+    /* For the receiver we create this bit of pipeline for both
+     * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
+     * and it is all funneled into the rtpbin receive pad.
+     *
+     * .--------.     .--------.    .--------.
+     * | udpsrc |     | funnel |    | rtpbin |
+     * |       src->sink      src->sink      |
+     * '--------'     |        |    '--------'
+     * .--------.     |        |
+     * | appsrc |     |        |
+     * |       src->sink       |
+     * '--------'     '--------'
+     */
+    /* make funnel for the RTP/RTCP receivers */
+    stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
+    gst_bin_add (bin, stream->funnel[i]);
+
+    pad = gst_element_get_static_pad (stream->funnel[i], "src");
+    gst_pad_link (pad, stream->recv_sink[i]);
+    gst_object_unref (pad);
+
+    /* add udpsrc */
+    gst_bin_add (bin, stream->udpsrc[i]);
+    /* and link to the funnel */
+    selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
+    pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
+    gst_pad_link (pad, selpad);
+    gst_object_unref (pad);
+    gst_object_unref (selpad);
+
+    /* make and add appsrc */
+    stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
+    gst_bin_add (bin, stream->appsrc[i]);
+    /* and link to the funnel */
+    selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
+    pad = gst_element_get_static_pad (stream->appsrc[i], "src");
+    gst_pad_link (pad, selpad);
+    gst_object_unref (pad);
+    gst_object_unref (selpad);
+
+    /* check if we need to set to a special state */
+    if (state != GST_STATE_NULL) {
+      gst_element_set_state (stream->udpsink[i], state);
+      gst_element_set_state (stream->appsink[i], state);
+      gst_element_set_state (stream->appqueue[i], state);
+      gst_element_set_state (stream->tee[i], state);
+      gst_element_set_state (stream->funnel[i], state);
+      gst_element_set_state (stream->appsrc[i], state);
+    }
+    /* we set and keep these to playing so that they don't cause NO_PREROLL return
+     * values */
+    gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
+    gst_element_set_locked_state (stream->udpsrc[i], TRUE);
+  }
 
   /* be notified of caps changes */
   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
@@ -769,6 +754,8 @@ no_ports:
 link_failed:
   {
     GST_WARNING ("failed to link stream %d", idx);
+    gst_object_unref (stream->send_rtp_sink);
+    stream->send_rtp_sink = NULL;
     return FALSE;
   }
 }
@@ -779,7 +766,8 @@ link_failed:
  * @bin: a #GstBin
  * @rtpbin: a rtpbin #GstElement
  *
- * Remove the elements of @stream from the bin
+ * Remove the elements of @stream from @bin. @bin must be set
+ * to the NULL state before calling this.
  *
  * Return: %TRUE on success.
  */
@@ -796,22 +784,55 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
   if (!stream->is_joined)
     return TRUE;
 
+  /* all transports must be removed by now */
+  g_return_val_if_fail (stream->transports == NULL, FALSE);
+
   GST_INFO ("stream %p leaving bin", stream);
 
   gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
-
   g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
+  gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
+  gst_object_unref (stream->send_rtp_sink);
+  stream->send_rtp_sink = NULL;
 
-  /* FIXME not entirely the opposite of join_bin */
   for (i = 0; i < 2; i++) {
+    /* and set udpsrc to NULL now before removing */
+    gst_element_set_locked_state (stream->udpsrc[i], FALSE);
+    gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
+
+    /* removing them should also nicely release the request
+     * pads when they finalize */
     gst_bin_remove (bin, stream->udpsrc[i]);
     gst_bin_remove (bin, stream->udpsink[i]);
     gst_bin_remove (bin, stream->appsrc[i]);
     gst_bin_remove (bin, stream->appsink[i]);
     gst_bin_remove (bin, stream->appqueue[i]);
     gst_bin_remove (bin, stream->tee[i]);
-    gst_bin_remove (bin, stream->selector[i]);
+    gst_bin_remove (bin, stream->funnel[i]);
+
+    gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
+    gst_object_unref (stream->recv_sink[i]);
+    stream->recv_sink[i] = NULL;
+
+    stream->udpsrc[i] = NULL;
+    stream->udpsink[i] = NULL;
+    stream->appsrc[i] = NULL;
+    stream->appsink[i] = NULL;
+    stream->appqueue[i] = NULL;
+    stream->tee[i] = NULL;
+    stream->funnel[i] = NULL;
   }
+  gst_object_unref (stream->send_src[0]);
+  stream->send_src[0] = NULL;
+
+  gst_element_release_request_pad (rtpbin, stream->send_src[1]);
+  gst_object_unref (stream->send_src[1]);
+  stream->send_src[1] = NULL;
+
+  g_object_unref (stream->session);
+  if (stream->caps)
+    gst_caps_unref (stream->caps);
+
   stream->is_joined = FALSE;
 
   return TRUE;
@@ -862,6 +883,10 @@ gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
 {
   GstFlowReturn ret;
 
+  g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+  g_return_val_if_fail (stream->is_joined, FALSE);
+
   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
 
   return ret;
@@ -884,6 +909,10 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
 {
   GstFlowReturn ret;
 
+  g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+  g_return_val_if_fail (stream->is_joined, FALSE);
+
   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
 
   return ret;
@@ -969,6 +998,8 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
  * Add the transport in @trans to @stream. The media of @stream will
  * then also be send to the values configured in @trans.
  *
+ * @stream must be joined to a bin.
+ *
  * @trans must contain a valid #GstRTSPTransport.
  *
  * Returns: %TRUE if @trans was added
@@ -979,6 +1010,7 @@ gst_rtsp_stream_add_transport (GstRTSPStream * stream,
 {
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
+  g_return_val_if_fail (stream->is_joined, FALSE);
   g_return_val_if_fail (trans->transport != NULL, FALSE);
 
   return update_transport (stream, trans, TRUE);
@@ -992,6 +1024,10 @@ gst_rtsp_stream_add_transport (GstRTSPStream * stream,
  * Remove the transport in @trans from @stream. The media of @stream will
  * not be sent to the values configured in @trans.
  *
+ * @stream must be joined to a bin.
+ *
+ * @trans must contain a valid #GstRTSPTransport.
+ *
  * Returns: %TRUE if @trans was removed
  */
 gboolean
@@ -1000,6 +1036,7 @@ gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
 {
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
+  g_return_val_if_fail (stream->is_joined, FALSE);
   g_return_val_if_fail (trans->transport != NULL, FALSE);
 
   return update_transport (stream, trans, FALSE);
index db923d0..d5fb03d 100644 (file)
@@ -50,21 +50,24 @@ typedef struct _GstRTSPStreamClass GstRTSPStreamClass;
  * @is_ipv6: should this stream be IPv6
  * @buffer_size: the UDP buffer size
  * @is_joined: if the stream is joined in a bin
- * @recv_rtp_sink: sinkpad for RTP buffers
- * @recv_rtcp_sink: sinkpad for RTCP buffers
- * @send_rtp_src: srcpad for RTP buffers
- * @send_rtcp_src: srcpad for RTCP buffers
+ * @send_rtp_sink: sinkpad for sending RTP buffers
+ * @recv_sink: sinkpad for receiving RTP/RTCP buffers
+ * @send_src: srcpad for sending RTP/RTCP buffers
+ * @session: the RTP session object
  * @udpsrc: the udp source elements for RTP/RTCP
  * @udpsink: the udp sink elements for RTP/RTCP
  * @appsrc: the app source elements for RTP/RTCP
+ * @appqueue: the app queue elements for RTP/RTCP
  * @appsink: the app sink elements for RTP/RTCP
+ * @tee: tee for the sending to udpsink and appsink
+ * @funnel: tee for the receiving from udpsrc and appsrc
  * @server_port: the server ports for this stream
  * @caps_sig: the signal id for detecting caps
  * @caps: the caps of the stream
  * @n_active: the number of active transports in @transports
  * @transports: list of #GstStreamTransport being streamed to
  *
- * The definition of a media stream. The streams are identified by @id.
+ * The definition of a media stream. The streams are identified by @idx.
  */
 struct _GstRTSPStream {
   GObject       parent;
@@ -77,11 +80,9 @@ struct _GstRTSPStream {
   gboolean      is_joined;
 
   /* pads on the rtpbin */
-  GstPad       *recv_rtcp_sink;
-  GstPad       *recv_rtp_sink;
   GstPad       *send_rtp_sink;
-  GstPad       *send_rtp_src;
-  GstPad       *send_rtcp_src;
+  GstPad       *recv_sink[2];
+  GstPad       *send_src[2];
 
   /* the RTPSession object */
   GObject      *session;
@@ -96,7 +97,7 @@ struct _GstRTSPStream {
   GstElement   *appsink[2];
 
   GstElement   *tee[2];
-  GstElement   *selector[2];
+  GstElement   *funnel[2];
 
   /* server ports for sending/receiving */
   GstRTSPRange  server_port;
@@ -123,7 +124,8 @@ void              gst_rtsp_stream_set_mtu          (GstRTSPStream * stream, guin
 guint             gst_rtsp_stream_get_mtu          (GstRTSPStream * stream);
 
 gboolean          gst_rtsp_stream_join_bin         (GstRTSPStream * stream,
-                                                    GstBin *bin, GstElement *rtpbin);
+                                                    GstBin *bin, GstElement *rtpbin,
+                                                    GstState state);
 gboolean          gst_rtsp_stream_leave_bin        (GstRTSPStream * stream,
                                                     GstBin *bin, GstElement *rtpbin);