gint i;
guint idx;
gchar *name;
- GstPad *pad, *sinkpad, *selpad;
+ GstPad *pad, *sinkpad = NULL, *selpad;
GstPadLinkReturn ret;
+ gboolean is_tcp = FALSE, is_udp = FALSE;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
GST_INFO ("stream %p joining bin as session %u", stream, idx);
- if (!alloc_ports (stream))
+ is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
+
+ is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+ (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
+
+ if (is_udp && !alloc_ports (stream))
goto no_ports;
/* update the dscp qos field in the sinks */
* RTP and RTCP. Sync and preroll are enabled on udpsink so
* we need to add a queue before appsink and udpsink to make
* the pipeline not block. For the TCP case, we want to pump
- * data to the client as fast as possible.
+ * client as fast as possible anyway. This pipeline is used
+ * when both TCP and UDP are present.
*
* .--------. .-----. .---------. .---------.
* | rtpbin | | tee | | queue | | udpsink |
* | src->sink src->sink |
* '-----' '---------' '---------'
*
- * When only UDP is allowed, we skip the tee, queue and appsink and link the
- * udpsink directly to the session.
+ * When only UDP or only TCP is allowed, we skip the tee and queue
+ * and link the udpsink (for UDP) or appsink (for TCP) directly to
+ * the session.
*/
/* Only link the RTP send src if we're going to send RTP, link
* the RTCP send src always */
if (priv->srcpad || i == 1) {
- /* add udpsink */
- gst_bin_add (bin, priv->udpsink[i]);
- sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
+ if (is_udp) {
+ /* add udpsink */
+ gst_bin_add (bin, priv->udpsink[i]);
+ sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
+ }
+
+ if (is_tcp) {
+ /* make appsink */
+ priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
+ g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
+ gst_bin_add (bin, priv->appsink[i]);
+ gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
+ &sink_cb, stream, NULL);
+ }
+
+ if (is_udp && is_tcp) {
+ g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
- if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
/* make tee for RTP/RTCP */
priv->tee[i] = gst_element_factory_make ("tee", NULL);
gst_bin_add (bin, priv->tee[i]);
queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
gst_pad_link (queuepad, sinkpad);
gst_object_unref (queuepad);
+ gst_object_unref (sinkpad);
- /* make queue */
+ /* make appqueue */
priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
g_object_set (priv->appqueue[i], "max-size-buffers",
1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
NULL);
gst_bin_add (bin, priv->appqueue[i]);
- /* and link to tee */
+ /* and link tee to appqueue */
teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
gst_pad_link (teepad, pad);
gst_object_unref (pad);
gst_object_unref (teepad);
- /* make appsink */
- priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
- g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
- g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
- gst_bin_add (bin, priv->appsink[i]);
- gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
- &sink_cb, stream, NULL);
- /* and link to queue */
+ /* and link appqueue to appsink */
queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
pad = gst_element_get_static_pad (priv->appsink[i], "sink");
gst_pad_link (queuepad, pad);
gst_object_unref (pad);
gst_object_unref (queuepad);
+ } else if (is_tcp) {
+ /* only appsink needed, link it to the session */
+ pad = gst_element_get_static_pad (priv->appsink[i], "sink");
+ gst_pad_link (priv->send_src[i], pad);
+ gst_object_unref (pad);
+
+ /* when its only TCP, we need to set sync and preroll to FALSE
+ * for the sink to avoid deadlock. And this is only needed for
+ * sink used for RTCP data, not the RTP data. */
+ if (i == 1)
+ g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
} else {
/* else only udpsink needed, link it to the session */
gst_pad_link (priv->send_src[i], sinkpad);
+ gst_object_unref (sinkpad);
}
- gst_object_unref (sinkpad);
}
/* Only connect recv RTP sink if we expect to receive RTP. Connect recv
gst_object_unref (selpad);
}
- if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
+ if (is_tcp) {
/* make and add appsrc */
priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
priv->appsrc_base_time[i] = -1;
GstRTSPStreamPrivate *priv;
gint i;
GList *l;
+ gboolean is_tcp, is_udp;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
priv->recv_rtp_src = NULL;
}
+ is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
+
+ is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+ (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
+
+
for (i = 0; i < 2; i++) {
if (priv->udpsink[i])
gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
if (priv->appsrc[i])
gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
- if (priv->udpsrc_v4[i] && (priv->sinkpad || i == 1)) {
- /* and set udpsrc to NULL now before removing */
- gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
- gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
- /* removing them should also nicely release the request
- * pads when they finalize */
- gst_bin_remove (bin, priv->udpsrc_v4[i]);
+
+ if (priv->udpsrc_v4[i]) {
+ if (priv->sinkpad || i == 1) {
+ /* and set udpsrc to NULL now before removing */
+ gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
+ gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
+ /* removing them should also nicely release the request
+ * pads when they finalize */
+ gst_bin_remove (bin, priv->udpsrc_v4[i]);
+ } else {
+ /* we need to set the state to NULL before unref */
+ gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
+ gst_object_unref (priv->udpsrc_v4[i]);
+ }
}
- if (priv->udpsrc_v6[i] && (priv->sinkpad || i == 1)) {
- gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
- gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
- gst_bin_remove (bin, priv->udpsrc_v6[i]);
+
+ if (priv->udpsrc_v6[i]) {
+ if (priv->sinkpad || i == 1) {
+ gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
+ gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
+ gst_bin_remove (bin, priv->udpsrc_v6[i]);
+ } else {
+ gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
+ gst_object_unref (priv->udpsrc_v6[i]);
+ }
}
for (l = priv->transport_sources; l; l = l->next) {
gst_bin_remove (bin, s->udpsrc[i]);
}
- if (priv->udpsink[i] && (priv->srcpad || i == 1))
+ if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
gst_bin_remove (bin, priv->udpsink[i]);
if (priv->appsrc[i] && (priv->sinkpad || i == 1))
gst_bin_remove (bin, priv->appsrc[i]);
- if (priv->appsink[i] && (priv->srcpad || i == 1))
+ if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
gst_bin_remove (bin, priv->appsink[i]);
- if (priv->appqueue[i] && (priv->srcpad || i == 1))
+ if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
gst_bin_remove (bin, priv->appqueue[i]);
- if (priv->udpqueue[i] && (priv->srcpad || i == 1))
+ if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
gst_bin_remove (bin, priv->udpqueue[i]);
- if (priv->tee[i] && (priv->srcpad || i == 1))
+ if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
gst_bin_remove (bin, priv->tee[i]);
if (priv->funnel[i] && (priv->sinkpad || i == 1))
gst_bin_remove (bin, priv->funnel[i]);
priv = stream->priv;
g_mutex_lock (&priv->lock);
- if ((sink = priv->udpsink[0]))
+ /* depending on the transport type, it should query corresponding sink */
+ if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+ (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
+ sink = priv->udpsink[0];
+ else
+ sink = priv->appsink[0];
+
+ if (sink)
gst_object_ref (sink);
g_mutex_unlock (&priv->lock);
priv = stream->priv;
g_mutex_lock (&priv->lock);
- if ((sink = priv->udpsink[0]))
+ /* depending on the transport type, it should query corresponding sink */
+ if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+ (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
+ sink = priv->udpsink[0];
+ else
+ sink = priv->appsink[0];
+
+ if (sink)
gst_object_ref (sink);
g_mutex_unlock (&priv->lock);