+ if (priv->idx == session) {
+ caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
+ if (caps) {
+ GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
+ gst_caps_ref (caps);
+ } else {
+ GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
+ }
+ }
+
+ g_mutex_unlock (&priv->lock);
+
+ return caps;
+}
+
+static void
+pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
+{
+ GstRTSPStreamPrivate *priv = stream->priv;
+ gchar *name;
+ GstPadLinkReturn ret;
+ guint sessid;
+
+ GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
+ GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
+
+ name = gst_pad_get_name (pad);
+ if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
+ g_free (name);
+ return;
+ }
+ g_free (name);
+
+ if (priv->idx != sessid)
+ return;
+
+ if (gst_pad_is_linked (priv->sinkpad)) {
+ GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
+ GST_DEBUG_PAD_NAME (priv->sinkpad));
+ return;
+ }
+
+ /* link the RTP pad to the session manager, it should not really fail unless
+ * this is not really an RTP pad */
+ ret = gst_pad_link (pad, priv->sinkpad);
+ if (ret != GST_PAD_LINK_OK)
+ goto link_failed;
+ priv->recv_rtp_src = gst_object_ref (pad);
+
+ return;
+
+/* ERRORS */
+link_failed:
+ {
+ GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
+ GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
+ }
+}
+
+static void
+on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
+ GstRTSPStream * stream)
+{
+ /* TODO: What to do here other than this? */
+ GST_DEBUG ("Stream %p: Got EOS", stream);
+ gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
+}
+
+static void
+plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
+ GstElement ** queue_out)
+{
+ GstPad *pad;
+ GstPad *teepad;
+ GstPad *queuepad;
+
+ gst_bin_add (bin, sink);
+
+ *queue_out = gst_element_factory_make ("queue", NULL);
+ g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
+ "max-size-time", G_GINT64_CONSTANT (0), NULL);
+ gst_bin_add (bin, *queue_out);
+
+ /* link tee to queue */
+ teepad = gst_element_get_request_pad (tee, "src_%u");
+ pad = gst_element_get_static_pad (*queue_out, "sink");
+ gst_pad_link (teepad, pad);
+ gst_object_unref (pad);
+ gst_object_unref (teepad);
+
+ /* link queue to sink */
+ queuepad = gst_element_get_static_pad (*queue_out, "src");
+ pad = gst_element_get_static_pad (sink, "sink");
+ gst_pad_link (queuepad, pad);
+ gst_object_unref (queuepad);
+ gst_object_unref (pad);
+}
+
+/* must be called with lock */
+static void
+create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
+{
+ GstRTSPStreamPrivate *priv;
+ GstPad *pad;
+ gboolean is_tcp, is_udp;
+ gint i;
+
+ priv = stream->priv;
+
+ is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
+ is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
+ (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
+
+ for (i = 0; i < 2; i++) {
+ /* For the sender we create this bit of pipeline for both
+ * RTP and RTCP. Sync and preroll are enabled on udpsink so
+ * we need to add a queue before appsink and udpsink to make
+ * the pipeline not block. For the TCP case, we want to pump
+ * client as fast as possible anyway. This pipeline is used
+ * when both TCP and UDP are present.
+ *
+ * .--------. .-----. .---------. .---------.
+ * | rtpbin | | tee | | queue | | udpsink |
+ * | send->sink src->sink src->sink |
+ * '--------' | | '---------' '---------'
+ * | | .---------. .---------.
+ * | | | queue | | appsink |
+ * | src->sink src->sink |
+ * '-----' '---------' '---------'
+ *
+ * When only UDP or only TCP is allowed, we skip the tee and queue
+ * and link the udpsink (for UDP) or appsink (for TCP) directly to
+ * the session.
+ */
+
+ /* Only link the RTP send src if we're going to send RTP, link
+ * the RTCP send src always */
+ if (!priv->srcpad && i == 0)
+ continue;
+
+ if (is_tcp) {
+ /* make appsink */
+ priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
+ g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
+ gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
+ &sink_cb, stream, NULL);
+ }
+
+ /* If we have udp always use a tee because we could have mcast clients
+ * requesting different ports, in which case we'll have to plug more
+ * udpsinks. */
+ if (is_udp) {
+ /* make tee for RTP/RTCP */
+ priv->tee[i] = gst_element_factory_make ("tee", NULL);
+ gst_bin_add (bin, priv->tee[i]);
+
+ /* and link to rtpbin send pad */
+ pad = gst_element_get_static_pad (priv->tee[i], "sink");
+ gst_pad_link (priv->send_src[i], pad);
+ gst_object_unref (pad);
+
+ if (priv->udpsink[i])
+ plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
+
+ if (priv->mcast_udpsink[i])
+ plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
+ &priv->mcast_udpqueue[i]);
+
+ if (is_tcp) {
+ g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
+ plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
+ }
+ } else if (is_tcp) {
+ /* only appsink needed, link it to the session */
+ pad = gst_element_get_static_pad (priv->appsink[i], "sink");
+ gst_pad_link (priv->send_src[i], pad);
+ gst_object_unref (pad);
+
+ /* when its only TCP, we need to set sync and preroll to FALSE
+ * for the sink to avoid deadlock. And this is only needed for
+ * sink used for RTCP data, not the RTP data. */
+ if (i == 1)
+ g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
+ }
+
+ /* check if we need to set to a special state */
+ if (state != GST_STATE_NULL) {
+ if (priv->udpsink[i])
+ gst_element_set_state (priv->udpsink[i], state);
+ if (priv->mcast_udpsink[i])
+ gst_element_set_state (priv->mcast_udpsink[i], state);
+ if (priv->appsink[i])
+ gst_element_set_state (priv->appsink[i], state);
+ if (priv->appqueue[i])
+ gst_element_set_state (priv->appqueue[i], state);
+ if (priv->udpqueue[i])
+ gst_element_set_state (priv->udpqueue[i], state);
+ if (priv->mcast_udpqueue[i])
+ gst_element_set_state (priv->mcast_udpqueue[i], state);
+ if (priv->tee[i])
+ gst_element_set_state (priv->tee[i], state);
+ }
+ }
+}
+
+/* must be called with lock */
+static void
+plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
+ GstElement * funnel)
+{
+ GstRTSPStreamPrivate *priv;
+ GstPad *pad, *selpad;
+
+ priv = stream->priv;
+
+ if (priv->srcpad) {
+ /* we set and keep these to playing so that they don't cause NO_PREROLL return
+ * values. This is only relevant for PLAY pipelines */
+ gst_element_set_state (src, GST_STATE_PLAYING);
+ gst_element_set_locked_state (src, TRUE);
+ }
+
+ /* add src */
+ gst_bin_add (bin, src);
+
+ /* and link to the funnel */
+ selpad = gst_element_get_request_pad (funnel, "sink_%u");
+ pad = gst_element_get_static_pad (src, "src");
+ gst_pad_link (pad, selpad);
+ gst_object_unref (pad);
+ gst_object_unref (selpad);
+}
+
+/* must be called with lock */
+static void
+create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
+{
+ GstRTSPStreamPrivate *priv;
+ GstPad *pad;
+ gboolean is_tcp;
+ gint i;
+
+ priv = stream->priv;
+
+ is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
+
+ for (i = 0; i < 2; i++) {
+ /* For the receiver we create this bit of pipeline for both
+ * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
+ * and it is all funneled into the rtpbin receive pad.
+ *
+ * .--------. .--------. .--------.
+ * | udpsrc | | funnel | | rtpbin |
+ * | src->sink src->sink |
+ * '--------' | | '--------'
+ * .--------. | |
+ * | appsrc | | |
+ * | src->sink |
+ * '--------' '--------'
+ */
+
+ if (!priv->sinkpad && i == 0) {
+ /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
+ * RTCP sink always */
+ continue;
+ }
+
+ /* make funnel for the RTP/RTCP receivers */
+ priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
+ gst_bin_add (bin, priv->funnel[i]);
+
+ pad = gst_element_get_static_pad (priv->funnel[i], "src");
+ gst_pad_link (pad, priv->recv_sink[i]);
+ gst_object_unref (pad);
+
+ if (priv->udpsrc_v4[i])
+ plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
+
+ if (priv->udpsrc_v6[i])
+ plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
+
+ if (priv->mcast_udpsrc_v4[i])
+ plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
+
+ if (priv->mcast_udpsrc_v6[i])
+ plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
+
+ if (is_tcp) {
+ /* make and add appsrc */
+ priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
+ priv->appsrc_base_time[i] = -1;
+ g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
+ TRUE, NULL);
+ plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
+ }
+
+ /* check if we need to set to a special state */
+ if (state != GST_STATE_NULL) {
+ gst_element_set_state (priv->funnel[i], state);