stream = GST_RTSP_STREAM (obj);
- g_assert (!stream->is_joined);
+ /* we really need to be unjoined now */
+ g_return_if_fail (!stream->is_joined);
gst_object_unref (stream->payloader);
gst_object_unref (stream->srcpad);
- if (stream->session)
- g_object_unref (stream->session);
-
- if (stream->caps)
- gst_caps_unref (stream->caps);
-
- if (stream->send_rtp_sink)
- gst_object_unref (stream->send_rtp_sink);
- if (stream->send_rtp_src)
- gst_object_unref (stream->send_rtp_src);
- if (stream->send_rtcp_src)
- gst_object_unref (stream->send_rtcp_src);
- if (stream->recv_rtcp_sink)
- gst_object_unref (stream->recv_rtcp_sink);
- if (stream->recv_rtp_sink)
- gst_object_unref (stream->recv_rtp_sink);
-
g_list_free (stream->transports);
G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
* @stream: a #GstRTSPStream
* @bin: a #GstBin to join
* @rtpbin: a rtpbin element in @bin
+ * @state: the target state of the new elements
*
* Join the #Gstbin @bin that contains the element @rtpbin.
*
- * @stream will link to @rtpbin, which must be inside @bin.
+ * @stream will link to @rtpbin, which must be inside @bin. The elements
+ * added to @bin will be set to the state given in @state.
*
* Returns: %TRUE on success.
*/
gboolean
gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
- GstElement * rtpbin)
+ GstElement * rtpbin, GstState state)
{
gint i, idx;
gchar *name;
g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
- idx = stream->idx;
-
if (stream->is_joined)
return TRUE;
- GST_INFO ("stream %p joining bin", stream);
+ /* create a session with the same index as the stream */
+ idx = stream->idx;
+
+ GST_INFO ("stream %p joining bin as session %d", stream, idx);
if (!alloc_ports (stream))
goto no_ports;
- /* add the ports to the pipeline */
- for (i = 0; i < 2; i++) {
- gst_bin_add (bin, stream->udpsink[i]);
- gst_bin_add (bin, stream->udpsrc[i]);
- }
-
- /* create elements for the TCP transfer */
- for (i = 0; i < 2; i++) {
- stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
- stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
- stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
- g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
- g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
- gst_bin_add (bin, stream->appqueue[i]);
- gst_bin_add (bin, stream->appsink[i]);
- gst_bin_add (bin, stream->appsrc[i]);
- gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
- &sink_cb, stream, NULL);
- }
-
- /* hook up the stream to the RTP session elements. */
+ /* get a pad for sending RTP */
name = g_strdup_printf ("send_rtp_sink_%u", idx);
stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
g_free (name);
+ /* link the RTP pad to the session manager, it should not really fail unless
+ * this is not really an RTP pad */
+ ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
+ if (ret != GST_PAD_LINK_OK)
+ goto link_failed;
+
+ /* get pads from the RTP session element for sending and receiving
+ * RTP/RTCP*/
name = g_strdup_printf ("send_rtp_src_%u", idx);
- stream->send_rtp_src = gst_element_get_static_pad (rtpbin, name);
+ stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
g_free (name);
name = g_strdup_printf ("send_rtcp_src_%u", idx);
- stream->send_rtcp_src = gst_element_get_request_pad (rtpbin, name);
- g_free (name);
- name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
- stream->recv_rtcp_sink = gst_element_get_request_pad (rtpbin, name);
+ stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
g_free (name);
name = g_strdup_printf ("recv_rtp_sink_%u", idx);
- stream->recv_rtp_sink = gst_element_get_request_pad (rtpbin, name);
+ stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
+ g_free (name);
+ name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
+ stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
g_free (name);
+
/* get the session */
g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
stream);
- /* link the RTP pad to the session manager */
- ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
- if (ret != GST_PAD_LINK_OK)
- goto link_failed;
+ for (i = 0; i < 2; i++) {
+ /* For the sender we create this bit of pipeline for both
+ * RTP and RTCP. Sync and preroll are enabled on udpsink so
+ * we need to add a queue before appsink to make the pipeline
+ * not block. For the TCP case, we want to pump data to the
+ * client as fast as possible anyway.
+ *
+ * .--------. .-----. .---------.
+ * | rtpbin | | tee | | udpsink |
+ * | send->sink src->sink |
+ * '--------' | | '---------'
+ * | | .---------. .---------.
+ * | | | queue | | appsink |
+ * | src->sink src->sink |
+ * '-----' '---------' '---------'
+ */
+ /* make tee for RTP/RTCP */
+ stream->tee[i] = gst_element_factory_make ("tee", NULL);
+ gst_bin_add (bin, stream->tee[i]);
+
+ /* and link to rtpbin send pad */
+ pad = gst_element_get_static_pad (stream->tee[i], "sink");
+ gst_pad_link (stream->send_src[i], pad);
+ gst_object_unref (pad);
+
+ /* add udpsink */
+ gst_bin_add (bin, stream->udpsink[i]);
- /* make tee for RTP and link to stream */
- stream->tee[0] = gst_element_factory_make ("tee", NULL);
- gst_bin_add (bin, stream->tee[0]);
-
- pad = gst_element_get_static_pad (stream->tee[0], "sink");
- gst_pad_link (stream->send_rtp_src, pad);
- gst_object_unref (pad);
-
- /* link RTP sink, we're pretty sure this will work. */
- teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
- pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
-
- teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
- pad = gst_element_get_static_pad (stream->appqueue[0], "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
-
- queuepad = gst_element_get_static_pad (stream->appqueue[0], "src");
- pad = gst_element_get_static_pad (stream->appsink[0], "sink");
- gst_pad_link (queuepad, pad);
- gst_object_unref (pad);
- gst_object_unref (queuepad);
-
- /* make tee for RTCP */
- stream->tee[1] = gst_element_factory_make ("tee", NULL);
- gst_bin_add (bin, stream->tee[1]);
-
- pad = gst_element_get_static_pad (stream->tee[1], "sink");
- gst_pad_link (stream->send_rtcp_src, pad);
- gst_object_unref (pad);
-
- /* link RTCP elements */
- teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
- pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
-
- teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
- pad = gst_element_get_static_pad (stream->appqueue[1], "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
-
- queuepad = gst_element_get_static_pad (stream->appqueue[1], "src");
- pad = gst_element_get_static_pad (stream->appsink[1], "sink");
- gst_pad_link (queuepad, pad);
- gst_object_unref (pad);
- gst_object_unref (queuepad);
-
- /* make selector for the RTP receivers */
- stream->selector[0] = gst_element_factory_make ("funnel", NULL);
- gst_bin_add (bin, stream->selector[0]);
-
- pad = gst_element_get_static_pad (stream->selector[0], "src");
- gst_pad_link (pad, stream->recv_rtp_sink);
- gst_object_unref (pad);
-
- selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
- pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
- gst_pad_link (pad, selpad);
- gst_object_unref (pad);
- gst_object_unref (selpad);
- selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
- pad = gst_element_get_static_pad (stream->appsrc[0], "src");
- gst_pad_link (pad, selpad);
- gst_object_unref (pad);
- gst_object_unref (selpad);
-
- /* make selector for the RTCP receivers */
- stream->selector[1] = gst_element_factory_make ("funnel", NULL);
- gst_bin_add (bin, stream->selector[1]);
-
- pad = gst_element_get_static_pad (stream->selector[1], "src");
- gst_pad_link (pad, stream->recv_rtcp_sink);
- gst_object_unref (pad);
-
- selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
- pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
- gst_pad_link (pad, selpad);
- gst_object_unref (pad);
- gst_object_unref (selpad);
-
- selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
- pad = gst_element_get_static_pad (stream->appsrc[1], "src");
- gst_pad_link (pad, selpad);
- gst_object_unref (pad);
- gst_object_unref (selpad);
-
- /* we set and keep these to playing so that they don't cause NO_PREROLL return
- * values */
- gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
- gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
- gst_element_set_locked_state (stream->udpsrc[0], TRUE);
- gst_element_set_locked_state (stream->udpsrc[1], TRUE);
+ /* link tee to udpsink */
+ teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
+ pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
+ gst_pad_link (teepad, pad);
+ gst_object_unref (pad);
+ gst_object_unref (teepad);
+
+ /* make queue */
+ stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
+ gst_bin_add (bin, stream->appqueue[i]);
+ /* and link to tee */
+ teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
+ pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
+ gst_pad_link (teepad, pad);
+ gst_object_unref (pad);
+ gst_object_unref (teepad);
+
+ /* make appsink */
+ stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
+ g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
+ g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
+ gst_bin_add (bin, stream->appsink[i]);
+ gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
+ &sink_cb, stream, NULL);
+ /* and link to queue */
+ queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
+ pad = gst_element_get_static_pad (stream->appsink[i], "sink");
+ gst_pad_link (queuepad, pad);
+ gst_object_unref (pad);
+ gst_object_unref (queuepad);
+
+ /* For the receiver we create this bit of pipeline for both
+ * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
+ * and it is all funneled into the rtpbin receive pad.
+ *
+ * .--------. .--------. .--------.
+ * | udpsrc | | funnel | | rtpbin |
+ * | src->sink src->sink |
+ * '--------' | | '--------'
+ * .--------. | |
+ * | appsrc | | |
+ * | src->sink |
+ * '--------' '--------'
+ */
+ /* make funnel for the RTP/RTCP receivers */
+ stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
+ gst_bin_add (bin, stream->funnel[i]);
+
+ pad = gst_element_get_static_pad (stream->funnel[i], "src");
+ gst_pad_link (pad, stream->recv_sink[i]);
+ gst_object_unref (pad);
+
+ /* add udpsrc */
+ gst_bin_add (bin, stream->udpsrc[i]);
+ /* and link to the funnel */
+ selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
+ pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
+ gst_pad_link (pad, selpad);
+ gst_object_unref (pad);
+ gst_object_unref (selpad);
+
+ /* make and add appsrc */
+ stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
+ gst_bin_add (bin, stream->appsrc[i]);
+ /* and link to the funnel */
+ selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
+ pad = gst_element_get_static_pad (stream->appsrc[i], "src");
+ gst_pad_link (pad, selpad);
+ gst_object_unref (pad);
+ gst_object_unref (selpad);
+
+ /* check if we need to set to a special state */
+ if (state != GST_STATE_NULL) {
+ gst_element_set_state (stream->udpsink[i], state);
+ gst_element_set_state (stream->appsink[i], state);
+ gst_element_set_state (stream->appqueue[i], state);
+ gst_element_set_state (stream->tee[i], state);
+ gst_element_set_state (stream->funnel[i], state);
+ gst_element_set_state (stream->appsrc[i], state);
+ }
+ /* we set and keep these to playing so that they don't cause NO_PREROLL return
+ * values */
+ gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
+ gst_element_set_locked_state (stream->udpsrc[i], TRUE);
+ }
/* be notified of caps changes */
stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
link_failed:
{
GST_WARNING ("failed to link stream %d", idx);
+ gst_object_unref (stream->send_rtp_sink);
+ stream->send_rtp_sink = NULL;
return FALSE;
}
}
* @bin: a #GstBin
* @rtpbin: a rtpbin #GstElement
*
- * Remove the elements of @stream from the bin
+ * Remove the elements of @stream from @bin. @bin must be set
+ * to the NULL state before calling this.
*
* Return: %TRUE on success.
*/
if (!stream->is_joined)
return TRUE;
+ /* all transports must be removed by now */
+ g_return_val_if_fail (stream->transports == NULL, FALSE);
+
GST_INFO ("stream %p leaving bin", stream);
gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
-
g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
+ gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
+ gst_object_unref (stream->send_rtp_sink);
+ stream->send_rtp_sink = NULL;
- /* FIXME not entirely the opposite of join_bin */
for (i = 0; i < 2; i++) {
+ /* and set udpsrc to NULL now before removing */
+ gst_element_set_locked_state (stream->udpsrc[i], FALSE);
+ gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
+
+ /* removing them should also nicely release the request
+ * pads when they finalize */
gst_bin_remove (bin, stream->udpsrc[i]);
gst_bin_remove (bin, stream->udpsink[i]);
gst_bin_remove (bin, stream->appsrc[i]);
gst_bin_remove (bin, stream->appsink[i]);
gst_bin_remove (bin, stream->appqueue[i]);
gst_bin_remove (bin, stream->tee[i]);
- gst_bin_remove (bin, stream->selector[i]);
+ gst_bin_remove (bin, stream->funnel[i]);
+
+ gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
+ gst_object_unref (stream->recv_sink[i]);
+ stream->recv_sink[i] = NULL;
+
+ stream->udpsrc[i] = NULL;
+ stream->udpsink[i] = NULL;
+ stream->appsrc[i] = NULL;
+ stream->appsink[i] = NULL;
+ stream->appqueue[i] = NULL;
+ stream->tee[i] = NULL;
+ stream->funnel[i] = NULL;
}
+ gst_object_unref (stream->send_src[0]);
+ stream->send_src[0] = NULL;
+
+ gst_element_release_request_pad (rtpbin, stream->send_src[1]);
+ gst_object_unref (stream->send_src[1]);
+ stream->send_src[1] = NULL;
+
+ g_object_unref (stream->session);
+ if (stream->caps)
+ gst_caps_unref (stream->caps);
+
stream->is_joined = FALSE;
return TRUE;
{
GstFlowReturn ret;
+ g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+ g_return_val_if_fail (stream->is_joined, FALSE);
+
ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
return ret;
{
GstFlowReturn ret;
+ g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+ g_return_val_if_fail (stream->is_joined, FALSE);
+
ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
return ret;
* Add the transport in @trans to @stream. The media of @stream will
* then also be send to the values configured in @trans.
*
+ * @stream must be joined to a bin.
+ *
* @trans must contain a valid #GstRTSPTransport.
*
* Returns: %TRUE if @trans was added
{
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
+ g_return_val_if_fail (stream->is_joined, FALSE);
g_return_val_if_fail (trans->transport != NULL, FALSE);
return update_transport (stream, trans, TRUE);
* Remove the transport in @trans from @stream. The media of @stream will
* not be sent to the values configured in @trans.
*
+ * @stream must be joined to a bin.
+ *
+ * @trans must contain a valid #GstRTSPTransport.
+ *
* Returns: %TRUE if @trans was removed
*/
gboolean
{
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
+ g_return_val_if_fail (stream->is_joined, FALSE);
g_return_val_if_fail (trans->transport != NULL, FALSE);
return update_transport (stream, trans, FALSE);