From: Wim Taymans Date: Wed, 18 Feb 2009 17:57:31 +0000 (+0100) Subject: Use ASYNC RTSP io X-Git-Tag: 1.6.0~978 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=39c2e31e6548448d9cd208af63764d5c5920195f;p=platform%2Fupstream%2Fgst-rtsp-server.git Use ASYNC RTSP io Use the async RTSP channels instead of spawning a new thread for each client. If a sessionid is specified in a request, fail if we don't have the session. --- diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index f0c0eb7..c615f85 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -184,7 +184,10 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1); } +#if 0 gst_rtsp_connection_send (client->connection, response, &timeout); +#endif + gst_rtsp_channel_queue_message (client->channel, response); gst_rtsp_message_unset (response); } @@ -287,45 +290,6 @@ no_prepare: } } -/* Get the session or NULL when there was no session */ -static GstRTSPSession * -find_session (GstRTSPClient *client, GstRTSPMessage *request) -{ - GstRTSPResult res; - GstRTSPSession *session; - gchar *sessid; - - res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_SESSION, &sessid, 0); - if (res == GST_RTSP_OK) { - if (client->session_pool == NULL) - goto no_pool; - - /* we had a session in the request, find it again */ - if (!(session = gst_rtsp_session_pool_find (client->session_pool, sessid))) - goto session_not_found; - - client->timeout = gst_rtsp_session_get_timeout (session); - } - else - goto service_unavailable; - - return session; - - /* ERRORS */ -no_pool: - { - return NULL; - } -session_not_found: - { - return NULL; - } -service_unavailable: - { - return NULL; - } -} - static gboolean handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) { @@ -490,7 +454,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses /* ERRORS */ no_session: { - /* error was sent */ + send_generic_response (client, GST_RTSP_STS_SESSION_NOT_FOUND, request); return FALSE; } not_found: @@ -580,7 +544,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se /* we have a valid transport now, set the destination of the client. */ g_free (ct->destination); - ct->destination = g_strdup (inet_ntoa (client->address.sin_addr)); + ct->destination = g_strdup (client->connection->url->host); if (session) { g_object_ref (session); @@ -813,163 +777,101 @@ santize_uri (GstRTSPUrl *uri) *d = '\0'; } -/* this function runs in a client specific thread and handles all rtsp messages - * with the client */ -static gpointer -handle_client (GstRTSPClient *client) +static void +handle_request (GstRTSPClient *client, GstRTSPMessage *request) { - GstRTSPMessage request = { 0 }; - GstRTSPResult res; GstRTSPMethod method; const gchar *uristr; GstRTSPUrl *uri; GstRTSPVersion version; - - while (TRUE) { - GTimeVal timeout; - GstRTSPSession *session; - - timeout.tv_sec = client->timeout; - timeout.tv_usec = 0; - - /* start by waiting for a message from the client */ - res = gst_rtsp_connection_receive (client->connection, &request, &timeout); - if (res < 0) { - if (res == GST_RTSP_ETIMEOUT) - goto timeout; - - goto receive_failed; - } + GstRTSPResult res; + GstRTSPSession *session; + gchar *sessid; #ifdef DEBUG - gst_rtsp_message_dump (&request); + gst_rtsp_message_dump (request); #endif - gst_rtsp_message_parse_request (&request, &method, &uristr, &version); - - if (version != GST_RTSP_VERSION_1_0) { - /* we can only handle 1.0 requests */ - send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, &request); - continue; - } - - /* we always try to parse the url first */ - if ((res = gst_rtsp_url_parse (uristr, &uri)) != GST_RTSP_OK) { - send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, &request); - continue; - } + gst_rtsp_message_parse_request (request, &method, &uristr, &version); - /* sanitize the uri */ - santize_uri (uri); - - /* get the session if there is any */ - session = find_session (client, &request); - - /* now see what is asked and dispatch to a dedicated handler */ - switch (method) { - case GST_RTSP_OPTIONS: - handle_options_request (client, uri, session, &request); - break; - case GST_RTSP_DESCRIBE: - handle_describe_request (client, uri, session, &request); - break; - case GST_RTSP_SETUP: - handle_setup_request (client, uri, session, &request); - break; - case GST_RTSP_PLAY: - handle_play_request (client, uri, session, &request); - break; - case GST_RTSP_PAUSE: - handle_pause_request (client, uri, session, &request); - break; - case GST_RTSP_TEARDOWN: - handle_teardown_request (client, uri, session, &request); - break; - case GST_RTSP_ANNOUNCE: - case GST_RTSP_GET_PARAMETER: - case GST_RTSP_RECORD: - case GST_RTSP_REDIRECT: - case GST_RTSP_SET_PARAMETER: - send_generic_response (client, GST_RTSP_STS_NOT_IMPLEMENTED, &request); - break; - case GST_RTSP_INVALID: - default: - send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, &request); - break; - } - if (session) - g_object_unref (session); - gst_rtsp_url_free (uri); + if (version != GST_RTSP_VERSION_1_0) { + /* we can only handle 1.0 requests */ + send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, request); + return; } - g_object_unref (client); - return NULL; - /* ERRORS */ -timeout: - { - g_message ("client timed out"); - if (client->session_pool) - gst_rtsp_session_pool_cleanup (client->session_pool); - goto cleanup; - } -receive_failed: - { - gchar *str; - str = gst_rtsp_strresult (res); - g_message ("receive failed %d (%s), disconnect client %p", res, - str, client); - g_free (str); - goto cleanup; - } -cleanup: - { - gst_rtsp_message_unset (&request); - gst_rtsp_connection_close (client->connection); - g_object_unref (client); - return NULL; + /* we always try to parse the url first */ + if ((res = gst_rtsp_url_parse (uristr, &uri)) != GST_RTSP_OK) { + send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, request); + return; } -} -/* called when we need to accept a new request from a client */ -static gboolean -client_accept (GstRTSPClient *client, GIOChannel *channel) -{ - /* a new client connected. */ - int server_sock_fd, fd; - unsigned int address_len; - GstRTSPConnection *conn; - - server_sock_fd = g_io_channel_unix_get_fd (channel); + /* sanitize the uri */ + santize_uri (uri); - address_len = sizeof (client->address); - memset (&client->address, 0, address_len); - - fd = accept (server_sock_fd, (struct sockaddr *) &client->address, - &address_len); - if (fd == -1) - goto accept_failed; - - /* now create the connection object */ - gst_rtsp_connection_create (NULL, &conn); - conn->fd.fd = fd; + /* get the session if there is any */ + res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_SESSION, &sessid, 0); + if (res == GST_RTSP_OK) { + if (client->session_pool == NULL) + goto no_pool; - /* FIXME some hackery, we need to have a connection method to accept server - * connections */ - gst_poll_add_fd (conn->fdset, &conn->fd); + /* we had a session in the request, find it again */ + if (!(session = gst_rtsp_session_pool_find (client->session_pool, sessid))) + goto session_not_found; - g_message ("added new client %p ip %s with fd %d", client, - inet_ntoa (client->address.sin_addr), conn->fd.fd); + client->timeout = gst_rtsp_session_get_timeout (session); + } + else + session = NULL; - client->connection = conn; + /* now see what is asked and dispatch to a dedicated handler */ + switch (method) { + case GST_RTSP_OPTIONS: + handle_options_request (client, uri, session, request); + break; + case GST_RTSP_DESCRIBE: + handle_describe_request (client, uri, session, request); + break; + case GST_RTSP_SETUP: + handle_setup_request (client, uri, session, request); + break; + case GST_RTSP_PLAY: + handle_play_request (client, uri, session, request); + break; + case GST_RTSP_PAUSE: + handle_pause_request (client, uri, session, request); + break; + case GST_RTSP_TEARDOWN: + handle_teardown_request (client, uri, session, request); + break; + case GST_RTSP_ANNOUNCE: + case GST_RTSP_GET_PARAMETER: + case GST_RTSP_RECORD: + case GST_RTSP_REDIRECT: + case GST_RTSP_SET_PARAMETER: + send_generic_response (client, GST_RTSP_STS_NOT_IMPLEMENTED, request); + break; + case GST_RTSP_INVALID: + default: + send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, request); + break; + } + if (session) + g_object_unref (session); - return TRUE; + gst_rtsp_url_free (uri); + return; /* ERRORS */ -accept_failed: +no_pool: { - g_error ("Could not accept client on server socket %d: %s (%d)", - server_sock_fd, g_strerror (errno), errno); - return FALSE; + send_generic_response (client, GST_RTSP_STS_SERVICE_UNAVAILABLE, request); + return; + } +session_not_found: + { + send_generic_response (client, GST_RTSP_STS_SESSION_NOT_FOUND, request); + return; } } @@ -1087,6 +989,58 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client) return result; } +static GstRTSPResult +message_received (GstRTSPChannel *channel, GstRTSPMessage *message, gpointer user_data) +{ + GstRTSPClient *client = GST_RTSP_CLIENT (user_data); + + g_message ("client %p: received a message", client); + + handle_request (client, message); + + return GST_RTSP_OK; +} + +static GstRTSPResult +message_sent (GstRTSPChannel *channel, guint cseq, gpointer user_data) +{ + GstRTSPClient *client = GST_RTSP_CLIENT (user_data); + + g_message ("client %p: sent a message with cseq %d", client, cseq); + + return GST_RTSP_OK; +} + +static GstRTSPResult +closed (GstRTSPChannel *channel, gpointer user_data) +{ + GstRTSPClient *client = GST_RTSP_CLIENT (user_data); + + g_message ("client %p: connection closed", client); + + return GST_RTSP_OK; +} + +static GstRTSPResult +error (GstRTSPChannel *channel, GstRTSPResult result, gpointer user_data) +{ + GstRTSPClient *client = GST_RTSP_CLIENT (user_data); + gchar *str; + + str = gst_rtsp_strresult (result); + g_message ("client %p: received an error %s", client, str); + g_free (str); + + return GST_RTSP_OK; +} + +static GstRTSPChannelFuncs channel_funcs = { + message_received, + message_sent, + closed, + error +}; + /** * gst_rtsp_client_attach: * @client: a #GstRTSPClient @@ -1102,32 +1056,47 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client) gboolean gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel) { - GError *error = NULL; + int sock; + GstRTSPConnection *conn; + GstRTSPResult res; + GSource *source; + GMainContext *context; + + /* a new client connected. */ + sock = g_io_channel_unix_get_fd (channel); + + GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed); - if (!client_accept (client, channel)) - goto accept_failed; + g_message ("added new client %p ip %s:%d with fd %d", client, + conn->url->host, conn->url->port, conn->fd.fd); - /* client accepted, spawn a thread for the client, we don't need to join the - * thread */ - g_object_ref (client); - client->thread = g_thread_create ((GThreadFunc)handle_client, client, FALSE, &error); - if (client->thread == NULL) - goto no_thread; + client->connection = conn; + + /* create channel for the connection and attach */ + client->channel = gst_rtsp_channel_new (client->connection, &channel_funcs, + g_object_ref (client), g_object_unref); + + /* find the context to add the channel */ + if ((source = g_main_current_source ())) + context = g_source_get_context (source); + else + context = NULL; + + g_message ("attaching to context %p", context); + + gst_rtsp_channel_attach (client->channel, context); + gst_rtsp_channel_unref (client->channel); return TRUE; /* ERRORS */ accept_failed: { - return FALSE; - } -no_thread: - { - if (error) { - g_warning ("could not create thread for client %p: %s", client, error->message); - g_error_free (error); - } - g_object_unref (client); + gchar *str = gst_rtsp_strresult (res); + + g_error ("Could not accept client on server socket %d: %s", + sock, str); + g_free (str); return FALSE; } } diff --git a/gst/rtsp-server/rtsp-client.h b/gst/rtsp-server/rtsp-client.h index 694361f..4746d1c 100644 --- a/gst/rtsp-server/rtsp-client.h +++ b/gst/rtsp-server/rtsp-client.h @@ -70,7 +70,7 @@ struct _GstRTSPClient { GObject parent; GstRTSPConnection *connection; - struct sockaddr_in address; + GstRTSPChannel *channel; GThread *thread; guint timeout;