Use ASYNC RTSP io
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 18 Feb 2009 17:57:31 +0000 (18:57 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 18 Feb 2009 17:57:31 +0000 (18:57 +0100)
Use the async RTSP channels instead of spawning a new thread for each client.
If a sessionid is specified in a request, fail if we don't have the session.

gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-client.h

index f0c0eb7..c615f85 100644 (file)
@@ -184,7 +184,10 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r
     gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1);
   }
 
+#if 0
   gst_rtsp_connection_send (client->connection, response, &timeout);
+#endif
+  gst_rtsp_channel_queue_message (client->channel, response);
   gst_rtsp_message_unset (response);
 }
 
@@ -287,45 +290,6 @@ no_prepare:
   }
 }
 
-/* Get the session or NULL when there was no session */
-static GstRTSPSession *
-find_session (GstRTSPClient *client, GstRTSPMessage *request)
-{
-  GstRTSPResult res;
-  GstRTSPSession *session;
-  gchar *sessid;
-
-  res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_SESSION, &sessid, 0);
-  if (res == GST_RTSP_OK) {
-    if (client->session_pool == NULL)
-      goto no_pool;
-
-    /* we had a session in the request, find it again */
-    if (!(session = gst_rtsp_session_pool_find (client->session_pool, sessid)))
-      goto session_not_found;
-
-    client->timeout = gst_rtsp_session_get_timeout (session);
-  }
-  else
-    goto service_unavailable;
-
-  return session;
-
-  /* ERRORS */
-no_pool:
-  {
-    return NULL;
-  }
-session_not_found:
-  {
-    return NULL;
-  }
-service_unavailable:
-  {
-    return NULL;
-  }
-}
-
 static gboolean
 handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
 {
@@ -490,7 +454,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
   /* ERRORS */
 no_session:
   {
-    /* error was sent */
+    send_generic_response (client, GST_RTSP_STS_SESSION_NOT_FOUND, request);
     return FALSE;
   }
 not_found:
@@ -580,7 +544,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
 
   /* we have a valid transport now, set the destination of the client. */
   g_free (ct->destination);
-  ct->destination = g_strdup (inet_ntoa (client->address.sin_addr));
+  ct->destination = g_strdup (client->connection->url->host);
 
   if (session) {
     g_object_ref (session);
@@ -813,163 +777,101 @@ santize_uri (GstRTSPUrl *uri)
   *d = '\0';
 }
 
-/* this function runs in a client specific thread and handles all rtsp messages
- * with the client */
-static gpointer
-handle_client (GstRTSPClient *client)
+static void
+handle_request (GstRTSPClient *client, GstRTSPMessage *request)
 {
-  GstRTSPMessage request = { 0 };
-  GstRTSPResult res;
   GstRTSPMethod method;
   const gchar *uristr;
   GstRTSPUrl *uri;
   GstRTSPVersion version;
-
-  while (TRUE) {
-    GTimeVal timeout;
-    GstRTSPSession *session;
-
-    timeout.tv_sec = client->timeout;
-    timeout.tv_usec = 0;
-
-    /* start by waiting for a message from the client */
-    res = gst_rtsp_connection_receive (client->connection, &request, &timeout);
-    if (res < 0) {
-      if (res == GST_RTSP_ETIMEOUT)
-        goto timeout;
-
-      goto receive_failed;
-    }
+  GstRTSPResult res;
+  GstRTSPSession *session;
+  gchar *sessid;
 
 #ifdef DEBUG
-    gst_rtsp_message_dump (&request);
+  gst_rtsp_message_dump (request);
 #endif
 
-    gst_rtsp_message_parse_request (&request, &method, &uristr, &version);
-
-    if (version != GST_RTSP_VERSION_1_0) {
-      /* we can only handle 1.0 requests */
-      send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, &request);
-      continue;
-    }
-
-    /* we always try to parse the url first */
-    if ((res = gst_rtsp_url_parse (uristr, &uri)) != GST_RTSP_OK) {
-      send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, &request);
-      continue;
-    }
+  gst_rtsp_message_parse_request (request, &method, &uristr, &version);
 
-    /* sanitize the uri */
-    santize_uri (uri);
-
-    /* get the session if there is any */
-    session = find_session (client, &request);
-
-    /* now see what is asked and dispatch to a dedicated handler */
-    switch (method) {
-      case GST_RTSP_OPTIONS:
-        handle_options_request (client, uri, session, &request);
-        break;
-      case GST_RTSP_DESCRIBE:
-        handle_describe_request (client, uri, session, &request);
-        break;
-      case GST_RTSP_SETUP:
-        handle_setup_request (client, uri, session, &request);
-        break;
-      case GST_RTSP_PLAY:
-        handle_play_request (client, uri, session, &request);
-        break;
-      case GST_RTSP_PAUSE:
-        handle_pause_request (client, uri, session, &request);
-        break;
-      case GST_RTSP_TEARDOWN:
-        handle_teardown_request (client, uri, session, &request);
-        break;
-      case GST_RTSP_ANNOUNCE:
-      case GST_RTSP_GET_PARAMETER:
-      case GST_RTSP_RECORD:
-      case GST_RTSP_REDIRECT:
-      case GST_RTSP_SET_PARAMETER:
-        send_generic_response (client, GST_RTSP_STS_NOT_IMPLEMENTED, &request);
-        break;
-      case GST_RTSP_INVALID:
-      default:
-        send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, &request);
-        break;
-    }
-    if (session)
-      g_object_unref (session);
-    gst_rtsp_url_free (uri);
+  if (version != GST_RTSP_VERSION_1_0) {
+    /* we can only handle 1.0 requests */
+    send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, request);
+    return;
   }
-  g_object_unref (client);
-  return NULL;
 
-  /* ERRORS */
-timeout:
-  {
-    g_message ("client timed out");
-    if (client->session_pool)
-      gst_rtsp_session_pool_cleanup (client->session_pool);
-    goto cleanup;
-  }
-receive_failed:
-  {
-    gchar *str;
-    str = gst_rtsp_strresult (res);
-    g_message ("receive failed %d (%s), disconnect client %p", res, 
-           str, client);
-    g_free (str);
-    goto cleanup;
-  }
-cleanup:
-  {
-    gst_rtsp_message_unset (&request);
-    gst_rtsp_connection_close (client->connection);
-    g_object_unref (client);
-    return NULL;
+  /* we always try to parse the url first */
+  if ((res = gst_rtsp_url_parse (uristr, &uri)) != GST_RTSP_OK) {
+    send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, request);
+    return;
   }
-}
 
-/* called when we need to accept a new request from a client */
-static gboolean
-client_accept (GstRTSPClient *client, GIOChannel *channel)
-{
-  /* a new client connected. */
-  int server_sock_fd, fd;
-  unsigned int address_len;
-  GstRTSPConnection *conn;
-
-  server_sock_fd = g_io_channel_unix_get_fd (channel);
+  /* sanitize the uri */
+  santize_uri (uri);
 
-  address_len = sizeof (client->address);
-  memset (&client->address, 0, address_len);
-
-  fd = accept (server_sock_fd, (struct sockaddr *) &client->address,
-      &address_len);
-  if (fd == -1)
-    goto accept_failed;
-
-  /* now create the connection object */
-  gst_rtsp_connection_create (NULL, &conn);
-  conn->fd.fd = fd;
+  /* get the session if there is any */
+  res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_SESSION, &sessid, 0);
+  if (res == GST_RTSP_OK) {
+    if (client->session_pool == NULL)
+      goto no_pool;
 
-  /* FIXME some hackery, we need to have a connection method to accept server
-   * connections */
-  gst_poll_add_fd (conn->fdset, &conn->fd);
+    /* we had a session in the request, find it again */
+    if (!(session = gst_rtsp_session_pool_find (client->session_pool, sessid)))
+      goto session_not_found;
 
-  g_message ("added new client %p ip %s with fd %d", client,
-               inet_ntoa (client->address.sin_addr), conn->fd.fd);
+    client->timeout = gst_rtsp_session_get_timeout (session);
+  }
+  else
+    session = NULL;
 
-  client->connection = conn;
+  /* now see what is asked and dispatch to a dedicated handler */
+  switch (method) {
+    case GST_RTSP_OPTIONS:
+      handle_options_request (client, uri, session, request);
+      break;
+    case GST_RTSP_DESCRIBE:
+      handle_describe_request (client, uri, session, request);
+      break;
+    case GST_RTSP_SETUP:
+      handle_setup_request (client, uri, session, request);
+      break;
+    case GST_RTSP_PLAY:
+      handle_play_request (client, uri, session, request);
+      break;
+    case GST_RTSP_PAUSE:
+      handle_pause_request (client, uri, session, request);
+      break;
+    case GST_RTSP_TEARDOWN:
+      handle_teardown_request (client, uri, session, request);
+      break;
+    case GST_RTSP_ANNOUNCE:
+    case GST_RTSP_GET_PARAMETER:
+    case GST_RTSP_RECORD:
+    case GST_RTSP_REDIRECT:
+    case GST_RTSP_SET_PARAMETER:
+      send_generic_response (client, GST_RTSP_STS_NOT_IMPLEMENTED, request);
+      break;
+    case GST_RTSP_INVALID:
+    default:
+      send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, request);
+      break;
+  }
+  if (session)
+    g_object_unref (session);
 
-  return TRUE;
+  gst_rtsp_url_free (uri);
+  return;
 
   /* ERRORS */
-accept_failed:
+no_pool:
   {
-    g_error ("Could not accept client on server socket %d: %s (%d)",
-            server_sock_fd, g_strerror (errno), errno);
-    return FALSE;
+    send_generic_response (client, GST_RTSP_STS_SERVICE_UNAVAILABLE, request);
+    return;
+  }
+session_not_found:
+  {
+    send_generic_response (client, GST_RTSP_STS_SESSION_NOT_FOUND, request);
+    return;
   }
 }
 
@@ -1087,6 +989,58 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client)
   return result;
 }
 
+static GstRTSPResult
+message_received (GstRTSPChannel *channel, GstRTSPMessage *message, gpointer user_data)
+{
+  GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
+
+  g_message ("client %p: received a message", client);
+
+  handle_request (client, message);
+
+  return GST_RTSP_OK;
+}
+
+static GstRTSPResult
+message_sent (GstRTSPChannel *channel, guint cseq, gpointer user_data)
+{
+  GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
+
+  g_message ("client %p: sent a message with cseq %d", client, cseq);
+
+  return GST_RTSP_OK;
+}
+
+static GstRTSPResult
+closed (GstRTSPChannel *channel, gpointer user_data)
+{
+  GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
+
+  g_message ("client %p: connection closed", client);
+
+  return GST_RTSP_OK;
+}
+
+static GstRTSPResult
+error (GstRTSPChannel *channel, GstRTSPResult result, gpointer user_data)
+{
+  GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
+  gchar *str;
+
+  str = gst_rtsp_strresult (result);
+  g_message ("client %p: received an error %s", client, str);
+  g_free (str);
+
+  return GST_RTSP_OK;
+}
+
+static GstRTSPChannelFuncs channel_funcs = {
+  message_received,
+  message_sent,
+  closed,
+  error
+};
+
 /**
  * gst_rtsp_client_attach:
  * @client: a #GstRTSPClient
@@ -1102,32 +1056,47 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client)
 gboolean
 gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
 {
-  GError *error = NULL;
+  int sock;
+  GstRTSPConnection *conn;
+  GstRTSPResult res;
+  GSource *source;
+  GMainContext *context;
+
+  /* a new client connected. */
+  sock = g_io_channel_unix_get_fd (channel);
+
+  GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed);
 
-  if (!client_accept (client, channel))
-    goto accept_failed;
+  g_message ("added new client %p ip %s:%d with fd %d", client,
+               conn->url->host, conn->url->port, conn->fd.fd);
 
-  /* client accepted, spawn a thread for the client, we don't need to join the
-   * thread */
-  g_object_ref (client);
-  client->thread = g_thread_create ((GThreadFunc)handle_client, client, FALSE, &error);
-  if (client->thread == NULL)
-    goto no_thread;
+  client->connection = conn;
+
+  /* create channel for the connection and attach */
+  client->channel = gst_rtsp_channel_new (client->connection, &channel_funcs,
+                 g_object_ref (client), g_object_unref);
+
+  /* find the context to add the channel */
+  if ((source = g_main_current_source ()))
+    context = g_source_get_context (source);
+  else
+    context = NULL;
+
+  g_message ("attaching to context %p", context);
+
+  gst_rtsp_channel_attach (client->channel, context);
+  gst_rtsp_channel_unref (client->channel);
 
   return TRUE;
 
   /* ERRORS */
 accept_failed:
   {
-    return FALSE;
-  }
-no_thread:
-  {
-    if (error) {
-      g_warning ("could not create thread for client %p: %s", client, error->message);
-      g_error_free (error);
-    }
-    g_object_unref (client);
+    gchar *str = gst_rtsp_strresult (res);
+
+    g_error ("Could not accept client on server socket %d: %s",
+            sock, str);
+    g_free (str);
     return FALSE;
   }
 }
index 694361f..4746d1c 100644 (file)
@@ -70,7 +70,7 @@ struct _GstRTSPClient {
   GObject       parent;
 
   GstRTSPConnection *connection;
-  struct sockaddr_in address;
+  GstRTSPChannel *channel;
   GThread *thread;
 
   guint                 timeout;