rtsp-server: Add support for tunneling
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 4 Mar 2009 11:44:01 +0000 (12:44 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 4 Mar 2009 11:53:07 +0000 (12:53 +0100)
Add support for tunneling over HTTP.
Use new connection methods to retrieve the url.
Dispatch messages based on the message type instead of blindly
assuming it's always a request.
Keep track of the watch id so that we can remove it later.
Set the media pipeline to NULL before unreffing the pipeline.

gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-client.h
gst/rtsp-server/rtsp-media.c

index fe757ef..1ba0678 100644 (file)
@@ -26,6 +26,9 @@
 
 #define DEFAULT_TIMEOUT  60
 
+static GMutex *tunnels_lock;
+static GHashTable *tunnels;
+
 enum
 {
   PROP_0,
@@ -67,6 +70,9 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass)
       g_param_spec_object ("media-mapping", "Media Mapping",
           "The media mapping to use for client session",
           GST_TYPE_RTSP_MEDIA_MAPPING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  tunnels = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
+  tunnels_lock = g_mutex_new ();
 }
 
 static void
@@ -486,6 +492,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
   guint streamid;
   GstRTSPSessionMedia *media;
   gboolean need_session;
+  GstRTSPUrl *url;
 
   /* the uri contains the stream number we added in the SDP config, which is
    * always /stream=%d so we need to strip that off 
@@ -535,7 +542,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
     goto unsupported_transports;
 
   supported = GST_RTSP_LOWER_TRANS_UDP |
-       GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP;
+       GST_RTSP_LOWER_TRANS_UDP_MCAST;
   if (!(ct->lower_transport & supported))
     goto unsupported_transports;
 
@@ -544,7 +551,8 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
 
   /* we have a valid transport now, set the destination of the client. */
   g_free (ct->destination);
-  ct->destination = g_strdup (client->connection->url->host);
+  url = gst_rtsp_connection_get_url (client->connection);
+  ct->destination = g_strdup (url->host);
 
   if (session) {
     g_object_ref (session);
@@ -996,8 +1004,17 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da
 
   g_message ("client %p: received a message", client);
 
-  handle_request (client, message);
-
+  switch (message->type) {
+    case GST_RTSP_MESSAGE_REQUEST:
+      handle_request (client, message);
+      break;
+    case GST_RTSP_MESSAGE_RESPONSE:
+      break;
+    case GST_RTSP_MESSAGE_DATA:
+      break;
+    default:
+      break;
+  }
   return GST_RTSP_OK;
 }
 
@@ -1015,9 +1032,15 @@ static GstRTSPResult
 closed (GstRTSPWatch *watch, gpointer user_data)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
+  const gchar *tunnelid;
 
   g_message ("client %p: connection closed", client);
 
+  tunnelid = gst_rtsp_connection_get_tunnelid (client->connection);
+  g_mutex_lock (tunnels_lock);
+  g_hash_table_remove (tunnels, tunnelid);
+  g_mutex_unlock (tunnels_lock);
+
   return GST_RTSP_OK;
 }
 
@@ -1034,11 +1057,90 @@ error (GstRTSPWatch *watch, GstRTSPResult result, gpointer user_data)
   return GST_RTSP_OK;
 }
 
+static GstRTSPStatusCode
+tunnel_start (GstRTSPWatch *watch, gpointer user_data)
+{
+  GstRTSPClient *client;
+  const gchar *tunnelid;
+
+  client = GST_RTSP_CLIENT (user_data);
+
+  g_message ("client %p: tunnel start", client);
+
+  /* store client in the pending tunnels */
+  tunnelid = gst_rtsp_connection_get_tunnelid (client->connection);
+
+  g_message ("client %p: inserting %s", client, tunnelid);
+
+  /* we can't have two clients connecting with the same tunnelid */
+  g_mutex_lock (tunnels_lock);
+  if (g_hash_table_lookup (tunnels, tunnelid))
+    goto tunnel_existed;
+
+  g_hash_table_insert (tunnels, g_strdup (tunnelid), g_object_ref (client));
+  g_mutex_unlock (tunnels_lock);
+
+  return GST_RTSP_STS_OK;
+
+  /* ERRORS */
+tunnel_existed:
+  {
+    g_mutex_unlock (tunnels_lock);
+    g_message ("client %p: tunnel session %s existed", client, tunnelid);
+    return GST_RTSP_STS_SERVICE_UNAVAILABLE;
+  }
+}
+
+static GstRTSPResult
+tunnel_complete (GstRTSPWatch *watch, gpointer user_data)
+{
+  const gchar *tunnelid;
+  GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
+  GstRTSPClient *oclient;
+
+  g_message ("client %p: tunnel complete", client);
+
+  /* find previous tunnel */
+  tunnelid = gst_rtsp_connection_get_tunnelid (client->connection);
+
+  g_mutex_lock (tunnels_lock);
+  if (!(oclient = g_hash_table_lookup (tunnels, tunnelid)))
+    goto no_tunnel;
+
+  /* remove the old client from the table. ref before because removing it will
+   * remove the ref to it. */
+  g_object_ref (oclient);
+  g_hash_table_remove (tunnels, tunnelid);
+  g_mutex_unlock (tunnels_lock);
+
+  g_message ("client %p: found tunnel %p", client, oclient);
+
+  /* merge the tunnels into the first client */
+  gst_rtsp_connection_do_tunnel (oclient->connection, client->connection);
+  gst_rtsp_watch_reset (oclient->watch);
+  g_object_unref (oclient);
+
+  /* we don't need this watch anymore */
+  g_source_remove (client->watchid);
+
+  return GST_RTSP_OK;
+
+  /* ERRORS */
+no_tunnel:
+  {
+    g_mutex_unlock (tunnels_lock);
+    g_message ("client %p: tunnel session %s not found", client, tunnelid);
+    return GST_RTSP_OK;
+  }
+}
+
 static GstRTSPWatchFuncs watch_funcs = {
   message_received,
   message_sent,
   closed,
-  error
+  error,
+  tunnel_start,
+  tunnel_complete
 };
 
 /**
@@ -1061,14 +1163,16 @@ gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
   GstRTSPResult res;
   GSource *source;
   GMainContext *context;
+  GstRTSPUrl *url;
 
   /* a new client connected. */
   sock = g_io_channel_unix_get_fd (channel);
 
   GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed);
 
-  g_message ("added new client %p ip %s:%d with fd %d", client,
-               conn->url->host, conn->url->port, conn->fd.fd);
+  url = gst_rtsp_connection_get_url (conn);
+  g_message ("added new client %p ip %s:%d", client,
+               url->host, url->port);
 
   client->connection = conn;
 
@@ -1084,7 +1188,7 @@ gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
 
   g_message ("attaching to context %p", context);
 
-  gst_rtsp_watch_attach (client->watch, context);
+  client->watchid = gst_rtsp_watch_attach (client->watch, context);
   gst_rtsp_watch_unref (client->watch);
 
   return TRUE;
index 53e4f2f..9949f1a 100644 (file)
@@ -71,6 +71,7 @@ struct _GstRTSPClient {
 
   GstRTSPConnection *connection;
   GstRTSPWatch      *watch;
+  guint              watchid;
 
   guint                 timeout;
   GstRTSPSessionPool   *session_pool;
index bfe5609..e590ae0 100644 (file)
@@ -110,8 +110,10 @@ gst_rtsp_media_finalize (GObject * obj)
     g_source_unref (media->source);
   }
 
-  if (media->pipeline)
+  if (media->pipeline) {
+    gst_element_set_state (media->pipeline, GST_STATE_NULL);
     gst_object_unref (media->pipeline);
+  }
 
   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
 }