rtsp: use RTCP to keep the session alive
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 26 May 2009 17:01:10 +0000 (19:01 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 26 May 2009 17:01:10 +0000 (19:01 +0200)
Use the RTCP rtcp-from stats field to find the associated session and use this
to keep the session alive.

gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-media.c
gst/rtsp-server/rtsp-media.h
gst/rtsp-server/rtsp-session.c
gst/rtsp-server/rtsp-session.h

index b7ac1d0..aeaf52a 100644 (file)
@@ -35,13 +35,14 @@ enum
   PROP_LAST
 };
 
-static void gst_rtsp_client_get_property (GObject *object, guint propid,
-    GValue *value, GParamSpec *pspec);
-static void gst_rtsp_client_set_property (GObject *object, guint propid,
-    const GValue *value, GParamSpec *pspec);
+static void gst_rtsp_client_get_property (GObject * object, guint propid,
+    GValue * value, GParamSpec * pspec);
+static void gst_rtsp_client_set_property (GObject * object, guint propid,
+    const GValue * value, GParamSpec * pspec);
 static void gst_rtsp_client_finalize (GObject * obj);
 
-static void client_session_finalized (GstRTSPClient *client, GstRTSPSession *session);
+static void client_session_finalized (GstRTSPClient * client,
+    GstRTSPSession * session);
 
 G_DEFINE_TYPE (GstRTSPClient, gst_rtsp_client, G_TYPE_OBJECT);
 
@@ -59,14 +60,17 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass)
   g_object_class_install_property (gobject_class, PROP_SESSION_POOL,
       g_param_spec_object ("session-pool", "Session Pool",
           "The session pool to use for client session",
-          GST_TYPE_RTSP_SESSION_POOL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          GST_TYPE_RTSP_SESSION_POOL,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_MEDIA_MAPPING,
       g_param_spec_object ("media-mapping", "Media Mapping",
           "The media mapping to use for client session",
-          GST_TYPE_RTSP_MEDIA_MAPPING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          GST_TYPE_RTSP_MEDIA_MAPPING,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  tunnels = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
+  tunnels =
+      g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
   tunnels_lock = g_mutex_new ();
 }
 
@@ -100,8 +104,8 @@ gst_rtsp_client_finalize (GObject * obj)
 }
 
 static void
-gst_rtsp_client_get_property (GObject *object, guint propid,
-    GValue *value, GParamSpec *pspec)
+gst_rtsp_client_get_property (GObject * object, guint propid,
+    GValue * value, GParamSpec * pspec)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (object);
 
@@ -118,8 +122,8 @@ gst_rtsp_client_get_property (GObject *object, guint propid,
 }
 
 static void
-gst_rtsp_client_set_property (GObject *object, guint propid,
-    const GValue *value, GParamSpec *pspec)
+gst_rtsp_client_set_property (GObject * object, guint propid,
+    const GValue * value, GParamSpec * pspec)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (object);
 
@@ -151,9 +155,11 @@ gst_rtsp_client_new (void)
 }
 
 static void
-send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *response)
+send_response (GstRTSPClient * client, GstRTSPSession * session,
+    GstRTSPMessage * response)
 {
-  gst_rtsp_message_add_header (response, GST_RTSP_HDR_SERVER, "GStreamer RTSP server");
+  gst_rtsp_message_add_header (response, GST_RTSP_HDR_SERVER,
+      "GStreamer RTSP server");
 
   /* remove any previous header */
   gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1);
@@ -163,13 +169,14 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r
     gchar *str;
 
     if (session->timeout != 60)
-      str = g_strdup_printf ("%s; timeout=%d", session->sessionid, session->timeout);
+      str =
+          g_strdup_printf ("%s; timeout=%d", session->sessionid,
+          session->timeout);
     else
       str = g_strdup (session->sessionid);
 
     gst_rtsp_message_take_header (response, GST_RTSP_HDR_SESSION, str);
   }
-
 #ifdef DEBUG
   gst_rtsp_message_dump (response);
 #endif
@@ -179,19 +186,19 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r
 }
 
 static void
-send_generic_response (GstRTSPClient *client, GstRTSPStatusCode code, 
-    GstRTSPMessage *request)
+send_generic_response (GstRTSPClient * client, GstRTSPStatusCode code,
+    GstRTSPMessage * request)
 {
   GstRTSPMessage response = { 0 };
 
-  gst_rtsp_message_init_response (&response, code, 
-       gst_rtsp_status_as_text (code), request);
+  gst_rtsp_message_init_response (&response, code,
+      gst_rtsp_status_as_text (code), request);
 
   send_response (client, NULL, &response);
 }
 
 static gboolean
-compare_uri (const GstRTSPUrl *uri1, const GstRTSPUrl *uri2)
+compare_uri (const GstRTSPUrl * uri1, const GstRTSPUrl * uri2)
 {
   if (uri1 == NULL || uri2 == NULL)
     return FALSE;
@@ -206,7 +213,7 @@ compare_uri (const GstRTSPUrl *uri1, const GstRTSPUrl *uri2)
  * but is cached for when the same client (without breaking the connection) is
  * doing a setup for the exact same url. */
 static GstRTSPMedia *
-find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request)
+find_media (GstRTSPClient * client, GstRTSPUrl * uri, GstRTSPMessage * request)
 {
   GstRTSPMediaFactory *factory;
   GstRTSPMedia *media;
@@ -225,7 +232,8 @@ find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request)
       goto no_mapping;
 
     /* find the factory for the uri first */
-    if (!(factory = gst_rtsp_media_mapping_find_factory (client->media_mapping, uri)))
+    if (!(factory =
+            gst_rtsp_media_mapping_find_factory (client->media_mapping, uri)))
       goto no_factory;
 
     /* prepare the media and add it to the pipeline */
@@ -239,11 +247,10 @@ find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request)
     /* now keep track of the uri and the media */
     client->uri = gst_rtsp_url_copy (uri);
     client->media = media;
-  }
-  else {
+  } else {
     /* we have seen this uri before, used cached media */
     media = client->media;
-    g_message ("reusing cached media %p", media); 
+    g_message ("reusing cached media %p", media);
   }
 
   if (media)
@@ -278,7 +285,7 @@ no_prepare:
 }
 
 static gboolean
-do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client)
+do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
 {
   GstRTSPMessage message = { 0 };
   guint8 *data;
@@ -299,38 +306,36 @@ do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client)
 }
 
 static void
-link_stream (GstRTSPClient *client, GstRTSPSessionStream *stream)
+link_stream (GstRTSPClient * client, GstRTSPSessionStream * stream)
 {
   gst_rtsp_session_stream_set_callbacks (stream, (GstRTSPSendFunc) do_send_data,
-       (GstRTSPSendFunc) do_send_data, client, NULL);
+      (GstRTSPSendFunc) do_send_data, client, NULL);
   client->streams = g_list_prepend (client->streams, stream);
 }
 
 static void
-unlink_stream (GstRTSPClient *client, GstRTSPSessionStream *stream)
+unlink_stream (GstRTSPClient * client, GstRTSPSessionStream * stream)
 {
-  gst_rtsp_session_stream_set_callbacks (stream, NULL,
-       NULL, NULL, NULL);
+  gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL);
   client->streams = g_list_remove (client->streams, stream);
 }
 
 static void
-unlink_streams (GstRTSPClient *client)
+unlink_streams (GstRTSPClient * client)
 {
   GList *walk;
 
   for (walk = client->streams; walk; walk = g_list_next (walk)) {
     GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data;
 
-    gst_rtsp_session_stream_set_callbacks (stream, NULL,
-         NULL, NULL, NULL);
+    gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL);
   }
   g_list_free (client->streams);
   client->streams = NULL;
 }
 
 static void
-unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media)
+unlink_session_streams (GstRTSPClient * client, GstRTSPSessionMedia * media)
 {
   guint n_streams, i;
 
@@ -353,7 +358,8 @@ unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media)
 }
 
 static gboolean
-handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
+handle_teardown_request (GstRTSPClient * client, GstRTSPUrl * uri,
+    GstRTSPSession * session, GstRTSPMessage * request)
 {
   GstRTSPSessionMedia *media;
   GstRTSPMessage response = { 0 };
@@ -371,7 +377,8 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
   unlink_session_streams (client, media);
 
   /* remove the session from the watched sessions */
-  g_object_weak_unref (G_OBJECT (session), (GWeakNotify) client_session_finalized, client);
+  g_object_weak_unref (G_OBJECT (session),
+      (GWeakNotify) client_session_finalized, client);
   client->sessions = g_list_remove (client->sessions, session);
 
   gst_rtsp_session_media_set_state (media, GST_STATE_NULL);
@@ -384,7 +391,8 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
   }
   /* construct the response now */
   code = GST_RTSP_STS_OK;
-  gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request);
+  gst_rtsp_message_init_response (&response, code,
+      gst_rtsp_status_as_text (code), request);
 
   send_response (client, session, &response);
 
@@ -404,7 +412,8 @@ not_found:
 }
 
 static gboolean
-handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
+handle_pause_request (GstRTSPClient * client, GstRTSPUrl * uri,
+    GstRTSPSession * session, GstRTSPMessage * request)
 {
   GstRTSPSessionMedia *media;
   GstRTSPMessage response = { 0 };
@@ -431,7 +440,8 @@ handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
 
   /* construct the response now */
   code = GST_RTSP_STS_OK;
-  gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request);
+  gst_rtsp_message_init_response (&response, code,
+      gst_rtsp_status_as_text (code), request);
 
   send_response (client, session, &response);
 
@@ -453,13 +463,15 @@ not_found:
   }
 invalid_state:
   {
-    send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, request);
+    send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE,
+        request);
     return FALSE;
   }
 }
 
 static gboolean
-handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
+handle_play_request (GstRTSPClient * client, GstRTSPUrl * uri,
+    GstRTSPSession * session, GstRTSPMessage * request)
 {
   GstRTSPSessionMedia *media;
   GstRTSPMessage response = { 0 };
@@ -508,8 +520,10 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
     /* get the stream as configured in the session */
     sstream = gst_rtsp_session_media_get_stream (media, i);
     /* get the transport, if there is no transport configured, skip this stream */
-    if (!(tr = sstream->trans.transport))
+    if (!(tr = sstream->trans.transport)) {
+      g_message ("stream %d is not configured", i);
       continue;
+    }
 
     if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
       /* for TCP, link the stream to the TCP connection of the client */
@@ -521,7 +535,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
     payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
 
     if (g_object_class_find_property (payobjclass, "seqnum") &&
-       g_object_class_find_property (payobjclass, "timestamp")) {
+        g_object_class_find_property (payobjclass, "timestamp")) {
       GObject *payobj;
 
       payobj = G_OBJECT (stream->payloader);
@@ -533,26 +547,26 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
         g_string_append (rtpinfo, ", ");
 
       uristr = gst_rtsp_url_get_request_uri (uri);
-      g_string_append_printf (rtpinfo, "url=%s/stream=%d;seq=%u;rtptime=%u", uristr, i, seqnum, timestamp);
+      g_string_append_printf (rtpinfo, "url=%s/stream=%d;seq=%u;rtptime=%u",
+          uristr, i, seqnum, timestamp);
       g_free (uristr);
 
       infocount++;
-    }
-    else {
+    } else {
       g_warning ("RTP-Info cannot be determined for stream %d", i);
     }
   }
 
   /* construct the response now */
   code = GST_RTSP_STS_OK;
-  gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request);
+  gst_rtsp_message_init_response (&response, code,
+      gst_rtsp_status_as_text (code), request);
 
   /* add the RTP-Info header */
   if (infocount > 0) {
     str = g_string_free (rtpinfo, FALSE);
     gst_rtsp_message_take_header (&response, GST_RTSP_HDR_RTP_INFO, str);
-  }
-  else {
+  } else {
     g_string_free (rtpinfo, TRUE);
   }
 
@@ -582,13 +596,22 @@ not_found:
   }
 invalid_state:
   {
-    send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, request);
+    send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE,
+        request);
     return FALSE;
   }
 }
 
+static void
+do_keepalive (GstRTSPSession * session)
+{
+  g_message ("keep session %p alive", session);
+  gst_rtsp_session_touch (session);
+}
+
 static gboolean
-handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
+handle_setup_request (GstRTSPClient * client, GstRTSPUrl * uri,
+    GstRTSPSession * session, GstRTSPMessage * request)
 {
   GstRTSPResult res;
   gchar *transport;
@@ -623,18 +646,20 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
     goto bad_request;
 
   /* parse the transport */
-  res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_TRANSPORT, &transport, 0);
+  res =
+      gst_rtsp_message_get_header (request, GST_RTSP_HDR_TRANSPORT, &transport,
+      0);
   if (res != GST_RTSP_OK)
     goto no_transport;
 
   transports = g_strsplit (transport, ",", 0);
-  gst_rtsp_transport_new (&ct);  
+  gst_rtsp_transport_new (&ct);
 
   /* loop through the transports, try to parse */
   have_transport = FALSE;
   for (i = 0; transports[i]; i++) {
 
-    gst_rtsp_transport_init (ct);  
+    gst_rtsp_transport_init (ct);
     res = gst_rtsp_transport_parse (transports[i], ct);
     if (res == GST_RTSP_OK) {
       have_transport = TRUE;
@@ -644,7 +669,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
   g_strfreev (transports);
 
   /* we have not found anything usable, error out */
-  if (!have_transport) 
+  if (!have_transport)
     goto unsupported_transports;
 
   /* we have a valid transport, check if we can handle it */
@@ -654,8 +679,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
     goto unsupported_transports;
 
   supported = GST_RTSP_LOWER_TRANS_UDP |
-       GST_RTSP_LOWER_TRANS_UDP_MCAST |
-       GST_RTSP_LOWER_TRANS_TCP;
+      GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP;
   if (!(ct->lower_transport & supported))
     goto unsupported_transports;
 
@@ -674,8 +698,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
     media = gst_rtsp_session_get_media (session, uri);
 
     need_session = FALSE;
-  }
-  else {
+  } else {
     /* create a session if this fails we probably reached our session limit or
      * something. */
     if (!(session = gst_rtsp_session_pool_create (client->session_pool)))
@@ -708,13 +731,18 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
 
   st = gst_rtsp_session_stream_set_transport (stream, ct);
 
+  /* configure keepalive for this transport */
+  gst_rtsp_session_stream_set_keepalive (stream,
+        (GstRTSPKeepAliveFunc) do_keepalive, session, NULL);
+
   /* serialize the server transport */
   trans_str = gst_rtsp_transport_as_text (st);
   gst_rtsp_transport_free (st);
 
   /* construct the response now */
   code = GST_RTSP_STS_OK;
-  gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request);
+  gst_rtsp_message_init_response (&response, code,
+      gst_rtsp_status_as_text (code), request);
 
   gst_rtsp_message_add_header (&response, GST_RTSP_HDR_TRANSPORT, trans_str);
   g_free (trans_str);
@@ -763,7 +791,7 @@ no_transport:
 unsupported_transports:
   {
     send_generic_response (client, GST_RTSP_STS_UNSUPPORTED_TRANSPORT, request);
-    gst_rtsp_transport_free (ct);  
+    gst_rtsp_transport_free (ct);
     return FALSE;
   }
 no_pool:
@@ -780,7 +808,8 @@ service_unavailable:
 
 /* for the describe we must generate an SDP */
 static gboolean
-handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
+handle_describe_request (GstRTSPClient * client, GstRTSPUrl * uri,
+    GstRTSPSession * session, GstRTSPMessage * request)
 {
   GstRTSPMessage response = { 0 };
   GstRTSPResult res;
@@ -791,10 +820,11 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
 
   /* check what kind of format is accepted, we don't really do anything with it
    * and always return SDP for now. */
-  for (i = 0; i++; ) {
+  for (i = 0; i++;) {
     gchar *accept;
 
-    res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_ACCEPT, &accept, i);
+    res =
+        gst_rtsp_message_get_header (request, GST_RTSP_HDR_ACCEPT, &accept, i);
     if (res == GST_RTSP_ENOTIMPL)
       break;
 
@@ -812,10 +842,11 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
 
   g_object_unref (media);
 
-  gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, 
-       gst_rtsp_status_as_text (GST_RTSP_STS_OK), request);
+  gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK,
+      gst_rtsp_status_as_text (GST_RTSP_STS_OK), request);
 
-  gst_rtsp_message_add_header (&response, GST_RTSP_HDR_CONTENT_TYPE, "application/sdp");
+  gst_rtsp_message_add_header (&response, GST_RTSP_HDR_CONTENT_TYPE,
+      "application/sdp");
 
   /* content base for some clients that might screw up creating the setup uri */
   str = g_strdup_printf ("rtsp://%s:%u%s/", uri->host, uri->port, uri->abspath);
@@ -824,7 +855,7 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
 
   /* add SDP to the response body */
   str = gst_sdp_message_as_text (sdp);
-  gst_rtsp_message_take_body (&response, (guint8 *)str, strlen (str));
+  gst_rtsp_message_take_body (&response, (guint8 *) str, strlen (str));
   gst_sdp_message_free (sdp);
 
   send_response (client, session, &response);
@@ -846,25 +877,24 @@ no_sdp:
 }
 
 static void
-handle_options_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
+handle_options_request (GstRTSPClient * client, GstRTSPUrl * uri,
+    GstRTSPSession * session, GstRTSPMessage * request)
 {
   GstRTSPMessage response = { 0 };
   GstRTSPMethod options;
   gchar *str;
 
   options = GST_RTSP_DESCRIBE |
-           GST_RTSP_OPTIONS |
-            GST_RTSP_PAUSE |
-            GST_RTSP_PLAY |
-            GST_RTSP_SETUP |
-           GST_RTSP_GET_PARAMETER |
-           GST_RTSP_SET_PARAMETER |
-            GST_RTSP_TEARDOWN;
+      GST_RTSP_OPTIONS |
+      GST_RTSP_PAUSE |
+      GST_RTSP_PLAY |
+      GST_RTSP_SETUP |
+      GST_RTSP_GET_PARAMETER | GST_RTSP_SET_PARAMETER | GST_RTSP_TEARDOWN;
 
   str = gst_rtsp_options_as_text (options);
 
-  gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, 
-       gst_rtsp_status_as_text (GST_RTSP_STS_OK), request);
+  gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK,
+      gst_rtsp_status_as_text (GST_RTSP_STS_OK), request);
 
   gst_rtsp_message_add_header (&response, GST_RTSP_HDR_PUBLIC, str);
   g_free (str);
@@ -874,7 +904,7 @@ handle_options_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *
 
 /* remove duplicate and trailing '/' */
 static void
-santize_uri (GstRTSPUrl *uri)
+santize_uri (GstRTSPUrl * uri)
 {
   gint i, len;
   gchar *s, *d;
@@ -894,22 +924,22 @@ santize_uri (GstRTSPUrl *uri)
   }
   len = d - uri->abspath;
   /* don't remove the first slash if that's the only thing left */
-  if (len > 1 && *(d-1) == '/')
+  if (len > 1 && *(d - 1) == '/')
     d--;
   *d = '\0';
 }
 
 static void
-client_session_finalized (GstRTSPClient *client, GstRTSPSession *session)
+client_session_finalized (GstRTSPClient * client, GstRTSPSession * session)
 {
   if (!(client->sessions = g_list_remove (client->sessions, session))) {
     g_message ("all sessions finalized, close the connection");
-    g_source_destroy ((GSource*)client->watch);
+    g_source_destroy ((GSource *) client->watch);
   }
 }
 
 static void
-client_watch_session (GstRTSPClient *client, GstRTSPSession *session)
+client_watch_session (GstRTSPClient * client, GstRTSPSession * session)
 {
   GList *walk;
 
@@ -923,12 +953,13 @@ client_watch_session (GstRTSPClient *client, GstRTSPSession *session)
 
   g_message ("watching session %p", session);
 
-  g_object_weak_ref (G_OBJECT (session), (GWeakNotify) client_session_finalized, client);
+  g_object_weak_ref (G_OBJECT (session), (GWeakNotify) client_session_finalized,
+      client);
   client->sessions = g_list_prepend (client->sessions, session);
 }
 
 static void
-handle_request (GstRTSPClient *client, GstRTSPMessage *request)
+handle_request (GstRTSPClient * client, GstRTSPMessage * request)
 {
   GstRTSPMethod method;
   const gchar *uristr;
@@ -948,7 +979,8 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request)
 
   if (version != GST_RTSP_VERSION_1_0) {
     /* we can only handle 1.0 requests */
-    send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, request);
+    send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED,
+        request);
     return;
   }
 
@@ -975,8 +1007,7 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request)
      * disappears because it times out, we will be notified. If all sessions are
      * gone, we will close the connection */
     client_watch_session (client, session);
-  }
-  else
+  } else
     session = NULL;
 
   /* now see what is asked and dispatch to a dedicated handler */
@@ -1033,7 +1064,7 @@ session_not_found:
 }
 
 static void
-handle_data (GstRTSPClient *client, GstRTSPMessage *message)
+handle_data (GstRTSPClient * client, GstRTSPMessage * message)
 {
   GstRTSPResult res;
   guint8 channel;
@@ -1043,7 +1074,7 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
   GstBuffer *buffer;
   gboolean handled;
 
-  /* find the stream for this message */ 
+  /* find the stream for this message */
   res = gst_rtsp_message_parse_data (message, &channel);
   if (res != GST_RTSP_OK)
     return;
@@ -1073,12 +1104,12 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
     if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
       /* dispatch to the stream based on the channel number */
       if (tr->interleaved.min == channel) {
-       gst_rtsp_media_stream_rtp (mstream, buffer);
-       handled = TRUE;
+        gst_rtsp_media_stream_rtp (mstream, buffer);
+        handled = TRUE;
         break;
       } else if (tr->interleaved.max == channel) {
-       gst_rtsp_media_stream_rtcp (mstream, buffer);
-       handled = TRUE;
+        gst_rtsp_media_stream_rtcp (mstream, buffer);
+        handled = TRUE;
         break;
       }
     }
@@ -1097,7 +1128,8 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
  * that created the client but can be overridden later.
  */
 void
-gst_rtsp_client_set_session_pool (GstRTSPClient *client, GstRTSPSessionPool *pool)
+gst_rtsp_client_set_session_pool (GstRTSPClient * client,
+    GstRTSPSessionPool * pool)
 {
   GstRTSPSessionPool *old;
 
@@ -1120,7 +1152,7 @@ gst_rtsp_client_set_session_pool (GstRTSPClient *client, GstRTSPSessionPool *poo
  * Returns: a #GstRTSPSessionPool, unref after usage.
  */
 GstRTSPSessionPool *
-gst_rtsp_client_get_session_pool (GstRTSPClient *client)
+gst_rtsp_client_get_session_pool (GstRTSPClient * client)
 {
   GstRTSPSessionPool *result;
 
@@ -1140,7 +1172,8 @@ gst_rtsp_client_get_session_pool (GstRTSPClient *client)
  * created the client but can be overriden later.
  */
 void
-gst_rtsp_client_set_media_mapping (GstRTSPClient *client, GstRTSPMediaMapping *mapping)
+gst_rtsp_client_set_media_mapping (GstRTSPClient * client,
+    GstRTSPMediaMapping * mapping)
 {
   GstRTSPMediaMapping *old;
 
@@ -1164,7 +1197,7 @@ gst_rtsp_client_set_media_mapping (GstRTSPClient *client, GstRTSPMediaMapping *m
  * Returns: a #GstRTSPMediaMapping, unref after usage.
  */
 GstRTSPMediaMapping *
-gst_rtsp_client_get_media_mapping (GstRTSPClient *client)
+gst_rtsp_client_get_media_mapping (GstRTSPClient * client)
 {
   GstRTSPMediaMapping *result;
 
@@ -1175,7 +1208,8 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client)
 }
 
 static GstRTSPResult
-message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_data)
+message_received (GstRTSPWatch * watch, GstRTSPMessage * message,
+    gpointer user_data)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
 
@@ -1195,7 +1229,7 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da
 }
 
 static GstRTSPResult
-message_sent (GstRTSPWatch *watch, guint cseq, gpointer user_data)
+message_sent (GstRTSPWatch * watch, guint cseq, gpointer user_data)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
 
@@ -1205,7 +1239,7 @@ message_sent (GstRTSPWatch *watch, guint cseq, gpointer user_data)
 }
 
 static GstRTSPResult
-closed (GstRTSPWatch *watch, gpointer user_data)
+closed (GstRTSPWatch * watch, gpointer user_data)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
   const gchar *tunnelid;
@@ -1225,7 +1259,7 @@ closed (GstRTSPWatch *watch, gpointer user_data)
 }
 
 static GstRTSPResult
-error (GstRTSPWatch *watch, GstRTSPResult result, gpointer user_data)
+error (GstRTSPWatch * watch, GstRTSPResult result, gpointer user_data)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
   gchar *str;
@@ -1238,7 +1272,7 @@ error (GstRTSPWatch *watch, GstRTSPResult result, gpointer user_data)
 }
 
 static GstRTSPStatusCode
-tunnel_start (GstRTSPWatch *watch, gpointer user_data)
+tunnel_start (GstRTSPWatch * watch, gpointer user_data)
 {
   GstRTSPClient *client;
   const gchar *tunnelid;
@@ -1272,7 +1306,7 @@ tunnel_existed:
 }
 
 static GstRTSPResult
-tunnel_complete (GstRTSPWatch *watch, gpointer user_data)
+tunnel_complete (GstRTSPWatch * watch, gpointer user_data)
 {
   const gchar *tunnelid;
   GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
@@ -1336,7 +1370,7 @@ static GstRTSPWatchFuncs watch_funcs = {
  * Returns: %TRUE if the client could be accepted.
  */
 gboolean
-gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
+gst_rtsp_client_accept (GstRTSPClient * client, GIOChannel * channel)
 {
   int sock;
   GstRTSPConnection *conn;
@@ -1351,14 +1385,13 @@ gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
   GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed);
 
   url = gst_rtsp_connection_get_url (conn);
-  g_message ("added new client %p ip %s:%d", client,
-               url->host, url->port);
+  g_message ("added new client %p ip %s:%d", client, url->host, url->port);
 
   client->connection = conn;
 
   /* create watch for the connection and attach */
   client->watch = gst_rtsp_watch_new (client->connection, &watch_funcs,
-                 g_object_ref (client), g_object_unref);
+      g_object_ref (client), g_object_unref);
 
   /* find the context to add the watch */
   if ((source = g_main_current_source ()))
@@ -1378,8 +1411,7 @@ accept_failed:
   {
     gchar *str = gst_rtsp_strresult (res);
 
-    g_error ("Could not accept client on server socket %d: %s",
-            sock, str);
+    g_error ("Could not accept client on server socket %d: %s", sock, str);
     g_free (str);
     return FALSE;
   }
index d07b380..fe322bc 100644 (file)
@@ -17,6 +17,8 @@
  * Boston, MA 02111-1307, USA.
  */
 
+#include <string.h>
+
 #include <gst/app/gstappsrc.h>
 #include <gst/app/gstappsink.h>
 
@@ -39,6 +41,8 @@ enum
   SIGNAL_LAST
 };
 
+static GQuark ssrc_stream_map_key;
+
 static void gst_rtsp_media_get_property (GObject *object, guint propid,
     GValue *value, GParamSpec *pspec);
 static void gst_rtsp_media_set_property (GObject *object, guint propid,
@@ -87,6 +91,8 @@ gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
     g_critical ("could not start bus thread: %s", error->message);
   }
   klass->handle_message = default_handle_message;
+
+  ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
 }
 
 static void
@@ -106,6 +112,8 @@ gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
   if (stream->caps)
     gst_caps_unref (stream->caps);
 
+  g_list_free (stream->transports);
+
   g_free (stream);
 }
 
@@ -700,48 +708,112 @@ dump_structure (const GstStructure *s)
   g_free (sstr);
 }
 
+static GstRTSPMediaTrans *
+find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
+{
+  GList *walk;
+  GstRTSPMediaTrans *result = NULL;
+  const gchar *dest;
+  guint port;
+
+  if (rtcp_from == NULL)
+    return NULL;
+
+  dest = g_strrstr (rtcp_from, ":");
+  if (dest == NULL)
+    return NULL;
+
+  port = atoi (dest + 1);
+  dest = g_strndup (rtcp_from, dest - rtcp_from);
+
+  g_message ("finding %s:%d", dest, port);
+
+  for (walk = stream->transports; walk; walk = g_list_next (walk)) {
+    GstRTSPMediaTrans *trans = walk->data;
+    gint min, max;
+
+    min = trans->transport->client_port.min;
+    max = trans->transport->client_port.max;
+
+    if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
+      result = trans;
+      break;
+    }
+  }
+  return result;
+}
+
 static void
-on_new_ssrc (GObject *session, GObject *source, GstRTSPMedia *media)
+on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
 {
-  g_message ("%p: new source %p", media, source);
+  GstStructure *stats;
+  GstRTSPMediaTrans *trans;
+
+  g_message ("%p: new source %p", stream, source);
+
+  /* see if we have a stream to match with the origin of the RTCP packet */
+  trans = g_object_get_qdata (source, ssrc_stream_map_key);
+  if (trans == NULL) {
+    g_object_get (source, "stats", &stats, NULL);
+    if (stats) {
+      const gchar *rtcp_from;
+
+      rtcp_from = gst_structure_get_string (stats, "rtcp-from");
+      if ((trans = find_transport (stream, rtcp_from))) {
+        g_message ("%p: found transport %p for source  %p", stream, trans, source);
+        g_object_set_qdata (source, ssrc_stream_map_key, trans);
+      }
+    }
+  } else {
+    g_message ("%p: source %p for transport %p", stream, source, trans);
+  }
 }
 
 static void
-on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMedia *media)
+on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
 {
   GstStructure *sdes;
 
-  g_message ("%p: new SDES %p", media, source);
+  g_message ("%p: new SDES %p", stream, source);
   g_object_get (source, "sdes", &sdes, NULL);
   dump_structure (sdes);
 }
 
 static void
-on_ssrc_active (GObject *session, GObject *source, GstRTSPMedia *media)
+on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
 {
   GstStructure *stats;
+  GstRTSPMediaTrans *trans;
+
+  trans = g_object_get_qdata (source, ssrc_stream_map_key);
+
+  g_message ("%p: source %p in transport %p is active", stream, trans, source);
+
+  if (trans && trans->keep_alive) {
+    trans->keep_alive (trans->ka_user_data);
+  }
 
-  g_message ("%p: source %p is active", media, source);
   g_object_get (source, "stats", &stats, NULL);
   dump_structure (stats);
+  gst_structure_free (stats);
 }
 
 static void
-on_bye_ssrc (GObject *session, GObject *source, GstRTSPMedia *media)
+on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
 {
-  g_message ("%p: source %p bye", media, source);
+  g_message ("%p: source %p bye", stream, source);
 }
 
 static void
-on_bye_timeout (GObject *session, GObject *source, GstRTSPMedia *media)
+on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
 {
-  g_message ("%p: source %p bye timeout", media, source);
+  g_message ("%p: source %p bye timeout", stream, source);
 }
 
 static void
-on_timeout (GObject *session, GObject *source, GstRTSPMedia *media)
+on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
 {
-  g_message ("%p: source %p timeout", media, source);
+  g_message ("%p: source %p timeout", stream, source);
 }
 
 static GstFlowReturn
@@ -836,17 +908,17 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
                  &stream->session);
 
   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
-      media);
+      stream);
   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
-      media);
+      stream);
   g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
-      media);
+      stream);
   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
-      media);
+      stream);
   g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
-      media);
+      stream);
   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
-      media);
+      stream);
 
   /* link the RTP pad to the session manager */
   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
@@ -1361,12 +1433,14 @@ gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transport
           g_message ("adding %s:%d-%d", dest, min, max);
           g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
           g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
+         stream->transports = g_list_prepend (stream->transports, tr);
          tr->active = TRUE;
          media->active++;
        } else if (remove && tr->active) {
           g_message ("removing %s:%d-%d", dest, min, max);
           g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
           g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
+         stream->transports = g_list_remove (stream->transports, tr);
          tr->active = FALSE;
          media->active--;
        }
index d41075f..eb513e3 100644 (file)
@@ -41,7 +41,8 @@ typedef struct _GstRTSPMedia GstRTSPMedia;
 typedef struct _GstRTSPMediaClass GstRTSPMediaClass;
 typedef struct _GstRTSPMediaTrans GstRTSPMediaTrans;
 
-typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data);
+typedef gboolean (*GstRTSPSendFunc)      (GstBuffer *buffer, guint8 channel, gpointer user_data);
+typedef void     (*GstRTSPKeepAliveFunc) (gpointer user_data);
 
 /**
  * GstRTSPMediaTrans:
@@ -50,20 +51,33 @@ typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer
  * @send_rtcp: callback for sending RTCP messages
  * @user_data: user data passed in the callbacks
  * @notify: free function for the user_data.
+ * @keep_alive: keep alive callback
+ * @ka_user_data: data passed to @keep_alive
+ * @ka_notify: called when @ka_user_data is freed
+ * @active: if we are actively sending
+ * @timeout: if we timed out
  * @transport: a transport description
+ * @rtpsource: the receiver rtp source object
  *
  * A Transport description for stream @idx
  */
 struct _GstRTSPMediaTrans {
   guint idx;
 
-  GstRTSPSendFunc send_rtp;
-  GstRTSPSendFunc send_rtcp;
-  gpointer        user_data;
-  GDestroyNotify  notify;
-  gboolean        active;
+  GstRTSPSendFunc      send_rtp;
+  GstRTSPSendFunc      send_rtcp;
+  gpointer             user_data;
+  GDestroyNotify       notify;
 
-  GstRTSPTransport *transport;
+  GstRTSPKeepAliveFunc keep_alive;
+  gpointer             ka_user_data;
+  GDestroyNotify       ka_notify;
+  gboolean             active;
+  gboolean             timeout;
+
+  GstRTSPTransport    *transport;
+
+  GObject             *rtpsource;
 };
 
 /**
index 54a3700..3afd788 100644 (file)
@@ -74,6 +74,10 @@ gst_rtsp_session_free_stream (GstRTSPSessionStream *stream)
 {
   g_message ("free session stream %p", stream);
 
+  /* remove callbacks now */
+  gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL);
+  gst_rtsp_session_stream_set_keepalive (stream, NULL, NULL, NULL);
+
   if (stream->trans.transport)
     gst_rtsp_transport_free (stream->trans.transport);
 
@@ -308,7 +312,7 @@ gst_rtsp_session_media_get_stream (GstRTSPSessionMedia *media, guint idx)
     result->trans.transport = NULL;
     result->media_stream = media_stream;
 
-    g_array_insert_val (media->streams, idx, result);
+    g_array_index (media->streams, GstRTSPSessionStream *, idx) = result;
   }
   return result;
 
@@ -513,6 +517,27 @@ gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStream *stream,
 }
 
 /**
+ * gst_rtsp_session_stream_set_keepalive:
+ * @stream: a #GstRTSPSessionStream
+ * @keep_alive: a callback called when the receiver is active
+ * @user_data: user data passed to callback
+ * @notify: called with the user_data when no longer needed.
+ *
+ * Install callbacks that will be called when RTCP packets are received from the
+ * receiver of @stream.
+ */
+void
+gst_rtsp_session_stream_set_keepalive (GstRTSPSessionStream *stream,
+    GstRTSPKeepAliveFunc keep_alive, gpointer user_data, GDestroyNotify  notify)
+{
+  stream->trans.keep_alive = keep_alive;
+  if (stream->trans.ka_notify)
+    stream->trans.ka_notify (stream->trans.ka_user_data);
+  stream->trans.ka_user_data = user_data;
+  stream->trans.ka_notify = notify;
+}
+
+/**
  * gst_rtsp_session_media_set_state:
  * @media: a #GstRTSPSessionMedia
  * @state: the new state
index 495bc75..0074b55 100644 (file)
@@ -146,6 +146,10 @@ void                   gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStre
                                                               GstRTSPSendFunc send_rtcp,
                                                               gpointer user_data,
                                                               GDestroyNotify  notify);
+void                   gst_rtsp_session_stream_set_keepalive (GstRTSPSessionStream *stream,
+                                                              GstRTSPKeepAliveFunc keep_alive,
+                                                              gpointer user_data,
+                                                              GDestroyNotify  notify);
 
 G_END_DECLS