From: Wim Taymans Date: Wed, 11 Mar 2009 15:45:12 +0000 (+0100) Subject: Add TCP transports X-Git-Tag: 1.6.0~971 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=ebc28a47dad000f8344ab803a43fc981d1fca339;p=platform%2Fupstream%2Fgst-rtsp-server.git Add TCP transports Use appsrc and appsink to send and receive RTP/RTCP packets in the TCP connection. --- diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index 811ae03..70b0e23 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -190,9 +190,6 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1); } -#if 0 - gst_rtsp_connection_send (client->connection, response, &timeout); -#endif gst_rtsp_watch_queue_message (client->watch, response); gst_rtsp_message_unset (response); } @@ -297,6 +294,81 @@ no_prepare: } static gboolean +do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client) +{ + GstRTSPMessage message = { 0 }; + guint8 *data; + guint size; + + gst_rtsp_message_init_data (&message, channel); + + data = GST_BUFFER_DATA (buffer); + size = GST_BUFFER_SIZE (buffer); + gst_rtsp_message_take_body (&message, data, size); + + gst_rtsp_watch_queue_message (client->watch, &message); + + gst_rtsp_message_steal_body (&message, &data, &size); + gst_rtsp_message_unset (&message); + + return TRUE; +} + +static void +link_stream (GstRTSPClient *client, GstRTSPSessionStream *stream) +{ + gst_rtsp_session_stream_set_callbacks (stream, (GstRTSPSendFunc) do_send_data, + (GstRTSPSendFunc) do_send_data, client, NULL); + client->streams = g_list_prepend (client->streams, stream); +} + +static void +unlink_stream (GstRTSPClient *client, GstRTSPSessionStream *stream) +{ + gst_rtsp_session_stream_set_callbacks (stream, NULL, + NULL, client, g_object_unref); + client->streams = g_list_remove (client->streams, stream); +} + +static void +unlink_streams (GstRTSPClient *client) +{ + GList *walk; + + for (walk = client->streams; walk; walk = g_list_next (walk)) { + GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data; + + gst_rtsp_session_stream_set_callbacks (stream, NULL, + NULL, NULL, NULL); + } + g_list_free (client->streams); + client->streams = NULL; +} + +static void +unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media) +{ + guint n_streams, i; + + n_streams = gst_rtsp_media_n_streams (media->media); + for (i = 0; i < n_streams; i++) { + GstRTSPSessionStream *sstream; + GstRTSPTransport *tr; + + /* get the stream as configured in the session */ + sstream = gst_rtsp_session_media_get_stream (media, i); + /* get the transport, if there is no transport configured, skip this stream */ + if (!(tr = sstream->trans.transport)) + continue; + + if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { + /* for TCP, unlink the stream from the TCP connection of the client */ + unlink_stream (client, sstream); + } + } +} + +static gboolean handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) { GstRTSPSessionMedia *media; @@ -311,7 +383,10 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession if (!media) goto not_found; - gst_rtsp_session_media_stop (media); + /* unlink the all TCP callbacks */ + unlink_session_streams (client, media); + + gst_rtsp_session_media_set_state (media, GST_STATE_NULL); /* unmanage the media in the session, returns false if all media session * are torn down. */ @@ -360,7 +435,11 @@ handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se media->state != GST_RTSP_STATE_RECORDING) goto invalid_state; - gst_rtsp_session_media_pause (media); + /* unlink the all TCP callbacks */ + unlink_session_streams (client, media); + + /* then pause sending */ + gst_rtsp_session_media_set_state (media, GST_STATE_PAUSED); /* construct the response now */ code = GST_RTSP_STS_OK; @@ -420,10 +499,23 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses n_streams = gst_rtsp_media_n_streams (media->media); for (i = 0; i < n_streams; i++) { + GstRTSPSessionStream *sstream; GstRTSPMediaStream *stream; + GstRTSPTransport *tr; gchar *uristr; - stream = gst_rtsp_media_get_stream (media->media, i); + /* get the stream as configured in the session */ + sstream = gst_rtsp_session_media_get_stream (media, i); + /* get the transport, if there is no transport configured, skip this stream */ + if (!(tr = sstream->trans.transport)) + continue; + + if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { + /* for TCP, link the stream to the TCP connection of the client */ + link_stream (client, sstream); + } + + stream = sstream->media_stream; g_object_get (G_OBJECT (stream->payloader), "seqnum", &seqnum, NULL); g_object_get (G_OBJECT (stream->payloader), "timestamp", ×tamp, NULL); @@ -451,7 +543,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses send_response (client, session, &response); /* start playing after sending the request */ - gst_rtsp_session_media_play (media); + gst_rtsp_session_media_set_state (media, GST_STATE_PLAYING); media->state = GST_RTSP_STATE_PLAYING; @@ -594,12 +686,6 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se if (!(stream = gst_rtsp_session_media_get_stream (media, streamid))) goto no_stream; - /* setup the server transport from the client transport */ - if (ct->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { - ct->port.min = gst_rtsp_connection_get_readfd (client->connection); - ct->port.max = gst_rtsp_connection_get_writefd (client->connection); - } - st = gst_rtsp_session_stream_set_transport (stream, ct); /* serialize the server transport */ @@ -806,6 +892,8 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request) gst_rtsp_message_dump (request); #endif + g_message ("client %p: received a request", client); + gst_rtsp_message_parse_request (request, &method, &uristr, &version); if (version != GST_RTSP_VERSION_1_0) { @@ -889,6 +977,54 @@ session_not_found: } } +static void +handle_data (GstRTSPClient *client, GstRTSPMessage *message) +{ + GstRTSPResult res; + guint8 channel; + GList *walk; + guint8 *data; + guint size; + GstBuffer *buffer; + + /* find the stream for this message */ + res = gst_rtsp_message_parse_data (message, &channel); + if (res != GST_RTSP_OK) + return; + + gst_rtsp_message_steal_body (message, &data, &size); + + buffer = gst_buffer_new (); + GST_BUFFER_DATA (buffer) = data; + GST_BUFFER_MALLOCDATA (buffer) = data; + GST_BUFFER_SIZE (buffer) = size; + + for (walk = client->streams; walk; walk = g_list_next (walk)) { + GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data; + GstRTSPMediaStream *mstream; + GstRTSPTransport *tr; + + /* get the transport, if there is no transport configured, skip this stream */ + if (!(tr = stream->trans.transport)) + continue; + + /* we also need a media stream */ + if (!(mstream = stream->media_stream)) + continue; + + /* check for TCP transport */ + if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { + /* dispatch to the stream based on the channel number */ + if (tr->interleaved.min == channel) { + gst_rtsp_media_stream_rtp (mstream, buffer); + } else if (tr->interleaved.max == channel) { + gst_rtsp_media_stream_rtcp (mstream, buffer); + } + } + } + gst_buffer_unref (buffer); +} + /** * gst_rtsp_client_set_timeout: * @client: a #GstRTSPClient @@ -1008,8 +1144,6 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da { GstRTSPClient *client = GST_RTSP_CLIENT (user_data); - g_message ("client %p: received a message", client); - switch (message->type) { case GST_RTSP_MESSAGE_REQUEST: handle_request (client, message); @@ -1017,6 +1151,7 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da case GST_RTSP_MESSAGE_RESPONSE: break; case GST_RTSP_MESSAGE_DATA: + handle_data (client, message); break; default: break; @@ -1048,6 +1183,9 @@ closed (GstRTSPWatch *watch, gpointer user_data) g_mutex_unlock (tunnels_lock); } + /* remove all streams that are streaming over this client connection */ + unlink_streams (client); + return GST_RTSP_OK; } diff --git a/gst/rtsp-server/rtsp-client.h b/gst/rtsp-server/rtsp-client.h index 9949f1a..07b81d9 100644 --- a/gst/rtsp-server/rtsp-client.h +++ b/gst/rtsp-server/rtsp-client.h @@ -59,10 +59,14 @@ typedef struct _GstRTSPClientClass GstRTSPClientClass; * GstRTSPClient: * * @connection: the connection object handling the client request. - * @address: the address of the connection - * @media: handle to the media handled by the client. - * @pool: handle to the session pool used by the client. - * @thread: thread to handle the client connection + * @watch: watch for the connection + * @watchid: id of the watch + * @timeout: the session timeout + * @session_pool: handle to the session pool used by the client. + * @media_mapping: handle to the media mapping used by the client. + * @uri: cached uri + * @media: cached media + * @streams: a list of streams using @connection. * * The client structure. */ @@ -79,6 +83,8 @@ struct _GstRTSPClient { GstRTSPUrl *uri; GstRTSPMedia *media; + + GList *streams; }; struct _GstRTSPClientClass { diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index 6284612..b75d386 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -69,7 +69,6 @@ static void gst_rtsp_media_init (GstRTSPMedia * media) { media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *)); - media->complete = FALSE; media->is_live = FALSE; media->buffering = FALSE; } @@ -251,6 +250,46 @@ gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx) return res; } +/** + * gst_rtsp_media_stream_rtp: + * @stream: a #GstRTSPMediaStream + * @buffer: a #GstBuffer + * + * Handle an RTP buffer for the stream. This method is usually called when a + * message has been received from a client using the TCP transport. + * + * Returns: a GstFlowReturn. + */ +GstFlowReturn +gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer) +{ + GstFlowReturn ret; + + g_signal_emit_by_name (stream->appsrc[0], "push-buffer", buffer, &ret); + + return ret; +} + +/** + * gst_rtsp_media_stream_rtcp: + * @stream: a #GstRTSPMediaStream + * @buffer: a #GstBuffer + * + * Handle an RTCP buffer for the stream. This method is usually called when a + * message has been received from a client using the TCP transport. + * + * Returns: a GstFlowReturn. + */ +GstFlowReturn +gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer) +{ + GstFlowReturn ret; + + g_signal_emit_by_name (stream->appsrc[1], "push-buffer", buffer, &ret); + + return GST_FLOW_ERROR; +} + /* Allocate the udp ports and sockets */ static gboolean alloc_udp_ports (GstRTSPMediaStream * stream) @@ -461,19 +500,67 @@ on_timeout (GObject *session, GObject *source, GstRTSPMedia *media) g_message ("%p: source %p timeout", media, source); } +static void +handle_new_buffer (GstElement *sink, GstRTSPMediaStream *stream) +{ + GList *walk; + GstBuffer *buffer; + + g_signal_emit_by_name (sink, "pull-buffer", &buffer); + if (!buffer) + return; + + for (walk = stream->transports; walk; walk = g_list_next (walk)) { + GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data; + + if (sink == stream->appsink[0]) { + if (tr->send_rtp) + tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data); + } + else { + if (tr->send_rtcp) + tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data); + } + } + gst_buffer_unref (buffer); +} + /* prepare the pipeline objects to handle @stream in @media */ static gboolean setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) { gchar *name; - GstPad *pad; + GstPad *pad, *teepad, *selpad; + GstPadLinkReturn ret; + gint i; + GstElement *tee, *selector; - alloc_udp_ports (stream); + /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2 + * for sending RTP/RTCP. The sender and receiver ports are shared between the + * elements */ + if (!alloc_udp_ports (stream)) + return FALSE; - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[0]); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[1]); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[0]); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[1]); + /* add the ports to the pipeline */ + for (i = 0; i < 2; i++) { + gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]); + gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]); + } + + /* create elements for the TCP transfer */ + for (i = 0; i < 2; i++) { + stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL); + stream->appsink[i] = gst_element_factory_make ("appsink", NULL); + g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL); + g_object_set (stream->appsink[i], "emit-signals", TRUE, NULL); + g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL); + gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]); + gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]); + } + g_signal_connect (stream->appsink[0], "new-buffer", + (GCallback) handle_new_buffer, stream); + g_signal_connect (stream->appsink[1], "new-buffer", + (GCallback) handle_new_buffer, stream); /* hook up the stream to the RTP session elements. */ name = g_strdup_printf ("send_rtp_sink_%d", idx); @@ -505,19 +592,73 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) media); /* link the RTP pad to the session manager */ - gst_pad_link (stream->srcpad, stream->send_rtp_sink); + ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink); + if (ret != GST_PAD_LINK_OK) + goto link_failed; - /* link udp elements */ - pad = gst_element_get_static_pad (stream->udpsink[0], "sink"); + /* make tee for RTP and link to stream */ + tee = gst_element_factory_make ("tee", NULL); + gst_bin_add (GST_BIN_CAST (media->pipeline), tee); + + pad = gst_element_get_static_pad (tee, "sink"); gst_pad_link (stream->send_rtp_src, pad); gst_object_unref (pad); - pad = gst_element_get_static_pad (stream->udpsink[1], "sink"); + + /* link RTP sink, we're pretty sure this will work. */ + teepad = gst_element_get_request_pad (tee, "src%d"); + pad = gst_element_get_static_pad (stream->udpsink[0], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + teepad = gst_element_get_request_pad (tee, "src%d"); + pad = gst_element_get_static_pad (stream->appsink[0], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + /* make tee for RTCP */ + tee = gst_element_factory_make ("tee", NULL); + gst_bin_add (GST_BIN_CAST (media->pipeline), tee); + + pad = gst_element_get_static_pad (tee, "sink"); gst_pad_link (stream->send_rtcp_src, pad); gst_object_unref (pad); - pad = gst_element_get_static_pad (stream->udpsrc[1], "src"); + + /* link RTCP elements */ + teepad = gst_element_get_request_pad (tee, "src%d"); + pad = gst_element_get_static_pad (stream->udpsink[1], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + teepad = gst_element_get_request_pad (tee, "src%d"); + pad = gst_element_get_static_pad (stream->appsink[1], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + /* make selector for the RTCP receivers */ + selector = gst_element_factory_make ("input-selector", NULL); + g_object_set (selector, "select-all", TRUE, NULL); + gst_bin_add (GST_BIN_CAST (media->pipeline), selector); + + pad = gst_element_get_static_pad (selector, "src"); gst_pad_link (pad, stream->recv_rtcp_sink); gst_object_unref (pad); + selpad = gst_element_get_request_pad (selector, "sink%d"); + pad = gst_element_get_static_pad (stream->udpsrc[1], "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); + + selpad = gst_element_get_request_pad (selector, "sink%d"); + pad = gst_element_get_static_pad (stream->appsrc[1], "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); + /* we set and keep these to playing so that they don't cause NO_PREROLL return * values */ gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING); @@ -532,6 +673,13 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) stream->prepared = TRUE; return TRUE; + + /* ERRORS */ +link_failed: + { + g_warning ("failed to link stream %d", idx); + return FALSE; + } } static void @@ -648,6 +796,17 @@ default_handle_message (GstRTSPMedia *media, GstMessage *message) g_free (debug); break; } + case GST_MESSAGE_WARNING: + { + GError *gerror; + gchar *debug; + + gst_message_parse_warning (message, &gerror, &debug); + g_warning ("%p: got warning %s (%s)", media, gerror->message, debug); + g_error_free (gerror); + g_free (debug); + break; + } default: g_message ("%p: got message type %s", media, gst_message_type_get_name (type)); break; @@ -678,6 +837,9 @@ bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media) * Prepare @media for streaming. This function will create the pipeline and * other objects to manage the streaming. * + * It will preroll the pipeline and collect vital information about the streams + * such as the duration. + * * Returns: %TRUE on success. */ gboolean @@ -736,19 +898,22 @@ gst_rtsp_media_prepare (GstRTSPMedia *media) g_message ("live media %p", media); media->is_live = TRUE; ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING); + if (ret == GST_STATE_CHANGE_FAILURE) + goto state_failed; break; case GST_STATE_CHANGE_FAILURE: - { - unlock_streams (media); goto state_failed; - } } /* now wait for all pads to be prerolled */ ret = gst_element_get_state (media->pipeline, NULL, NULL, -1); + if (ret == GST_STATE_CHANGE_FAILURE) + goto state_failed; /* and back to PAUSED for live pipelines */ ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED); + if (ret == GST_STATE_CHANGE_FAILURE) + goto state_failed; /* collect stats about the media */ collect_media_stats (media); @@ -770,95 +935,58 @@ was_prepared: /* ERRORS */ state_failed: { + g_warning ("failed to preroll pipeline"); + unlock_streams (media); gst_element_set_state (media->pipeline, GST_STATE_NULL); return FALSE; } } /** - * gst_rtsp_media_play: + * gst_rtsp_media_set_state: * @media: a #GstRTSPMedia + * @state: the target state of the media * @transports: a GArray of #GstRTSPMediaTrans pointers * - * Start playing @media for to the transports in @transports. + * Set the state of @media to @state and for the transports in @transports. * * Returns: %TRUE on success. */ gboolean -gst_rtsp_media_play (GstRTSPMedia *media, GArray *transports) +gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports) { gint i; GstStateChangeReturn ret; + gboolean add, remove; g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); g_return_val_if_fail (transports != NULL, FALSE); g_return_val_if_fail (media->prepared, FALSE); - if (media->target_state == GST_STATE_PLAYING) - return TRUE; + /* NULL and READY are the same */ + if (state == GST_STATE_READY) + state = GST_STATE_NULL; - for (i = 0; i < transports->len; i++) { - GstRTSPMediaTrans *tr; - GstRTSPMediaStream *stream; - GstRTSPTransport *trans; - - /* we need a non-NULL entry in the array */ - tr = g_array_index (transports, GstRTSPMediaTrans *, i); - if (tr == NULL) - continue; - - /* we need a transport */ - if (!(trans = tr->transport)) - continue; + if (media->target_state == state) + return TRUE; - /* get the stream and add the destinations */ - stream = gst_rtsp_media_get_stream (media, tr->idx); - switch (trans->lower_transport) { - case GST_RTSP_LOWER_TRANS_UDP: - case GST_RTSP_LOWER_TRANS_UDP_MCAST: - g_message ("adding %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max); + add = remove = FALSE; - g_signal_emit_by_name (stream->udpsink[0], "add", trans->destination, trans->client_port.min, NULL); - g_signal_emit_by_name (stream->udpsink[1], "add", trans->destination, trans->client_port.max, NULL); - break; - case GST_RTSP_LOWER_TRANS_TCP: - g_message ("TCP transport not yet implemented"); - break; - default: - g_message ("Unknown transport %d", trans->lower_transport); - break; - } + switch (state) { + case GST_STATE_NULL: + case GST_STATE_PAUSED: + /* we're going from PLAYING to READY or NULL, remove */ + if (media->target_state == GST_STATE_PLAYING) + remove = TRUE; + break; + case GST_STATE_PLAYING: + /* we're going to PLAYING, add */ + add = TRUE; + break; + default: + break; } - g_message ("playing media %p", media); - media->target_state = GST_STATE_PLAYING; - ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING); - - return TRUE; -} - -/** - * gst_rtsp_media_pause: - * @media: a #GstRTSPMedia - * @transports: a array of #GstRTSPTransport pointers - * - * Pause playing @media for to the transports in @transports. - * - * Returns: %TRUE on success. - */ -gboolean -gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports) -{ - gint i; - GstStateChangeReturn ret; - - g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); - g_return_val_if_fail (transports != NULL, FALSE); - g_return_val_if_fail (media->prepared, FALSE); - - if (media->target_state == GST_STATE_PAUSED) - return TRUE; - for (i = 0; i < transports->len; i++) { GstRTSPMediaTrans *tr; GstRTSPMediaStream *stream; @@ -875,56 +1003,35 @@ gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports) /* get the stream and add the destinations */ stream = gst_rtsp_media_get_stream (media, tr->idx); - switch (trans->lower_transport) { case GST_RTSP_LOWER_TRANS_UDP: case GST_RTSP_LOWER_TRANS_UDP_MCAST: - g_message ("removing %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max); - - g_signal_emit_by_name (stream->udpsink[0], "remove", trans->destination, trans->client_port.min, NULL); - g_signal_emit_by_name (stream->udpsink[1], "remove", trans->destination, trans->client_port.max, NULL); + if (add) { + g_message ("adding %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max); + g_signal_emit_by_name (stream->udpsink[0], "add", trans->destination, trans->client_port.min, NULL); + g_signal_emit_by_name (stream->udpsink[1], "add", trans->destination, trans->client_port.max, NULL); + } else if (remove) { + g_message ("removing %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max); + g_signal_emit_by_name (stream->udpsink[0], "remove", trans->destination, trans->client_port.min, NULL); + g_signal_emit_by_name (stream->udpsink[1], "remove", trans->destination, trans->client_port.max, NULL); + } break; case GST_RTSP_LOWER_TRANS_TCP: - g_message ("TCP transport not yet implemented"); + if (add) { + stream->transports = g_list_prepend (stream->transports, tr); + } else if (remove) { + stream->transports = g_list_remove (stream->transports, tr); + } break; default: g_message ("Unknown transport %d", trans->lower_transport); break; } } - g_message ("pause media %p", media); - media->target_state = GST_STATE_PAUSED; - ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED); - - return TRUE; -} - -/** - * gst_rtsp_media_stream_stop: - * @media: a #GstRTSPMedia - * @transports: a GArray of #GstRTSPMediaTrans pointers - * - * Stop playing @media for to the transports in @transports. - * - * Returns: %TRUE on success. - */ -gboolean -gst_rtsp_media_stop (GstRTSPMedia *media, GArray *transports) -{ - GstStateChangeReturn ret; - - g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); - g_return_val_if_fail (transports != NULL, FALSE); - g_return_val_if_fail (media->prepared, FALSE); - - if (media->target_state == GST_STATE_NULL) - return TRUE; - - gst_rtsp_media_pause (media, transports); - g_message ("stop media %p", media); - media->target_state = GST_STATE_NULL; - ret = gst_element_set_state (media->pipeline, GST_STATE_NULL); + g_message ("state %s media %p", gst_element_state_get_name (state), media); + media->target_state = state; + ret = gst_element_set_state (media->pipeline, state); return TRUE; } diff --git a/gst/rtsp-server/rtsp-media.h b/gst/rtsp-server/rtsp-media.h index f746be6..4aaaa1d 100644 --- a/gst/rtsp-server/rtsp-media.h +++ b/gst/rtsp-server/rtsp-media.h @@ -41,9 +41,15 @@ typedef struct _GstRTSPMedia GstRTSPMedia; typedef struct _GstRTSPMediaClass GstRTSPMediaClass; typedef struct _GstRTSPMediaTrans GstRTSPMediaTrans; +typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data); + /** * GstRTSPMediaTrans: * @idx: a stream index + * @send_rtp: callback for sending RTP messages + * @send_rtcp: callback for sending RTCP messages + * @user_data: user data passed in the callbacks + * @notify: free function for the user_data. * @transport: a transport description * * A Transport description for stream @idx @@ -51,13 +57,17 @@ typedef struct _GstRTSPMediaTrans GstRTSPMediaTrans; struct _GstRTSPMediaTrans { guint idx; + GstRTSPSendFunc send_rtp; + GstRTSPSendFunc send_rtcp; + gpointer user_data; + GDestroyNotify notify; + GstRTSPTransport *transport; }; /** * GstRTSPMediaStream: * - * @media: the owner #GstRTSPMedia * @srcpad: the srcpad of the stream * @payloader: the payloader of the format * @prepared: if the stream is prepared for streaming @@ -68,8 +78,12 @@ struct _GstRTSPMediaTrans { * @recv_rtcp_src: srcpad for RTCP buffers * @udpsrc: the udp source elements for RTP/RTCP * @udpsink: the udp sink elements for RTP/RTCP + * @appsrc: the app source elements for RTP/RTCP + * @appsink: the app sink elements for RTP/RTCP + * @server_port: the server ports for this stream * @caps_sig: the signal id for detecting caps * @caps: the caps of the stream + * @tranports: the current transports being streamed * * The definition of a media stream. The streams are identified by @id. */ @@ -91,7 +105,9 @@ struct _GstRTSPMediaStream { * sockets */ GstElement *udpsrc[2]; GstElement *udpsink[2]; + /* for TCP transport */ GstElement *appsrc[2]; + GstElement *appsink[2]; /* server ports for sending/receiving */ GstRTSPRange server_port; @@ -99,17 +115,25 @@ struct _GstRTSPMediaStream { /* the caps of the stream */ gulong caps_sig; GstCaps *caps; + + /* transports we stream to */ + GList *transports; }; /** * GstRTSPMedia: * @shared: if this media can be shared between clients * @element: the data providing element - * @stream: the different streams provided by @element + * @streams: the different streams provided by @element * @prepared: if the media is prepared for streaming * @pipeline: the toplevel pipeline + * @source: the bus watch for pipeline messages. + * @id: the id of the watch + * @is_live: if the pipeline is live + * @buffering: if the pipeline is buffering + * @target_state: the desired target state of the pipeline * @rtpbin: the rtpbin - * @multifdsink: multifdsink element for TCP transport + * @range: the range of the media being streamed * * A class that contains the GStreamer element along with a list of * #GstRTSPediaStream objects that can produce data. @@ -120,7 +144,6 @@ struct _GstRTSPMedia { GObject parent; gboolean shared; - gboolean complete; GstElement *element; GArray *streams; @@ -138,9 +161,6 @@ struct _GstRTSPMedia { /* RTP session manager */ GstElement *rtpbin; - /* for TCP transport */ - GstElement *multifdsink; - /* the range of media */ GstRTSPTimeRange range; }; @@ -180,9 +200,10 @@ gboolean gst_rtsp_media_prepare (GstRTSPMedia *media); guint gst_rtsp_media_n_streams (GstRTSPMedia *media); GstRTSPMediaStream * gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx); -gboolean gst_rtsp_media_play (GstRTSPMedia *media, GArray *trans); -gboolean gst_rtsp_media_pause (GstRTSPMedia *media, GArray *trans); -gboolean gst_rtsp_media_stop (GstRTSPMedia *media, GArray *trans); +GstFlowReturn gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer); +GstFlowReturn gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer); + +gboolean gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *trans); G_END_DECLS diff --git a/gst/rtsp-server/rtsp-server.c b/gst/rtsp-server/rtsp-server.c index b860563..e9c965e 100644 --- a/gst/rtsp-server/rtsp-server.c +++ b/gst/rtsp-server/rtsp-server.c @@ -220,7 +220,6 @@ gst_rtsp_server_set_session_pool (GstRTSPServer *server, GstRTSPSessionPool *poo } } - /** * gst_rtsp_server_get_session_pool: * @server: a #GstRTSPServer diff --git a/gst/rtsp-server/rtsp-session.c b/gst/rtsp-server/rtsp-session.c index a80e280..b590146 100644 --- a/gst/rtsp-server/rtsp-session.c +++ b/gst/rtsp-server/rtsp-session.c @@ -89,7 +89,7 @@ gst_rtsp_session_free_media (GstRTSPSessionMedia *media, GstRTSPSession *session g_message ("free session media %p", media); - gst_rtsp_session_media_stop (media); + gst_rtsp_session_media_set_state (media, GST_STATE_NULL); for (i = 0; i < size; i++) { GstRTSPSessionStream *stream; @@ -476,75 +476,59 @@ gst_rtsp_session_stream_set_transport (GstRTSPSessionStream *stream, st->profile = ct->profile; st->lower_transport = ct->lower_transport; st->client_port = ct->client_port; + st->interleaved = ct->interleaved; + st->server_port.min = stream->media_stream->server_port.min; + st->server_port.max = stream->media_stream->server_port.max; - /* keep track of the transports */ + /* keep track of the transports in the stream. */ if (stream->trans.transport) gst_rtsp_transport_free (stream->trans.transport); stream->trans.transport = ct; - st->server_port.min = stream->media_stream->server_port.min; - st->server_port.max = stream->media_stream->server_port.max; - return st; } /** - * gst_rtsp_session_media_play: - * @media: a #GstRTSPSessionMedia - * - * Tell the media object @media to start playing and streaming to the client. - * - * Returns: %TRUE on success. - */ -gboolean -gst_rtsp_session_media_play (GstRTSPSessionMedia *media) -{ - gboolean ret; - - g_return_val_if_fail (media != NULL, FALSE); - - ret = gst_rtsp_media_play (media->media, media->streams); - - return ret; -} - -/** - * gst_rtsp_session_media_pause: - * @media: a #GstRTSPSessionMedia - * - * Tell the media object @media to pause. + * gst_rtsp_session_stream_set_callbacks: + * @stream: a #GstRTSPSessionStream + * @send_rtp: a callback called when RTP should be send + * @send_rtcp: a callback called when RTCP should be send + * @user_data: user data passed to callbacks + * @notify: called with the user_data when no longer needed. * - * Returns: %TRUE on success. + * Install callbacks that will be called when data for a stream should be sent + * to a client. This is usually used when sending RTP/RTCP over TCP. */ -gboolean -gst_rtsp_session_media_pause (GstRTSPSessionMedia *media) +void +gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStream *stream, + GstRTSPSendFunc send_rtp, GstRTSPSendFunc send_rtcp, gpointer user_data, + GDestroyNotify notify) { - gboolean ret; - - g_return_val_if_fail (media != NULL, FALSE); - - ret = gst_rtsp_media_pause (media->media, media->streams); - - return ret; + stream->trans.send_rtp = send_rtp; + stream->trans.send_rtcp = send_rtcp; + if (stream->trans.notify) + stream->trans.notify (stream->trans.user_data); + stream->trans.user_data = user_data; + stream->trans.notify = notify; } /** - * gst_rtsp_session_media_stop: + * gst_rtsp_session_media_set_state: * @media: a #GstRTSPSessionMedia + * @state: the new state * - * Tell the media object @media to stop playing. After this call the media - * cannot be played or paused anymore + * Tell the media object @media to change to @state. * * Returns: %TRUE on success. */ gboolean -gst_rtsp_session_media_stop (GstRTSPSessionMedia *media) +gst_rtsp_session_media_set_state (GstRTSPSessionMedia *media, GstState state) { gboolean ret; g_return_val_if_fail (media != NULL, FALSE); - ret = gst_rtsp_media_stop (media->media, media->streams); + ret = gst_rtsp_media_set_state (media->media, state, media->streams); return ret; } diff --git a/gst/rtsp-server/rtsp-session.h b/gst/rtsp-server/rtsp-session.h index b3797b1..495bc75 100644 --- a/gst/rtsp-server/rtsp-session.h +++ b/gst/rtsp-server/rtsp-session.h @@ -132,9 +132,7 @@ gboolean gst_rtsp_session_release_media (GstRTSPSession *se GstRTSPSessionMedia * gst_rtsp_session_get_media (GstRTSPSession *sess, const GstRTSPUrl *uri); /* control media */ -gboolean gst_rtsp_session_media_play (GstRTSPSessionMedia *media); -gboolean gst_rtsp_session_media_pause (GstRTSPSessionMedia *media); -gboolean gst_rtsp_session_media_stop (GstRTSPSessionMedia *media); +gboolean gst_rtsp_session_media_set_state (GstRTSPSessionMedia *media, GstState state); /* get stream config */ GstRTSPSessionStream * gst_rtsp_session_media_get_stream (GstRTSPSessionMedia *media, @@ -143,6 +141,11 @@ GstRTSPSessionStream * gst_rtsp_session_media_get_stream (GstRTSPSessionMedi /* configure transport */ GstRTSPTransport * gst_rtsp_session_stream_set_transport (GstRTSPSessionStream *stream, GstRTSPTransport *ct); +void gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStream *stream, + GstRTSPSendFunc send_rtp, + GstRTSPSendFunc send_rtcp, + gpointer user_data, + GDestroyNotify notify); G_END_DECLS