-/**
- * gst_rtsp_media_stream_rtp:
- * @stream: a #GstRTSPMediaStream
- * @buffer: a #GstBuffer
- *
- * Handle an RTP buffer for the stream. This method is usually called when a
- * message has been received from a client using the TCP transport.
- *
- * This function takes ownership of @buffer.
- *
- * Returns: a GstFlowReturn.
- */
-GstFlowReturn
-gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
-{
- GstFlowReturn ret;
-
- ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
-
- return ret;
-}
-
-/**
- * gst_rtsp_media_stream_rtcp:
- * @stream: a #GstRTSPMediaStream
- * @buffer: a #GstBuffer
- *
- * Handle an RTCP buffer for the stream. This method is usually called when a
- * message has been received from a client using the TCP transport.
- *
- * This function takes ownership of @buffer.
- *
- * Returns: a GstFlowReturn.
- */
-GstFlowReturn
-gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
-{
- GstFlowReturn ret;
-
- ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
-
- return ret;
-}
-
-/* Allocate the udp ports and sockets */
-static gboolean
-alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
-{
- GstStateChangeReturn ret;
- GstElement *udpsrc0, *udpsrc1;
- GstElement *udpsink0, *udpsink1;
- gint tmp_rtp, tmp_rtcp;
- guint count;
- gint rtpport, rtcpport, sockfd;
- const gchar *host;
-
- udpsrc0 = NULL;
- udpsrc1 = NULL;
- udpsink0 = NULL;
- udpsink1 = NULL;
- count = 0;
-
- /* Start with random port */
- tmp_rtp = 0;
-
- if (media->is_ipv6)
- host = "udp://[::0]";
- else
- host = "udp://0.0.0.0";
-
- /* try to allocate 2 UDP ports, the RTP port should be an even
- * number and the RTCP port should be the next (uneven) port */
-again:
- udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
- if (udpsrc0 == NULL)
- goto no_udp_protocol;
- g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
-
- ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
- if (ret == GST_STATE_CHANGE_FAILURE) {
- if (tmp_rtp != 0) {
- tmp_rtp += 2;
- if (++count > 20)
- goto no_ports;
-
- gst_element_set_state (udpsrc0, GST_STATE_NULL);
- gst_object_unref (udpsrc0);
-
- goto again;
- }
- goto no_udp_protocol;
- }
-
- g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
-
- /* check if port is even */
- if ((tmp_rtp & 1) != 0) {
- /* port not even, close and allocate another */
- if (++count > 20)
- goto no_ports;
-
- gst_element_set_state (udpsrc0, GST_STATE_NULL);
- gst_object_unref (udpsrc0);
-
- tmp_rtp++;
- goto again;
- }
-
- /* allocate port+1 for RTCP now */
- udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
- if (udpsrc1 == NULL)
- goto no_udp_rtcp_protocol;
-
- /* set port */
- tmp_rtcp = tmp_rtp + 1;
- g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
-
- ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
- /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
- if (ret == GST_STATE_CHANGE_FAILURE) {
-
- if (++count > 20)
- goto no_ports;
-
- gst_element_set_state (udpsrc0, GST_STATE_NULL);
- gst_object_unref (udpsrc0);
-
- gst_element_set_state (udpsrc1, GST_STATE_NULL);
- gst_object_unref (udpsrc1);
-
- tmp_rtp += 2;
- goto again;
- }
-
- /* all fine, do port check */
- g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
- g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
-
- /* this should not happen... */
- if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
- goto port_error;
-
- udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
- if (!udpsink0)
- goto no_udp_protocol;
-
- g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
- g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
- g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
-
- udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
- if (!udpsink1)
- goto no_udp_protocol;
-
- if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
- "send-duplicates")) {
- g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
- } else {
- g_warning
- ("old multiudpsink version found without send-duplicates property");
- }
-
- if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
- "buffer-size")) {
- g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
- } else {
- GST_WARNING ("multiudpsink version found without buffer-size property");
- }
-
- g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
- g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
- g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
-
- g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
-
- /* we keep these elements, we configure all in configure_transport when the
- * server told us to really use the UDP ports. */
- stream->udpsrc[0] = udpsrc0;
- stream->udpsrc[1] = udpsrc1;
- stream->udpsink[0] = udpsink0;
- stream->udpsink[1] = udpsink1;
- stream->server_port.min = rtpport;
- stream->server_port.max = rtcpport;
-
- return TRUE;
-
- /* ERRORS */
-no_udp_protocol:
- {
- goto cleanup;
- }
-no_ports:
- {
- goto cleanup;
- }
-no_udp_rtcp_protocol:
- {
- goto cleanup;
- }
-port_error:
- {
- goto cleanup;
- }
-cleanup:
- {
- if (udpsrc0) {
- gst_element_set_state (udpsrc0, GST_STATE_NULL);
- gst_object_unref (udpsrc0);
- }
- if (udpsrc1) {
- gst_element_set_state (udpsrc1, GST_STATE_NULL);
- gst_object_unref (udpsrc1);
- }
- if (udpsink0) {
- gst_element_set_state (udpsink0, GST_STATE_NULL);
- gst_object_unref (udpsink0);
- }
- if (udpsink1) {
- gst_element_set_state (udpsink1, GST_STATE_NULL);
- gst_object_unref (udpsink1);
- }
- return FALSE;
- }
-}
-
-/* executed from streaming thread */
-static void
-caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
-{
- gchar *capsstr;
- GstCaps *newcaps, *oldcaps;
-
- if ((newcaps = GST_PAD_CAPS (pad)))
- gst_caps_ref (newcaps);
-
- oldcaps = stream->caps;
- stream->caps = newcaps;
-
- if (oldcaps)
- gst_caps_unref (oldcaps);
-
- capsstr = gst_caps_to_string (newcaps);
- GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
- g_free (capsstr);
-}
-
-static void
-dump_structure (const GstStructure * s)
-{
- gchar *sstr;
-
- sstr = gst_structure_to_string (s);
- GST_INFO ("structure: %s", sstr);
- g_free (sstr);
-}
-
-static GstRTSPMediaTrans *
-find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
-{
- GList *walk;
- GstRTSPMediaTrans *result = NULL;
- const gchar *tmp;
- gchar *dest;
- guint port;
-
- if (rtcp_from == NULL)
- return NULL;
-
- tmp = g_strrstr (rtcp_from, ":");
- if (tmp == NULL)
- return NULL;
-
- port = atoi (tmp + 1);
- dest = g_strndup (rtcp_from, tmp - rtcp_from);
-
- GST_INFO ("finding %s:%d", dest, port);
-
- for (walk = stream->transports; walk; walk = g_list_next (walk)) {
- GstRTSPMediaTrans *trans = walk->data;
- gint min, max;
-
- min = trans->transport->client_port.min;
- max = trans->transport->client_port.max;
-
- if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
- || max == port)) {
- result = trans;
- break;
- }
- }
- g_free (dest);
-
- return result;
-}
-
-static void
-on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
-{
- GstStructure *stats;
- GstRTSPMediaTrans *trans;
-
- GST_INFO ("%p: new source %p", stream, source);
-
- /* see if we have a stream to match with the origin of the RTCP packet */
- trans = g_object_get_qdata (source, ssrc_stream_map_key);
- if (trans == NULL) {
- g_object_get (source, "stats", &stats, NULL);
- if (stats) {
- const gchar *rtcp_from;
-
- dump_structure (stats);
-
- rtcp_from = gst_structure_get_string (stats, "rtcp-from");
- if ((trans = find_transport (stream, rtcp_from))) {
- GST_INFO ("%p: found transport %p for source %p", stream, trans,
- source);
-
- /* keep ref to the source */
- trans->rtpsource = source;
-
- g_object_set_qdata (source, ssrc_stream_map_key, trans);
- }
- gst_structure_free (stats);
- }
- } else {
- GST_INFO ("%p: source %p for transport %p", stream, source, trans);
- }
-}
-
-static void
-on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
-{
- GST_INFO ("%p: new SDES %p", stream, source);
-}
-
-static void
-on_ssrc_active (GObject * session, GObject * source,
- GstRTSPMediaStream * stream)
-{
- GstRTSPMediaTrans *trans;
-
- trans = g_object_get_qdata (source, ssrc_stream_map_key);
-
- GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
-
- if (trans && trans->keep_alive)
- trans->keep_alive (trans->ka_user_data);
-
-#ifdef DUMP_STATS
- {
- GstStructure *stats;
- g_object_get (source, "stats", &stats, NULL);
- if (stats) {
- dump_structure (stats);
- gst_structure_free (stats);
- }
- }
-#endif
-}
-
-static void
-on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
-{
- GST_INFO ("%p: source %p bye", stream, source);
-}
-
-static void
-on_bye_timeout (GObject * session, GObject * source,
- GstRTSPMediaStream * stream)
-{
- GstRTSPMediaTrans *trans;
-
- GST_INFO ("%p: source %p bye timeout", stream, source);
-
- if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
- trans->rtpsource = NULL;
- trans->timeout = TRUE;
- }
-}
-
-static void
-on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
-{
- GstRTSPMediaTrans *trans;
-
- GST_INFO ("%p: source %p timeout", stream, source);
-
- if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
- trans->rtpsource = NULL;
- trans->timeout = TRUE;
- }
-}
-
-static GstFlowReturn
-handle_new_buffer (GstAppSink * sink, gpointer user_data)
-{
- GList *walk;
- GstBuffer *buffer;
- GstRTSPMediaStream *stream;
-
- buffer = gst_app_sink_pull_buffer (sink);
- if (!buffer)
- return GST_FLOW_OK;
-
- stream = (GstRTSPMediaStream *) user_data;
-
- for (walk = stream->transports; walk; walk = g_list_next (walk)) {
- GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
-
- if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
- if (tr->send_rtp)
- tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
- } else {
- if (tr->send_rtcp)
- tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
- }
- }
- gst_buffer_unref (buffer);
-
- return GST_FLOW_OK;
-}
-
-static GstFlowReturn
-handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
-{
- GList *walk;
- GstBufferList *blist;
- GstRTSPMediaStream *stream;
-
- blist = gst_app_sink_pull_buffer_list (sink);
- if (!blist)
- return GST_FLOW_OK;
-
- stream = (GstRTSPMediaStream *) user_data;
-
- for (walk = stream->transports; walk; walk = g_list_next (walk)) {
- GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
-
- if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
- if (tr->send_rtp_list)
- tr->send_rtp_list (blist, tr->transport->interleaved.min,
- tr->user_data);
- } else {
- if (tr->send_rtcp_list)
- tr->send_rtcp_list (blist, tr->transport->interleaved.max,
- tr->user_data);
- }
- }
- gst_buffer_list_unref (blist);
-
- return GST_FLOW_OK;
-}
-
-static GstAppSinkCallbacks sink_cb = {
- NULL, /* not interested in EOS */
- NULL, /* not interested in preroll buffers */
- handle_new_buffer,
- handle_new_buffer_list
-};
-
-/* prepare the pipeline objects to handle @stream in @media */
-static gboolean
-setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
-{
- gchar *name;
- GstPad *pad, *teepad, *selpad;
- GstPadLinkReturn ret;
- gint i;
-
- /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
- * for sending RTP/RTCP. The sender and receiver ports are shared between the
- * elements */
- if (!alloc_udp_ports (media, stream))
- return FALSE;
-
- /* add the ports to the pipeline */
- for (i = 0; i < 2; i++) {
- gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
- gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
- }
-
- /* create elements for the TCP transfer */
- for (i = 0; i < 2; i++) {
- stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
- stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
- g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
- g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
- g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
- gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
- gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
- gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
- &sink_cb, stream, NULL);
- }
-
- /* hook up the stream to the RTP session elements. */
- name = g_strdup_printf ("send_rtp_sink_%d", idx);
- stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
- g_free (name);
- name = g_strdup_printf ("send_rtp_src_%d", idx);
- stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
- g_free (name);
- name = g_strdup_printf ("send_rtcp_src_%d", idx);
- stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
- g_free (name);
- name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
- stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
- g_free (name);
- name = g_strdup_printf ("recv_rtp_sink_%d", idx);
- stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
- g_free (name);
-
- /* get the session */
- g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
- &stream->session);
-
- g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
- stream);
- g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
- stream);
- g_signal_connect (stream->session, "on-ssrc-active",
- (GCallback) on_ssrc_active, stream);
- g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
- stream);
- g_signal_connect (stream->session, "on-bye-timeout",
- (GCallback) on_bye_timeout, stream);
- g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
- stream);
-
- /* link the RTP pad to the session manager */
- ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
- if (ret != GST_PAD_LINK_OK)
- goto link_failed;
-
- /* make tee for RTP and link to stream */
- stream->tee[0] = gst_element_factory_make ("tee", NULL);
- gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
-
- pad = gst_element_get_static_pad (stream->tee[0], "sink");
- gst_pad_link (stream->send_rtp_src, pad);
- gst_object_unref (pad);
-
- /* link RTP sink, we're pretty sure this will work. */
- teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
- pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
-
- teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
- pad = gst_element_get_static_pad (stream->appsink[0], "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
-
- /* make tee for RTCP */
- stream->tee[1] = gst_element_factory_make ("tee", NULL);
- gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
-
- pad = gst_element_get_static_pad (stream->tee[1], "sink");
- gst_pad_link (stream->send_rtcp_src, pad);
- gst_object_unref (pad);
-
- /* link RTCP elements */
- teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
- pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
-
- teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
- pad = gst_element_get_static_pad (stream->appsink[1], "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
-
- /* make selector for the RTP receivers */
- stream->selector[0] = gst_element_factory_make ("rtspfunnel", NULL);
- gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
-
- pad = gst_element_get_static_pad (stream->selector[0], "src");
- gst_pad_link (pad, stream->recv_rtp_sink);
- gst_object_unref (pad);
-
- selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
- pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
- gst_pad_link (pad, selpad);
- gst_object_unref (pad);
- gst_object_unref (selpad);
-
- selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
- pad = gst_element_get_static_pad (stream->appsrc[0], "src");
- gst_pad_link (pad, selpad);
- gst_object_unref (pad);
- gst_object_unref (selpad);
-
- /* make selector for the RTCP receivers */
- stream->selector[1] = gst_element_factory_make ("rtspfunnel", NULL);
- gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
-
- pad = gst_element_get_static_pad (stream->selector[1], "src");
- gst_pad_link (pad, stream->recv_rtcp_sink);
- gst_object_unref (pad);
-
- selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
- pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
- gst_pad_link (pad, selpad);
- gst_object_unref (pad);
- gst_object_unref (selpad);
-
- selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
- pad = gst_element_get_static_pad (stream->appsrc[1], "src");
- gst_pad_link (pad, selpad);
- gst_object_unref (pad);
- gst_object_unref (selpad);
-
- /* we set and keep these to playing so that they don't cause NO_PREROLL return
- * values */
- gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
- gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
- gst_element_set_locked_state (stream->udpsrc[0], TRUE);
- gst_element_set_locked_state (stream->udpsrc[1], TRUE);
-
- /* be notified of caps changes */
- stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
- (GCallback) caps_notify, stream);
-
- stream->prepared = TRUE;
-
- return TRUE;
-
- /* ERRORS */
-link_failed:
- {
- GST_WARNING ("failed to link stream %d", idx);
- return FALSE;
- }
-}
-
-static void
-unlock_streams (GstRTSPMedia * media)
-{
- guint i, n_streams;
-
- /* unlock the udp src elements */
- n_streams = gst_rtsp_media_n_streams (media);
- for (i = 0; i < n_streams; i++) {
- GstRTSPMediaStream *stream;
-
- stream = gst_rtsp_media_get_stream (media, i);
-
- gst_element_set_locked_state (stream->udpsrc[0], FALSE);
- gst_element_set_locked_state (stream->udpsrc[1], FALSE);
- }
-}
-