From: Wim Taymans Date: Tue, 26 May 2009 17:01:10 +0000 (+0200) Subject: rtsp: use RTCP to keep the session alive X-Git-Tag: 1.6.0~933 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=9bed89c3b7f05bc0cb8f7fd1cad49ca6e05c414e;p=platform%2Fupstream%2Fgst-rtsp-server.git rtsp: use RTCP to keep the session alive Use the RTCP rtcp-from stats field to find the associated session and use this to keep the session alive. --- diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index b7ac1d0..aeaf52a 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -35,13 +35,14 @@ enum PROP_LAST }; -static void gst_rtsp_client_get_property (GObject *object, guint propid, - GValue *value, GParamSpec *pspec); -static void gst_rtsp_client_set_property (GObject *object, guint propid, - const GValue *value, GParamSpec *pspec); +static void gst_rtsp_client_get_property (GObject * object, guint propid, + GValue * value, GParamSpec * pspec); +static void gst_rtsp_client_set_property (GObject * object, guint propid, + const GValue * value, GParamSpec * pspec); static void gst_rtsp_client_finalize (GObject * obj); -static void client_session_finalized (GstRTSPClient *client, GstRTSPSession *session); +static void client_session_finalized (GstRTSPClient * client, + GstRTSPSession * session); G_DEFINE_TYPE (GstRTSPClient, gst_rtsp_client, G_TYPE_OBJECT); @@ -59,14 +60,17 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass) g_object_class_install_property (gobject_class, PROP_SESSION_POOL, g_param_spec_object ("session-pool", "Session Pool", "The session pool to use for client session", - GST_TYPE_RTSP_SESSION_POOL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_TYPE_RTSP_SESSION_POOL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MEDIA_MAPPING, g_param_spec_object ("media-mapping", "Media Mapping", "The media mapping to use for client session", - GST_TYPE_RTSP_MEDIA_MAPPING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_TYPE_RTSP_MEDIA_MAPPING, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - tunnels = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref); + tunnels = + g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref); tunnels_lock = g_mutex_new (); } @@ -100,8 +104,8 @@ gst_rtsp_client_finalize (GObject * obj) } static void -gst_rtsp_client_get_property (GObject *object, guint propid, - GValue *value, GParamSpec *pspec) +gst_rtsp_client_get_property (GObject * object, guint propid, + GValue * value, GParamSpec * pspec) { GstRTSPClient *client = GST_RTSP_CLIENT (object); @@ -118,8 +122,8 @@ gst_rtsp_client_get_property (GObject *object, guint propid, } static void -gst_rtsp_client_set_property (GObject *object, guint propid, - const GValue *value, GParamSpec *pspec) +gst_rtsp_client_set_property (GObject * object, guint propid, + const GValue * value, GParamSpec * pspec) { GstRTSPClient *client = GST_RTSP_CLIENT (object); @@ -151,9 +155,11 @@ gst_rtsp_client_new (void) } static void -send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *response) +send_response (GstRTSPClient * client, GstRTSPSession * session, + GstRTSPMessage * response) { - gst_rtsp_message_add_header (response, GST_RTSP_HDR_SERVER, "GStreamer RTSP server"); + gst_rtsp_message_add_header (response, GST_RTSP_HDR_SERVER, + "GStreamer RTSP server"); /* remove any previous header */ gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1); @@ -163,13 +169,14 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r gchar *str; if (session->timeout != 60) - str = g_strdup_printf ("%s; timeout=%d", session->sessionid, session->timeout); + str = + g_strdup_printf ("%s; timeout=%d", session->sessionid, + session->timeout); else str = g_strdup (session->sessionid); gst_rtsp_message_take_header (response, GST_RTSP_HDR_SESSION, str); } - #ifdef DEBUG gst_rtsp_message_dump (response); #endif @@ -179,19 +186,19 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r } static void -send_generic_response (GstRTSPClient *client, GstRTSPStatusCode code, - GstRTSPMessage *request) +send_generic_response (GstRTSPClient * client, GstRTSPStatusCode code, + GstRTSPMessage * request) { GstRTSPMessage response = { 0 }; - gst_rtsp_message_init_response (&response, code, - gst_rtsp_status_as_text (code), request); + gst_rtsp_message_init_response (&response, code, + gst_rtsp_status_as_text (code), request); send_response (client, NULL, &response); } static gboolean -compare_uri (const GstRTSPUrl *uri1, const GstRTSPUrl *uri2) +compare_uri (const GstRTSPUrl * uri1, const GstRTSPUrl * uri2) { if (uri1 == NULL || uri2 == NULL) return FALSE; @@ -206,7 +213,7 @@ compare_uri (const GstRTSPUrl *uri1, const GstRTSPUrl *uri2) * but is cached for when the same client (without breaking the connection) is * doing a setup for the exact same url. */ static GstRTSPMedia * -find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request) +find_media (GstRTSPClient * client, GstRTSPUrl * uri, GstRTSPMessage * request) { GstRTSPMediaFactory *factory; GstRTSPMedia *media; @@ -225,7 +232,8 @@ find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request) goto no_mapping; /* find the factory for the uri first */ - if (!(factory = gst_rtsp_media_mapping_find_factory (client->media_mapping, uri))) + if (!(factory = + gst_rtsp_media_mapping_find_factory (client->media_mapping, uri))) goto no_factory; /* prepare the media and add it to the pipeline */ @@ -239,11 +247,10 @@ find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request) /* now keep track of the uri and the media */ client->uri = gst_rtsp_url_copy (uri); client->media = media; - } - else { + } else { /* we have seen this uri before, used cached media */ media = client->media; - g_message ("reusing cached media %p", media); + g_message ("reusing cached media %p", media); } if (media) @@ -278,7 +285,7 @@ no_prepare: } static gboolean -do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client) +do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client) { GstRTSPMessage message = { 0 }; guint8 *data; @@ -299,38 +306,36 @@ do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client) } static void -link_stream (GstRTSPClient *client, GstRTSPSessionStream *stream) +link_stream (GstRTSPClient * client, GstRTSPSessionStream * stream) { gst_rtsp_session_stream_set_callbacks (stream, (GstRTSPSendFunc) do_send_data, - (GstRTSPSendFunc) do_send_data, client, NULL); + (GstRTSPSendFunc) do_send_data, client, NULL); client->streams = g_list_prepend (client->streams, stream); } static void -unlink_stream (GstRTSPClient *client, GstRTSPSessionStream *stream) +unlink_stream (GstRTSPClient * client, GstRTSPSessionStream * stream) { - gst_rtsp_session_stream_set_callbacks (stream, NULL, - NULL, NULL, NULL); + gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL); client->streams = g_list_remove (client->streams, stream); } static void -unlink_streams (GstRTSPClient *client) +unlink_streams (GstRTSPClient * client) { GList *walk; for (walk = client->streams; walk; walk = g_list_next (walk)) { GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data; - gst_rtsp_session_stream_set_callbacks (stream, NULL, - NULL, NULL, NULL); + gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL); } g_list_free (client->streams); client->streams = NULL; } static void -unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media) +unlink_session_streams (GstRTSPClient * client, GstRTSPSessionMedia * media) { guint n_streams, i; @@ -353,7 +358,8 @@ unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media) } static gboolean -handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) +handle_teardown_request (GstRTSPClient * client, GstRTSPUrl * uri, + GstRTSPSession * session, GstRTSPMessage * request) { GstRTSPSessionMedia *media; GstRTSPMessage response = { 0 }; @@ -371,7 +377,8 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession unlink_session_streams (client, media); /* remove the session from the watched sessions */ - g_object_weak_unref (G_OBJECT (session), (GWeakNotify) client_session_finalized, client); + g_object_weak_unref (G_OBJECT (session), + (GWeakNotify) client_session_finalized, client); client->sessions = g_list_remove (client->sessions, session); gst_rtsp_session_media_set_state (media, GST_STATE_NULL); @@ -384,7 +391,8 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession } /* construct the response now */ code = GST_RTSP_STS_OK; - gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request); + gst_rtsp_message_init_response (&response, code, + gst_rtsp_status_as_text (code), request); send_response (client, session, &response); @@ -404,7 +412,8 @@ not_found: } static gboolean -handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) +handle_pause_request (GstRTSPClient * client, GstRTSPUrl * uri, + GstRTSPSession * session, GstRTSPMessage * request) { GstRTSPSessionMedia *media; GstRTSPMessage response = { 0 }; @@ -431,7 +440,8 @@ handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se /* construct the response now */ code = GST_RTSP_STS_OK; - gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request); + gst_rtsp_message_init_response (&response, code, + gst_rtsp_status_as_text (code), request); send_response (client, session, &response); @@ -453,13 +463,15 @@ not_found: } invalid_state: { - send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, request); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, + request); return FALSE; } } static gboolean -handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) +handle_play_request (GstRTSPClient * client, GstRTSPUrl * uri, + GstRTSPSession * session, GstRTSPMessage * request) { GstRTSPSessionMedia *media; GstRTSPMessage response = { 0 }; @@ -508,8 +520,10 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses /* get the stream as configured in the session */ sstream = gst_rtsp_session_media_get_stream (media, i); /* get the transport, if there is no transport configured, skip this stream */ - if (!(tr = sstream->trans.transport)) + if (!(tr = sstream->trans.transport)) { + g_message ("stream %d is not configured", i); continue; + } if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { /* for TCP, link the stream to the TCP connection of the client */ @@ -521,7 +535,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses payobjclass = G_OBJECT_GET_CLASS (stream->payloader); if (g_object_class_find_property (payobjclass, "seqnum") && - g_object_class_find_property (payobjclass, "timestamp")) { + g_object_class_find_property (payobjclass, "timestamp")) { GObject *payobj; payobj = G_OBJECT (stream->payloader); @@ -533,26 +547,26 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses g_string_append (rtpinfo, ", "); uristr = gst_rtsp_url_get_request_uri (uri); - g_string_append_printf (rtpinfo, "url=%s/stream=%d;seq=%u;rtptime=%u", uristr, i, seqnum, timestamp); + g_string_append_printf (rtpinfo, "url=%s/stream=%d;seq=%u;rtptime=%u", + uristr, i, seqnum, timestamp); g_free (uristr); infocount++; - } - else { + } else { g_warning ("RTP-Info cannot be determined for stream %d", i); } } /* construct the response now */ code = GST_RTSP_STS_OK; - gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request); + gst_rtsp_message_init_response (&response, code, + gst_rtsp_status_as_text (code), request); /* add the RTP-Info header */ if (infocount > 0) { str = g_string_free (rtpinfo, FALSE); gst_rtsp_message_take_header (&response, GST_RTSP_HDR_RTP_INFO, str); - } - else { + } else { g_string_free (rtpinfo, TRUE); } @@ -582,13 +596,22 @@ not_found: } invalid_state: { - send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, request); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, + request); return FALSE; } } +static void +do_keepalive (GstRTSPSession * session) +{ + g_message ("keep session %p alive", session); + gst_rtsp_session_touch (session); +} + static gboolean -handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) +handle_setup_request (GstRTSPClient * client, GstRTSPUrl * uri, + GstRTSPSession * session, GstRTSPMessage * request) { GstRTSPResult res; gchar *transport; @@ -623,18 +646,20 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se goto bad_request; /* parse the transport */ - res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_TRANSPORT, &transport, 0); + res = + gst_rtsp_message_get_header (request, GST_RTSP_HDR_TRANSPORT, &transport, + 0); if (res != GST_RTSP_OK) goto no_transport; transports = g_strsplit (transport, ",", 0); - gst_rtsp_transport_new (&ct); + gst_rtsp_transport_new (&ct); /* loop through the transports, try to parse */ have_transport = FALSE; for (i = 0; transports[i]; i++) { - gst_rtsp_transport_init (ct); + gst_rtsp_transport_init (ct); res = gst_rtsp_transport_parse (transports[i], ct); if (res == GST_RTSP_OK) { have_transport = TRUE; @@ -644,7 +669,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se g_strfreev (transports); /* we have not found anything usable, error out */ - if (!have_transport) + if (!have_transport) goto unsupported_transports; /* we have a valid transport, check if we can handle it */ @@ -654,8 +679,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se goto unsupported_transports; supported = GST_RTSP_LOWER_TRANS_UDP | - GST_RTSP_LOWER_TRANS_UDP_MCAST | - GST_RTSP_LOWER_TRANS_TCP; + GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP; if (!(ct->lower_transport & supported)) goto unsupported_transports; @@ -674,8 +698,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se media = gst_rtsp_session_get_media (session, uri); need_session = FALSE; - } - else { + } else { /* create a session if this fails we probably reached our session limit or * something. */ if (!(session = gst_rtsp_session_pool_create (client->session_pool))) @@ -708,13 +731,18 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se st = gst_rtsp_session_stream_set_transport (stream, ct); + /* configure keepalive for this transport */ + gst_rtsp_session_stream_set_keepalive (stream, + (GstRTSPKeepAliveFunc) do_keepalive, session, NULL); + /* serialize the server transport */ trans_str = gst_rtsp_transport_as_text (st); gst_rtsp_transport_free (st); /* construct the response now */ code = GST_RTSP_STS_OK; - gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request); + gst_rtsp_message_init_response (&response, code, + gst_rtsp_status_as_text (code), request); gst_rtsp_message_add_header (&response, GST_RTSP_HDR_TRANSPORT, trans_str); g_free (trans_str); @@ -763,7 +791,7 @@ no_transport: unsupported_transports: { send_generic_response (client, GST_RTSP_STS_UNSUPPORTED_TRANSPORT, request); - gst_rtsp_transport_free (ct); + gst_rtsp_transport_free (ct); return FALSE; } no_pool: @@ -780,7 +808,8 @@ service_unavailable: /* for the describe we must generate an SDP */ static gboolean -handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) +handle_describe_request (GstRTSPClient * client, GstRTSPUrl * uri, + GstRTSPSession * session, GstRTSPMessage * request) { GstRTSPMessage response = { 0 }; GstRTSPResult res; @@ -791,10 +820,11 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession /* check what kind of format is accepted, we don't really do anything with it * and always return SDP for now. */ - for (i = 0; i++; ) { + for (i = 0; i++;) { gchar *accept; - res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_ACCEPT, &accept, i); + res = + gst_rtsp_message_get_header (request, GST_RTSP_HDR_ACCEPT, &accept, i); if (res == GST_RTSP_ENOTIMPL) break; @@ -812,10 +842,11 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession g_object_unref (media); - gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, - gst_rtsp_status_as_text (GST_RTSP_STS_OK), request); + gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, + gst_rtsp_status_as_text (GST_RTSP_STS_OK), request); - gst_rtsp_message_add_header (&response, GST_RTSP_HDR_CONTENT_TYPE, "application/sdp"); + gst_rtsp_message_add_header (&response, GST_RTSP_HDR_CONTENT_TYPE, + "application/sdp"); /* content base for some clients that might screw up creating the setup uri */ str = g_strdup_printf ("rtsp://%s:%u%s/", uri->host, uri->port, uri->abspath); @@ -824,7 +855,7 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession /* add SDP to the response body */ str = gst_sdp_message_as_text (sdp); - gst_rtsp_message_take_body (&response, (guint8 *)str, strlen (str)); + gst_rtsp_message_take_body (&response, (guint8 *) str, strlen (str)); gst_sdp_message_free (sdp); send_response (client, session, &response); @@ -846,25 +877,24 @@ no_sdp: } static void -handle_options_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) +handle_options_request (GstRTSPClient * client, GstRTSPUrl * uri, + GstRTSPSession * session, GstRTSPMessage * request) { GstRTSPMessage response = { 0 }; GstRTSPMethod options; gchar *str; options = GST_RTSP_DESCRIBE | - GST_RTSP_OPTIONS | - GST_RTSP_PAUSE | - GST_RTSP_PLAY | - GST_RTSP_SETUP | - GST_RTSP_GET_PARAMETER | - GST_RTSP_SET_PARAMETER | - GST_RTSP_TEARDOWN; + GST_RTSP_OPTIONS | + GST_RTSP_PAUSE | + GST_RTSP_PLAY | + GST_RTSP_SETUP | + GST_RTSP_GET_PARAMETER | GST_RTSP_SET_PARAMETER | GST_RTSP_TEARDOWN; str = gst_rtsp_options_as_text (options); - gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, - gst_rtsp_status_as_text (GST_RTSP_STS_OK), request); + gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, + gst_rtsp_status_as_text (GST_RTSP_STS_OK), request); gst_rtsp_message_add_header (&response, GST_RTSP_HDR_PUBLIC, str); g_free (str); @@ -874,7 +904,7 @@ handle_options_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession * /* remove duplicate and trailing '/' */ static void -santize_uri (GstRTSPUrl *uri) +santize_uri (GstRTSPUrl * uri) { gint i, len; gchar *s, *d; @@ -894,22 +924,22 @@ santize_uri (GstRTSPUrl *uri) } len = d - uri->abspath; /* don't remove the first slash if that's the only thing left */ - if (len > 1 && *(d-1) == '/') + if (len > 1 && *(d - 1) == '/') d--; *d = '\0'; } static void -client_session_finalized (GstRTSPClient *client, GstRTSPSession *session) +client_session_finalized (GstRTSPClient * client, GstRTSPSession * session) { if (!(client->sessions = g_list_remove (client->sessions, session))) { g_message ("all sessions finalized, close the connection"); - g_source_destroy ((GSource*)client->watch); + g_source_destroy ((GSource *) client->watch); } } static void -client_watch_session (GstRTSPClient *client, GstRTSPSession *session) +client_watch_session (GstRTSPClient * client, GstRTSPSession * session) { GList *walk; @@ -923,12 +953,13 @@ client_watch_session (GstRTSPClient *client, GstRTSPSession *session) g_message ("watching session %p", session); - g_object_weak_ref (G_OBJECT (session), (GWeakNotify) client_session_finalized, client); + g_object_weak_ref (G_OBJECT (session), (GWeakNotify) client_session_finalized, + client); client->sessions = g_list_prepend (client->sessions, session); } static void -handle_request (GstRTSPClient *client, GstRTSPMessage *request) +handle_request (GstRTSPClient * client, GstRTSPMessage * request) { GstRTSPMethod method; const gchar *uristr; @@ -948,7 +979,8 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request) if (version != GST_RTSP_VERSION_1_0) { /* we can only handle 1.0 requests */ - send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, request); + send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, + request); return; } @@ -975,8 +1007,7 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request) * disappears because it times out, we will be notified. If all sessions are * gone, we will close the connection */ client_watch_session (client, session); - } - else + } else session = NULL; /* now see what is asked and dispatch to a dedicated handler */ @@ -1033,7 +1064,7 @@ session_not_found: } static void -handle_data (GstRTSPClient *client, GstRTSPMessage *message) +handle_data (GstRTSPClient * client, GstRTSPMessage * message) { GstRTSPResult res; guint8 channel; @@ -1043,7 +1074,7 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message) GstBuffer *buffer; gboolean handled; - /* find the stream for this message */ + /* find the stream for this message */ res = gst_rtsp_message_parse_data (message, &channel); if (res != GST_RTSP_OK) return; @@ -1073,12 +1104,12 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message) if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { /* dispatch to the stream based on the channel number */ if (tr->interleaved.min == channel) { - gst_rtsp_media_stream_rtp (mstream, buffer); - handled = TRUE; + gst_rtsp_media_stream_rtp (mstream, buffer); + handled = TRUE; break; } else if (tr->interleaved.max == channel) { - gst_rtsp_media_stream_rtcp (mstream, buffer); - handled = TRUE; + gst_rtsp_media_stream_rtcp (mstream, buffer); + handled = TRUE; break; } } @@ -1097,7 +1128,8 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message) * that created the client but can be overridden later. */ void -gst_rtsp_client_set_session_pool (GstRTSPClient *client, GstRTSPSessionPool *pool) +gst_rtsp_client_set_session_pool (GstRTSPClient * client, + GstRTSPSessionPool * pool) { GstRTSPSessionPool *old; @@ -1120,7 +1152,7 @@ gst_rtsp_client_set_session_pool (GstRTSPClient *client, GstRTSPSessionPool *poo * Returns: a #GstRTSPSessionPool, unref after usage. */ GstRTSPSessionPool * -gst_rtsp_client_get_session_pool (GstRTSPClient *client) +gst_rtsp_client_get_session_pool (GstRTSPClient * client) { GstRTSPSessionPool *result; @@ -1140,7 +1172,8 @@ gst_rtsp_client_get_session_pool (GstRTSPClient *client) * created the client but can be overriden later. */ void -gst_rtsp_client_set_media_mapping (GstRTSPClient *client, GstRTSPMediaMapping *mapping) +gst_rtsp_client_set_media_mapping (GstRTSPClient * client, + GstRTSPMediaMapping * mapping) { GstRTSPMediaMapping *old; @@ -1164,7 +1197,7 @@ gst_rtsp_client_set_media_mapping (GstRTSPClient *client, GstRTSPMediaMapping *m * Returns: a #GstRTSPMediaMapping, unref after usage. */ GstRTSPMediaMapping * -gst_rtsp_client_get_media_mapping (GstRTSPClient *client) +gst_rtsp_client_get_media_mapping (GstRTSPClient * client) { GstRTSPMediaMapping *result; @@ -1175,7 +1208,8 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client) } static GstRTSPResult -message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_data) +message_received (GstRTSPWatch * watch, GstRTSPMessage * message, + gpointer user_data) { GstRTSPClient *client = GST_RTSP_CLIENT (user_data); @@ -1195,7 +1229,7 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da } static GstRTSPResult -message_sent (GstRTSPWatch *watch, guint cseq, gpointer user_data) +message_sent (GstRTSPWatch * watch, guint cseq, gpointer user_data) { GstRTSPClient *client = GST_RTSP_CLIENT (user_data); @@ -1205,7 +1239,7 @@ message_sent (GstRTSPWatch *watch, guint cseq, gpointer user_data) } static GstRTSPResult -closed (GstRTSPWatch *watch, gpointer user_data) +closed (GstRTSPWatch * watch, gpointer user_data) { GstRTSPClient *client = GST_RTSP_CLIENT (user_data); const gchar *tunnelid; @@ -1225,7 +1259,7 @@ closed (GstRTSPWatch *watch, gpointer user_data) } static GstRTSPResult -error (GstRTSPWatch *watch, GstRTSPResult result, gpointer user_data) +error (GstRTSPWatch * watch, GstRTSPResult result, gpointer user_data) { GstRTSPClient *client = GST_RTSP_CLIENT (user_data); gchar *str; @@ -1238,7 +1272,7 @@ error (GstRTSPWatch *watch, GstRTSPResult result, gpointer user_data) } static GstRTSPStatusCode -tunnel_start (GstRTSPWatch *watch, gpointer user_data) +tunnel_start (GstRTSPWatch * watch, gpointer user_data) { GstRTSPClient *client; const gchar *tunnelid; @@ -1272,7 +1306,7 @@ tunnel_existed: } static GstRTSPResult -tunnel_complete (GstRTSPWatch *watch, gpointer user_data) +tunnel_complete (GstRTSPWatch * watch, gpointer user_data) { const gchar *tunnelid; GstRTSPClient *client = GST_RTSP_CLIENT (user_data); @@ -1336,7 +1370,7 @@ static GstRTSPWatchFuncs watch_funcs = { * Returns: %TRUE if the client could be accepted. */ gboolean -gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel) +gst_rtsp_client_accept (GstRTSPClient * client, GIOChannel * channel) { int sock; GstRTSPConnection *conn; @@ -1351,14 +1385,13 @@ gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel) GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed); url = gst_rtsp_connection_get_url (conn); - g_message ("added new client %p ip %s:%d", client, - url->host, url->port); + g_message ("added new client %p ip %s:%d", client, url->host, url->port); client->connection = conn; /* create watch for the connection and attach */ client->watch = gst_rtsp_watch_new (client->connection, &watch_funcs, - g_object_ref (client), g_object_unref); + g_object_ref (client), g_object_unref); /* find the context to add the watch */ if ((source = g_main_current_source ())) @@ -1378,8 +1411,7 @@ accept_failed: { gchar *str = gst_rtsp_strresult (res); - g_error ("Could not accept client on server socket %d: %s", - sock, str); + g_error ("Could not accept client on server socket %d: %s", sock, str); g_free (str); return FALSE; } diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index d07b380..fe322bc 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -17,6 +17,8 @@ * Boston, MA 02111-1307, USA. */ +#include + #include #include @@ -39,6 +41,8 @@ enum SIGNAL_LAST }; +static GQuark ssrc_stream_map_key; + static void gst_rtsp_media_get_property (GObject *object, guint propid, GValue *value, GParamSpec *pspec); static void gst_rtsp_media_set_property (GObject *object, guint propid, @@ -87,6 +91,8 @@ gst_rtsp_media_class_init (GstRTSPMediaClass * klass) g_critical ("could not start bus thread: %s", error->message); } klass->handle_message = default_handle_message; + + ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream"); } static void @@ -106,6 +112,8 @@ gst_rtsp_media_stream_free (GstRTSPMediaStream *stream) if (stream->caps) gst_caps_unref (stream->caps); + g_list_free (stream->transports); + g_free (stream); } @@ -700,48 +708,112 @@ dump_structure (const GstStructure *s) g_free (sstr); } +static GstRTSPMediaTrans * +find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from) +{ + GList *walk; + GstRTSPMediaTrans *result = NULL; + const gchar *dest; + guint port; + + if (rtcp_from == NULL) + return NULL; + + dest = g_strrstr (rtcp_from, ":"); + if (dest == NULL) + return NULL; + + port = atoi (dest + 1); + dest = g_strndup (rtcp_from, dest - rtcp_from); + + g_message ("finding %s:%d", dest, port); + + for (walk = stream->transports; walk; walk = g_list_next (walk)) { + GstRTSPMediaTrans *trans = walk->data; + gint min, max; + + min = trans->transport->client_port.min; + max = trans->transport->client_port.max; + + if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) { + result = trans; + break; + } + } + return result; +} + static void -on_new_ssrc (GObject *session, GObject *source, GstRTSPMedia *media) +on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream) { - g_message ("%p: new source %p", media, source); + GstStructure *stats; + GstRTSPMediaTrans *trans; + + g_message ("%p: new source %p", stream, source); + + /* see if we have a stream to match with the origin of the RTCP packet */ + trans = g_object_get_qdata (source, ssrc_stream_map_key); + if (trans == NULL) { + g_object_get (source, "stats", &stats, NULL); + if (stats) { + const gchar *rtcp_from; + + rtcp_from = gst_structure_get_string (stats, "rtcp-from"); + if ((trans = find_transport (stream, rtcp_from))) { + g_message ("%p: found transport %p for source %p", stream, trans, source); + g_object_set_qdata (source, ssrc_stream_map_key, trans); + } + } + } else { + g_message ("%p: source %p for transport %p", stream, source, trans); + } } static void -on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMedia *media) +on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream) { GstStructure *sdes; - g_message ("%p: new SDES %p", media, source); + g_message ("%p: new SDES %p", stream, source); g_object_get (source, "sdes", &sdes, NULL); dump_structure (sdes); } static void -on_ssrc_active (GObject *session, GObject *source, GstRTSPMedia *media) +on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream) { GstStructure *stats; + GstRTSPMediaTrans *trans; + + trans = g_object_get_qdata (source, ssrc_stream_map_key); + + g_message ("%p: source %p in transport %p is active", stream, trans, source); + + if (trans && trans->keep_alive) { + trans->keep_alive (trans->ka_user_data); + } - g_message ("%p: source %p is active", media, source); g_object_get (source, "stats", &stats, NULL); dump_structure (stats); + gst_structure_free (stats); } static void -on_bye_ssrc (GObject *session, GObject *source, GstRTSPMedia *media) +on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream) { - g_message ("%p: source %p bye", media, source); + g_message ("%p: source %p bye", stream, source); } static void -on_bye_timeout (GObject *session, GObject *source, GstRTSPMedia *media) +on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream) { - g_message ("%p: source %p bye timeout", media, source); + g_message ("%p: source %p bye timeout", stream, source); } static void -on_timeout (GObject *session, GObject *source, GstRTSPMedia *media) +on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream) { - g_message ("%p: source %p timeout", media, source); + g_message ("%p: source %p timeout", stream, source); } static GstFlowReturn @@ -836,17 +908,17 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) &stream->session); g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc, - media); + stream); g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes, - media); + stream); g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active, - media); + stream); g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc, - media); + stream); g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout, - media); + stream); g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout, - media); + stream); /* link the RTP pad to the session manager */ ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink); @@ -1361,12 +1433,14 @@ gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transport g_message ("adding %s:%d-%d", dest, min, max); g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL); g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL); + stream->transports = g_list_prepend (stream->transports, tr); tr->active = TRUE; media->active++; } else if (remove && tr->active) { g_message ("removing %s:%d-%d", dest, min, max); g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL); g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL); + stream->transports = g_list_remove (stream->transports, tr); tr->active = FALSE; media->active--; } diff --git a/gst/rtsp-server/rtsp-media.h b/gst/rtsp-server/rtsp-media.h index d41075f..eb513e3 100644 --- a/gst/rtsp-server/rtsp-media.h +++ b/gst/rtsp-server/rtsp-media.h @@ -41,7 +41,8 @@ typedef struct _GstRTSPMedia GstRTSPMedia; typedef struct _GstRTSPMediaClass GstRTSPMediaClass; typedef struct _GstRTSPMediaTrans GstRTSPMediaTrans; -typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data); +typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data); +typedef void (*GstRTSPKeepAliveFunc) (gpointer user_data); /** * GstRTSPMediaTrans: @@ -50,20 +51,33 @@ typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer * @send_rtcp: callback for sending RTCP messages * @user_data: user data passed in the callbacks * @notify: free function for the user_data. + * @keep_alive: keep alive callback + * @ka_user_data: data passed to @keep_alive + * @ka_notify: called when @ka_user_data is freed + * @active: if we are actively sending + * @timeout: if we timed out * @transport: a transport description + * @rtpsource: the receiver rtp source object * * A Transport description for stream @idx */ struct _GstRTSPMediaTrans { guint idx; - GstRTSPSendFunc send_rtp; - GstRTSPSendFunc send_rtcp; - gpointer user_data; - GDestroyNotify notify; - gboolean active; + GstRTSPSendFunc send_rtp; + GstRTSPSendFunc send_rtcp; + gpointer user_data; + GDestroyNotify notify; - GstRTSPTransport *transport; + GstRTSPKeepAliveFunc keep_alive; + gpointer ka_user_data; + GDestroyNotify ka_notify; + gboolean active; + gboolean timeout; + + GstRTSPTransport *transport; + + GObject *rtpsource; }; /** diff --git a/gst/rtsp-server/rtsp-session.c b/gst/rtsp-server/rtsp-session.c index 54a3700..3afd788 100644 --- a/gst/rtsp-server/rtsp-session.c +++ b/gst/rtsp-server/rtsp-session.c @@ -74,6 +74,10 @@ gst_rtsp_session_free_stream (GstRTSPSessionStream *stream) { g_message ("free session stream %p", stream); + /* remove callbacks now */ + gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL); + gst_rtsp_session_stream_set_keepalive (stream, NULL, NULL, NULL); + if (stream->trans.transport) gst_rtsp_transport_free (stream->trans.transport); @@ -308,7 +312,7 @@ gst_rtsp_session_media_get_stream (GstRTSPSessionMedia *media, guint idx) result->trans.transport = NULL; result->media_stream = media_stream; - g_array_insert_val (media->streams, idx, result); + g_array_index (media->streams, GstRTSPSessionStream *, idx) = result; } return result; @@ -513,6 +517,27 @@ gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStream *stream, } /** + * gst_rtsp_session_stream_set_keepalive: + * @stream: a #GstRTSPSessionStream + * @keep_alive: a callback called when the receiver is active + * @user_data: user data passed to callback + * @notify: called with the user_data when no longer needed. + * + * Install callbacks that will be called when RTCP packets are received from the + * receiver of @stream. + */ +void +gst_rtsp_session_stream_set_keepalive (GstRTSPSessionStream *stream, + GstRTSPKeepAliveFunc keep_alive, gpointer user_data, GDestroyNotify notify) +{ + stream->trans.keep_alive = keep_alive; + if (stream->trans.ka_notify) + stream->trans.ka_notify (stream->trans.ka_user_data); + stream->trans.ka_user_data = user_data; + stream->trans.ka_notify = notify; +} + +/** * gst_rtsp_session_media_set_state: * @media: a #GstRTSPSessionMedia * @state: the new state diff --git a/gst/rtsp-server/rtsp-session.h b/gst/rtsp-server/rtsp-session.h index 495bc75..0074b55 100644 --- a/gst/rtsp-server/rtsp-session.h +++ b/gst/rtsp-server/rtsp-session.h @@ -146,6 +146,10 @@ void gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStre GstRTSPSendFunc send_rtcp, gpointer user_data, GDestroyNotify notify); +void gst_rtsp_session_stream_set_keepalive (GstRTSPSessionStream *stream, + GstRTSPKeepAliveFunc keep_alive, + gpointer user_data, + GDestroyNotify notify); G_END_DECLS