X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtsp-server%2Frtsp-server.c;h=0edf635a0858a4f439b625fb510d79ff86aa66fa;hb=9a97de88ea9b9ca1613b57db2933cfbbd988a547;hp=d7a0d1cca31f52e0d85e2f2a33059318fc1c544c;hpb=5fb5f750208c7f35422a7fc3405fdb9b35b86fdc;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/rtsp-server/rtsp-server.c b/gst/rtsp-server/rtsp-server.c index d7a0d1c..0edf635 100644 --- a/gst/rtsp-server/rtsp-server.c +++ b/gst/rtsp-server/rtsp-server.c @@ -13,19 +13,22 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ -#include +#include +#include #include "rtsp-server.h" #include "rtsp-client.h" #define DEFAULT_ADDRESS "0.0.0.0" +#define DEFAULT_BOUND_PORT -1 /* #define DEFAULT_ADDRESS "::0" */ #define DEFAULT_SERVICE "8554" #define DEFAULT_BACKLOG 5 +#define DEFAULT_MAX_THREADS 0 /* Define to use the SO_LINGER option so that the server sockets can be resused * sooner. Disabled for now because it is not very well implemented by various @@ -37,26 +40,40 @@ enum PROP_0, PROP_ADDRESS, PROP_SERVICE, + PROP_BOUND_PORT, PROP_BACKLOG, PROP_SESSION_POOL, PROP_MEDIA_MAPPING, + PROP_MAX_THREADS, PROP_LAST }; +enum +{ + SIGNAL_CLIENT_CONNECTED, + SIGNAL_LAST +}; + G_DEFINE_TYPE (GstRTSPServer, gst_rtsp_server, G_TYPE_OBJECT); GST_DEBUG_CATEGORY_STATIC (rtsp_server_debug); #define GST_CAT_DEFAULT rtsp_server_debug +typedef struct _ClientContext ClientContext; + +static guint gst_rtsp_server_signals[SIGNAL_LAST] = { 0 }; + static void gst_rtsp_server_get_property (GObject * object, guint propid, GValue * value, GParamSpec * pspec); static void gst_rtsp_server_set_property (GObject * object, guint propid, const GValue * value, GParamSpec * pspec); static void gst_rtsp_server_finalize (GObject * object); -static GstRTSPClient *default_accept_client (GstRTSPServer * server, - GIOChannel * channel); +static gpointer do_loop (ClientContext * ctx); +static GstRTSPClient *default_create_client (GstRTSPServer * server); +static gboolean default_accept_client (GstRTSPServer * server, + GstRTSPClient * client, GSocket * socket, GError ** error); static void gst_rtsp_server_class_init (GstRTSPServerClass * klass) @@ -70,7 +87,7 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass) gobject_class->finalize = gst_rtsp_server_finalize; /** - * GstRTSPServer::address + * GstRTSPServer::address: * * The address of the server. This is the address where the server will * listen on. @@ -80,7 +97,7 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass) "The address the server uses to listen on", DEFAULT_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** - * GstRTSPServer::service + * GstRTSPServer::service: * * The service of the server. This is either a string with the service name or * a port number (as a string) the server will listen on. @@ -90,7 +107,19 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass) "The service or port number the server uses to listen on", DEFAULT_SERVICE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** - * GstRTSPServer::backlog + * GstRTSPServer::bound-port: + * + * The actual port the server is listening on. Can be used to retrieve the + * port number when the server is started on port 0, which means bind to a + * random port. Set to -1 if the server has not been bound yet. + */ + g_object_class_install_property (gobject_class, PROP_BOUND_PORT, + g_param_spec_int ("bound-port", "Bound port", + "The port number the server is listening on", + -1, G_MAXUINT16, DEFAULT_BOUND_PORT, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** + * GstRTSPServer::backlog: * * The backlog argument defines the maximum length to which the queue of * pending connections for the server may grow. If a connection request arrives @@ -104,7 +133,7 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass) "of pending connections may grow", 0, G_MAXINT, DEFAULT_BACKLOG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** - * GstRTSPServer::session-pool + * GstRTSPServer::session-pool: * * The session pool of the server. By default each server has a separate * session pool but sessions can be shared between servers by setting the same @@ -116,7 +145,7 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass) GST_TYPE_RTSP_SESSION_POOL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** - * GstRTSPServer::media-mapping + * GstRTSPServer::media-mapping: * * The media mapping to use for this server. By default the server has no * media mapping and thus cannot map urls to media streams. @@ -126,17 +155,40 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass) "The media mapping to use for client session", GST_TYPE_RTSP_MEDIA_MAPPING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - + /** + * GstRTSPServer::max-threads: + * + * The maximum amount of threads to use for client connections. A value of + * 0 means to use only the mainloop, -1 means an unlimited amount of + * threads. + */ + g_object_class_install_property (gobject_class, PROP_MAX_THREADS, + g_param_spec_int ("max-threads", "Max Threads", + "The maximum amount of threads to use for client connections " + "(0 = only mainloop, -1 = unlimited)", -1, G_MAXINT, + DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + gst_rtsp_server_signals[SIGNAL_CLIENT_CONNECTED] = + g_signal_new ("client-connected", G_TYPE_FROM_CLASS (gobject_class), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTSPServerClass, client_connected), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + gst_rtsp_client_get_type ()); + + klass->create_client = default_create_client; klass->accept_client = default_accept_client; + klass->pool = g_thread_pool_new ((GFunc) do_loop, klass, -1, FALSE, NULL); + GST_DEBUG_CATEGORY_INIT (rtsp_server_debug, "rtspserver", 0, "GstRTSPServer"); } static void gst_rtsp_server_init (GstRTSPServer * server) { + g_mutex_init (&server->lock); server->address = g_strdup (DEFAULT_ADDRESS); server->service = g_strdup (DEFAULT_SERVICE); + server->socket = NULL; server->backlog = DEFAULT_BACKLOG; server->session_pool = gst_rtsp_session_pool_new (); server->media_mapping = gst_rtsp_media_mapping_new (); @@ -147,11 +199,22 @@ gst_rtsp_server_finalize (GObject * object) { GstRTSPServer *server = GST_RTSP_SERVER (object); + GST_DEBUG_OBJECT (server, "finalize server"); + g_free (server->address); g_free (server->service); + if (server->socket) + g_object_unref (server->socket); g_object_unref (server->session_pool); g_object_unref (server->media_mapping); + + if (server->auth) + g_object_unref (server->auth); + + g_mutex_clear (&server->lock); + + G_OBJECT_CLASS (gst_rtsp_server_parent_class)->finalize (object); } /** @@ -184,8 +247,10 @@ gst_rtsp_server_set_address (GstRTSPServer * server, const gchar * address) g_return_if_fail (GST_IS_RTSP_SERVER (server)); g_return_if_fail (address != NULL); + GST_RTSP_SERVER_LOCK (server); g_free (server->address); server->address = g_strdup (address); + GST_RTSP_SERVER_UNLOCK (server); } /** @@ -199,9 +264,44 @@ gst_rtsp_server_set_address (GstRTSPServer * server, const gchar * address) gchar * gst_rtsp_server_get_address (GstRTSPServer * server) { + gchar *result; g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); - return g_strdup (server->address); + GST_RTSP_SERVER_LOCK (server); + result = g_strdup (server->address); + GST_RTSP_SERVER_UNLOCK (server); + + return result; +} + +/** + * gst_rtsp_server_get_bound_port: + * @server: a #GstRTSPServer + * + * Get the port number where the server was bound to. + * + * Returns: the port number + */ +int +gst_rtsp_server_get_bound_port (GstRTSPServer * server) +{ + GSocketAddress *address; + int result = -1; + + g_return_val_if_fail (GST_IS_RTSP_SERVER (server), result); + + GST_RTSP_SERVER_LOCK (server); + if (server->socket == NULL) + goto out; + + address = g_socket_get_local_address (server->socket, NULL); + result = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (address)); + g_object_unref (address); + +out: + GST_RTSP_SERVER_UNLOCK (server); + + return result; } /** @@ -221,8 +321,10 @@ gst_rtsp_server_set_service (GstRTSPServer * server, const gchar * service) g_return_if_fail (GST_IS_RTSP_SERVER (server)); g_return_if_fail (service != NULL); + GST_RTSP_SERVER_LOCK (server); g_free (server->service); server->service = g_strdup (service); + GST_RTSP_SERVER_UNLOCK (server); } /** @@ -236,9 +338,15 @@ gst_rtsp_server_set_service (GstRTSPServer * server, const gchar * service) gchar * gst_rtsp_server_get_service (GstRTSPServer * server) { + gchar *result; + g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); - return g_strdup (server->service); + GST_RTSP_SERVER_LOCK (server); + result = g_strdup (server->service); + GST_RTSP_SERVER_UNLOCK (server); + + return result; } /** @@ -256,7 +364,9 @@ gst_rtsp_server_set_backlog (GstRTSPServer * server, gint backlog) { g_return_if_fail (GST_IS_RTSP_SERVER (server)); + GST_RTSP_SERVER_LOCK (server); server->backlog = backlog; + GST_RTSP_SERVER_UNLOCK (server); } /** @@ -270,9 +380,15 @@ gst_rtsp_server_set_backlog (GstRTSPServer * server, gint backlog) gint gst_rtsp_server_get_backlog (GstRTSPServer * server) { + gint result; + g_return_val_if_fail (GST_IS_RTSP_SERVER (server), -1); - return server->backlog; + GST_RTSP_SERVER_LOCK (server); + result = server->backlog; + GST_RTSP_SERVER_UNLOCK (server); + + return result; } /** @@ -290,15 +406,16 @@ gst_rtsp_server_set_session_pool (GstRTSPServer * server, g_return_if_fail (GST_IS_RTSP_SERVER (server)); + if (pool) + g_object_ref (pool); + + GST_RTSP_SERVER_LOCK (server); old = server->session_pool; + server->session_pool = pool; + GST_RTSP_SERVER_UNLOCK (server); - if (old != pool) { - if (pool) - g_object_ref (pool); - server->session_pool = pool; - if (old) - g_object_unref (old); - } + if (old) + g_object_unref (old); } /** @@ -307,7 +424,7 @@ gst_rtsp_server_set_session_pool (GstRTSPServer * server, * * Get the #GstRTSPSessionPool used as the session pool of @server. * - * Returns: the #GstRTSPSessionPool used for sessions. g_object_unref() after + * Returns: (transfer full): the #GstRTSPSessionPool used for sessions. g_object_unref() after * usage. */ GstRTSPSessionPool * @@ -317,8 +434,10 @@ gst_rtsp_server_get_session_pool (GstRTSPServer * server) g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); + GST_RTSP_SERVER_LOCK (server); if ((result = server->session_pool)) g_object_ref (result); + GST_RTSP_SERVER_UNLOCK (server); return result; } @@ -338,15 +457,16 @@ gst_rtsp_server_set_media_mapping (GstRTSPServer * server, g_return_if_fail (GST_IS_RTSP_SERVER (server)); + if (mapping) + g_object_ref (mapping); + + GST_RTSP_SERVER_LOCK (server); old = server->media_mapping; + server->media_mapping = mapping; + GST_RTSP_SERVER_UNLOCK (server); - if (old != mapping) { - if (mapping) - g_object_ref (mapping); - server->media_mapping = mapping; - if (old) - g_object_unref (old); - } + if (old) + g_object_unref (old); } @@ -356,7 +476,7 @@ gst_rtsp_server_set_media_mapping (GstRTSPServer * server, * * Get the #GstRTSPMediaMapping used as the media mapping of @server. * - * Returns: the #GstRTSPMediaMapping of @server. g_object_unref() after + * Returns: (transfer full): the #GstRTSPMediaMapping of @server. g_object_unref() after * usage. */ GstRTSPMediaMapping * @@ -366,8 +486,10 @@ gst_rtsp_server_get_media_mapping (GstRTSPServer * server) g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); + GST_RTSP_SERVER_LOCK (server); if ((result = server->media_mapping)) g_object_ref (result); + GST_RTSP_SERVER_UNLOCK (server); return result; } @@ -386,15 +508,16 @@ gst_rtsp_server_set_auth (GstRTSPServer * server, GstRTSPAuth * auth) g_return_if_fail (GST_IS_RTSP_SERVER (server)); + if (auth) + g_object_ref (auth); + + GST_RTSP_SERVER_LOCK (server); old = server->auth; + server->auth = auth; + GST_RTSP_SERVER_UNLOCK (server); - if (old != auth) { - if (auth) - g_object_ref (auth); - server->auth = auth; - if (old) - g_object_unref (old); - } + if (old) + g_object_unref (old); } @@ -404,7 +527,7 @@ gst_rtsp_server_set_auth (GstRTSPServer * server, GstRTSPAuth * auth) * * Get the #GstRTSPAuth used as the authentication manager of @server. * - * Returns: the #GstRTSPAuth of @server. g_object_unref() after + * Returns: (transfer full): the #GstRTSPAuth of @server. g_object_unref() after * usage. */ GstRTSPAuth * @@ -414,12 +537,57 @@ gst_rtsp_server_get_auth (GstRTSPServer * server) g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); + GST_RTSP_SERVER_LOCK (server); if ((result = server->auth)) g_object_ref (result); + GST_RTSP_SERVER_UNLOCK (server); return result; } +/** + * gst_rtsp_server_set_max_threads: + * @server: a #GstRTSPServer + * @max_threads: maximum threads + * + * Set the maximum threads used by the server to handle client requests. + * A value of 0 will use the server mainloop, a value of -1 will use an + * unlimited number of threads. + */ +void +gst_rtsp_server_set_max_threads (GstRTSPServer * server, gint max_threads) +{ + g_return_if_fail (GST_IS_RTSP_SERVER (server)); + + GST_RTSP_SERVER_LOCK (server); + server->max_threads = max_threads; + GST_RTSP_SERVER_UNLOCK (server); +} + +/** + * gst_rtsp_server_get_max_threads: + * @server: a #GstRTSPServer + * + * Get the maximum number of threads used for client connections. + * See gst_rtsp_server_set_max_threads(). + * + * Returns: the maximum number of threads. + */ +gint +gst_rtsp_server_get_max_threads (GstRTSPServer * server) +{ + gint res; + + g_return_val_if_fail (GST_IS_RTSP_SERVER (server), -1); + + GST_RTSP_SERVER_LOCK (server); + res = server->max_threads; + GST_RTSP_SERVER_UNLOCK (server); + + return res; +} + + static void gst_rtsp_server_get_property (GObject * object, guint propid, GValue * value, GParamSpec * pspec) @@ -433,6 +601,9 @@ gst_rtsp_server_get_property (GObject * object, guint propid, case PROP_SERVICE: g_value_take_string (value, gst_rtsp_server_get_service (server)); break; + case PROP_BOUND_PORT: + g_value_set_int (value, gst_rtsp_server_get_bound_port (server)); + break; case PROP_BACKLOG: g_value_set_int (value, gst_rtsp_server_get_backlog (server)); break; @@ -442,6 +613,9 @@ gst_rtsp_server_get_property (GObject * object, guint propid, case PROP_MEDIA_MAPPING: g_value_take_object (value, gst_rtsp_server_get_media_mapping (server)); break; + case PROP_MAX_THREADS: + g_value_set_int (value, gst_rtsp_server_get_max_threads (server)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec); } @@ -469,129 +643,143 @@ gst_rtsp_server_set_property (GObject * object, guint propid, case PROP_MEDIA_MAPPING: gst_rtsp_server_set_media_mapping (server, g_value_get_object (value)); break; + case PROP_MAX_THREADS: + gst_rtsp_server_set_max_threads (server, g_value_get_int (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec); } } -/* Prepare a server socket for @server and make it listen on the configured port */ -static gboolean -gst_rtsp_server_sink_init_send (GstRTSPServer * server) +/** + * gst_rtsp_server_create_socket: + * @server: a #GstRTSPServer + * @cancellable: a #GCancellable + * @error: a #GError + * + * Create a #GSocket for @server. The socket will listen on the + * configured service. + * + * Returns: (transfer full): the #GSocket for @server or NULL when an error occured. + */ +GSocket * +gst_rtsp_server_create_socket (GstRTSPServer * server, + GCancellable * cancellable, GError ** error) { - int ret, sockfd; - struct addrinfo hints; - struct addrinfo *result, *rp; + GSocketConnectable *conn; + GSocketAddressEnumerator *enumerator; + GSocket *socket = NULL; #ifdef USE_SOLINGER struct linger linger; #endif + GError *sock_error = NULL; + GError *bind_error = NULL; + guint16 port; - memset (&hints, 0, sizeof (struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ - hints.ai_socktype = SOCK_STREAM; /* stream socket */ - hints.ai_flags = AI_PASSIVE | AI_CANONNAME; /* For wildcard IP address */ - hints.ai_protocol = 0; /* Any protocol */ - hints.ai_canonname = NULL; - hints.ai_addr = NULL; - hints.ai_next = NULL; + g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); + GST_RTSP_SERVER_LOCK (server); GST_DEBUG_OBJECT (server, "getting address info of %s/%s", server->address, server->service); /* resolve the server IP address */ - if ((ret = - getaddrinfo (server->address, server->service, &hints, &result)) != 0) - goto no_address; + port = atoi (server->service); + if (port != 0 || !strcmp (server->service, "0")) + conn = g_network_address_new (server->address, port); + else + conn = g_network_service_new (server->service, "tcp", server->address); + + enumerator = g_socket_connectable_enumerate (conn); + g_object_unref (conn); /* create server socket, we loop through all the addresses until we manage to * create a socket and bind. */ - for (rp = result; rp; rp = rp->ai_next) { - sockfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (sockfd == -1) { - GST_DEBUG_OBJECT (server, "failed to make socket (%s), try next", - g_strerror (errno)); - continue; + while (TRUE) { + GSocketAddress *sockaddr; + + sockaddr = + g_socket_address_enumerator_next (enumerator, cancellable, error); + if (!sockaddr) { + if (!*error) + GST_DEBUG_OBJECT (server, "no more addresses %s", + *error ? (*error)->message : ""); + else + GST_DEBUG_OBJECT (server, "failed to retrieve next address %s", + (*error)->message); + break; } - /* make address reusable */ - ret = 1; - if (setsockopt (sockfd, SOL_SOCKET, SO_REUSEADDR, - (void *) &ret, sizeof (ret)) < 0) { - /* warn but try to bind anyway */ - GST_WARNING_OBJECT (server, "failed to reuse socker (%s)", - g_strerror (errno)); + /* only keep the first error */ + socket = g_socket_new (g_socket_address_get_family (sockaddr), + G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, + sock_error ? NULL : &sock_error); + + if (socket == NULL) { + GST_DEBUG_OBJECT (server, "failed to make socket (%s), try next", + sock_error->message); + continue; } - if (bind (sockfd, rp->ai_addr, rp->ai_addrlen) == 0) { - GST_DEBUG_OBJECT (server, "bind on %s", rp->ai_canonname); + if (g_socket_bind (socket, sockaddr, TRUE, bind_error ? NULL : &bind_error)) { + g_object_unref (sockaddr); break; } GST_DEBUG_OBJECT (server, "failed to bind socket (%s), try next", - g_strerror (errno)); - close (sockfd); + bind_error->message); + g_object_unref (sockaddr); + g_object_unref (socket); + socket = NULL; } - freeaddrinfo (result); + g_object_unref (enumerator); - if (rp == NULL) + if (socket == NULL) goto no_socket; - server->server_sock.fd = sockfd; + g_clear_error (&sock_error); + g_clear_error (&bind_error); - GST_DEBUG_OBJECT (server, "opened sending server socket with fd %d", - server->server_sock.fd); + GST_DEBUG_OBJECT (server, "opened sending server socket"); /* keep connection alive; avoids SIGPIPE during write */ - ret = 1; - if (setsockopt (server->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, - (void *) &ret, sizeof (ret)) < 0) - goto keepalive_failed; + g_socket_set_keepalive (socket, TRUE); +#if 0 #ifdef USE_SOLINGER /* make sure socket is reset 5 seconds after close. This ensure that we can * reuse the socket quickly while still having a chance to send data to the * client. */ linger.l_onoff = 1; linger.l_linger = 5; - if (setsockopt (server->server_sock.fd, SOL_SOCKET, SO_LINGER, + if (setsockopt (sockfd, SOL_SOCKET, SO_LINGER, (void *) &linger, sizeof (linger)) < 0) goto linger_failed; #endif +#endif /* set the server socket to nonblocking */ - fcntl (server->server_sock.fd, F_SETFL, O_NONBLOCK); + g_socket_set_blocking (socket, FALSE); + + /* set listen backlog */ + g_socket_set_listen_backlog (socket, server->backlog); - GST_DEBUG_OBJECT (server, "listening on server socket %d with queue of %d", - server->server_sock.fd, server->backlog); - if (listen (server->server_sock.fd, server->backlog) == -1) + if (!g_socket_listen (socket, error)) goto listen_failed; - GST_DEBUG_OBJECT (server, - "listened on server socket %d, returning from connection setup", - server->server_sock.fd); + GST_DEBUG_OBJECT (server, "listening on server socket %p with queue of %d", + socket, server->backlog); - GST_INFO_OBJECT (server, "listening on service %s", server->service); + GST_RTSP_SERVER_UNLOCK (server); - return TRUE; + return socket; /* ERRORS */ -no_address: - { - GST_ERROR_OBJECT (server, "failed to resolve address: %s", - gai_strerror (ret)); - return FALSE; - } no_socket: { - GST_ERROR_OBJECT (server, "failed to create socket: %s", - g_strerror (errno)); - return FALSE; - } -keepalive_failed: - { - GST_ERROR_OBJECT (server, "failed to configure keepalive socket: %s", - g_strerror (errno)); + GST_ERROR_OBJECT (server, "failed to create socket"); goto close_error; } +#if 0 #ifdef USE_SOLINGER linger_failed: { @@ -600,26 +788,128 @@ linger_failed: goto close_error; } #endif +#endif listen_failed: { GST_ERROR_OBJECT (server, "failed to listen on socket: %s", - g_strerror (errno)); + (*error)->message); goto close_error; } close_error: { - if (server->server_sock.fd >= 0) { - close (server->server_sock.fd); - server->server_sock.fd = -1; + if (socket) + g_object_unref (socket); + + if (sock_error) { + if (error == NULL) + g_propagate_error (error, sock_error); + else + g_error_free (sock_error); } - return FALSE; + if (bind_error) { + if ((error == NULL) || (*error == NULL)) + g_propagate_error (error, bind_error); + else + g_error_free (bind_error); + } + GST_RTSP_SERVER_UNLOCK (server); + return NULL; + } +} + +struct _ClientContext +{ + GstRTSPServer *server; + GMainLoop *loop; + GMainContext *context; + GstRTSPClient *client; +}; + +static void +free_client_context (ClientContext * ctx) +{ + g_main_context_unref (ctx->context); + if (ctx->loop) + g_main_loop_unref (ctx->loop); + g_object_unref (ctx->client); + g_slice_free (ClientContext, ctx); +} + +static gpointer +do_loop (ClientContext * ctx) +{ + GST_INFO ("enter mainloop"); + g_main_loop_run (ctx->loop); + GST_INFO ("exit mainloop"); + + free_client_context (ctx); + + return NULL; +} + +static void +unmanage_client (GstRTSPClient * client, ClientContext * ctx) +{ + GstRTSPServer *server = ctx->server; + + GST_DEBUG_OBJECT (server, "unmanage client %p", client); + + g_object_ref (server); + gst_rtsp_client_set_server (client, NULL); + + GST_RTSP_SERVER_LOCK (server); + server->clients = g_list_remove (server->clients, ctx); + GST_RTSP_SERVER_UNLOCK (server); + + if (ctx->loop) + g_main_loop_quit (ctx->loop); + else + free_client_context (ctx); + + g_object_unref (server); +} + +/* add the client context to the active list of clients, takes ownership + * of client */ +static void +manage_client (GstRTSPServer * server, GstRTSPClient * client) +{ + ClientContext *ctx; + + GST_DEBUG_OBJECT (server, "manage client %p", client); + gst_rtsp_client_set_server (client, server); + + ctx = g_slice_new0 (ClientContext); + ctx->server = server; + ctx->client = client; + if (server->max_threads == 0) { + GSource *source; + + /* find the context to add the watch */ + if ((source = g_main_current_source ())) + ctx->context = g_main_context_ref (g_source_get_context (source)); + else + ctx->context = NULL; + } else { + ctx->context = g_main_context_new (); + ctx->loop = g_main_loop_new (ctx->context, TRUE); + } + gst_rtsp_client_attach (client, ctx->context); + + GST_RTSP_SERVER_LOCK (server); + g_signal_connect (client, "closed", (GCallback) unmanage_client, ctx); + server->clients = g_list_prepend (server->clients, ctx); + GST_RTSP_SERVER_UNLOCK (server); + + if (ctx->loop) { + GstRTSPServerClass *klass = GST_RTSP_SERVER_GET_CLASS (server); + + g_thread_pool_push (klass->pool, ctx, NULL); } } -/* default method for creating a new client object in the server to accept and - * handle a client connection on this server */ static GstRTSPClient * -default_accept_client (GstRTSPServer * server, GIOChannel * channel) +default_create_client (GstRTSPServer * server) { GstRTSPClient *client; @@ -627,63 +917,82 @@ default_accept_client (GstRTSPServer * server, GIOChannel * channel) client = gst_rtsp_client_new (); /* set the session pool that this client should use */ + GST_RTSP_SERVER_LOCK (server); gst_rtsp_client_set_session_pool (client, server->session_pool); /* set the media mapping that this client should use */ gst_rtsp_client_set_media_mapping (client, server->media_mapping); /* set authentication manager */ gst_rtsp_client_set_auth (client, server->auth); + GST_RTSP_SERVER_UNLOCK (server); + return client; +} + +/* default method for creating a new client object in the server to accept and + * handle a client connection on this server */ +static gboolean +default_accept_client (GstRTSPServer * server, GstRTSPClient * client, + GSocket * socket, GError ** error) +{ /* accept connections for that client, this function returns after accepting * the connection and will run the remainder of the communication with the * client asyncronously. */ - if (!gst_rtsp_client_accept (client, channel)) + if (!gst_rtsp_client_accept (client, socket, NULL, error)) goto accept_failed; - return client; + return TRUE; /* ERRORS */ accept_failed: { GST_ERROR_OBJECT (server, - "Could not accept client on server socket %d: %s (%d)", - server->server_sock.fd, g_strerror (errno), errno); - gst_object_unref (client); - return NULL; + "Could not accept client on server : %s", (*error)->message); + return FALSE; } } /** - * gst_rtsp_server_io_func: - * @channel: a #GIOChannel - * @condition: the condition on @source + * gst_rtsp_server_transfer_connection: + * @server: a #GstRTSPServer + * @socket: a network socket + * @ip: the IP address of the remote client + * @port: the port used by the other end + * @initial_buffer: any initial data that was already read from the socket * - * A default #GIOFunc that creates a new #GstRTSPClient to accept and handle a - * new connection on @channel or @server. + * Take an existing network socket and use it for an RTSP connection. This + * is used when transferring a socket from an HTTP server which should be used + * as an RTSP over HTTP tunnel. The @initial_buffer contains any remaining data + * that the HTTP server read from the socket while parsing the HTTP header. * - * Returns: TRUE if the source could be connected, FALSE if an error occured. + * Returns: TRUE if all was ok, FALSE if an error occured. */ gboolean -gst_rtsp_server_io_func (GIOChannel * channel, GIOCondition condition, - GstRTSPServer * server) +gst_rtsp_server_transfer_connection (GstRTSPServer * server, GSocket * socket, + const gchar * ip, gint port, const gchar * initial_buffer) { GstRTSPClient *client = NULL; GstRTSPServerClass *klass; + GError *error = NULL; - if (condition & G_IO_IN) { - klass = GST_RTSP_SERVER_GET_CLASS (server); + klass = GST_RTSP_SERVER_GET_CLASS (server); - /* a new client connected, create a client object to handle the client. */ - if (klass->accept_client) - client = klass->accept_client (server, channel); - if (client == NULL) - goto client_failed; + if (klass->create_client) + client = klass->create_client (server); + if (client == NULL) + goto client_failed; - /* can unref the client now, when the request is finished, it will be - * unreffed async. */ - gst_object_unref (client); - } else { - GST_WARNING_OBJECT (server, "received unknown event %08x", condition); + /* a new client connected, create a client object to handle the client. */ + if (!gst_rtsp_client_use_socket (client, socket, ip, + port, initial_buffer, &error)) { + goto transfer_failed; } + + /* manage the client connection */ + manage_client (server, client); + + g_signal_emit (server, gst_rtsp_server_signals[SIGNAL_CLIENT_CONNECTED], 0, + client); + return TRUE; /* ERRORS */ @@ -692,71 +1001,134 @@ client_failed: GST_ERROR_OBJECT (server, "failed to create a client"); return FALSE; } +transfer_failed: + { + GST_ERROR_OBJECT (server, "failed to accept client: %s", error->message); + g_error_free (error); + g_object_unref (client); + return FALSE; + } } /** - * gst_rtsp_server_get_io_channel: + * gst_rtsp_server_io_func: + * @socket: a #GSocket + * @condition: the condition on @source * @server: a #GstRTSPServer * - * Create a #GIOChannel for @server. + * A default #GSocketSourceFunc that creates a new #GstRTSPClient to accept and handle a + * new connection on @socket or @server. * - * Returns: the GIOChannel for @server or NULL when an error occured. + * Returns: TRUE if the source could be connected, FALSE if an error occured. */ -GIOChannel * -gst_rtsp_server_get_io_channel (GstRTSPServer * server) +gboolean +gst_rtsp_server_io_func (GSocket * socket, GIOCondition condition, + GstRTSPServer * server) { - g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); + gboolean result = TRUE; + GstRTSPClient *client = NULL; + GstRTSPServerClass *klass; + GError *error = NULL; + + if (condition & G_IO_IN) { + klass = GST_RTSP_SERVER_GET_CLASS (server); + + if (klass->create_client) + client = klass->create_client (server); + if (client == NULL) + goto client_failed; + + /* a new client connected, create a client object to handle the client. */ + if (klass->accept_client) + result = klass->accept_client (server, client, socket, &error); + if (!result) + goto accept_failed; - if (server->io_channel == NULL) { - if (!gst_rtsp_server_sink_init_send (server)) - goto init_failed; + /* manage the client connection */ + manage_client (server, client); - /* create IO channel for the socket */ - server->io_channel = g_io_channel_unix_new (server->server_sock.fd); + g_signal_emit (server, gst_rtsp_server_signals[SIGNAL_CLIENT_CONNECTED], 0, + client); + } else { + GST_WARNING_OBJECT (server, "received unknown event %08x", condition); } - return server->io_channel; + return TRUE; -init_failed: + /* ERRORS */ +client_failed: { - GST_ERROR_OBJECT (server, "failed to initialize server"); - return NULL; + GST_ERROR_OBJECT (server, "failed to create a client"); + return FALSE; + } +accept_failed: + { + GST_ERROR_OBJECT (server, "failed to accept client: %s", error->message); + g_error_free (error); + g_object_unref (client); + return FALSE; } } +static void +watch_destroyed (GstRTSPServer * server) +{ + GST_DEBUG_OBJECT (server, "source destroyed"); + g_object_unref (server); +} + /** - * gst_rtsp_server_create_watch: + * gst_rtsp_server_create_source: * @server: a #GstRTSPServer + * @cancellable: a #GCancellable or %NULL. + * @error: a #GError * * Create a #GSource for @server. The new source will have a default - * #GIOFunc of gst_rtsp_server_io_func(). + * #GSocketSourceFunc of gst_rtsp_server_io_func(). + * + * @cancellable if not NULL can be used to cancel the source, which will cause + * the source to trigger, reporting the current condition (which is likely 0 + * unless cancellation happened at the same time as a condition change). You can + * check for this in the callback using g_cancellable_is_cancelled(). * - * Returns: the #GSource for @server or NULL when an error occured. + * Returns: the #GSource for @server or NULL when an error occured. Free with + * g_source_unref () */ GSource * -gst_rtsp_server_create_watch (GstRTSPServer * server) +gst_rtsp_server_create_source (GstRTSPServer * server, + GCancellable * cancellable, GError ** error) { + GSocket *socket, *old; + GSource *source; + g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); - if (server->io_watch == NULL) { - GIOChannel *channel; + socket = gst_rtsp_server_create_socket (server, NULL, error); + if (socket == NULL) + goto no_socket; - channel = gst_rtsp_server_get_io_channel (server); - if (channel == NULL) - goto no_channel; + GST_RTSP_SERVER_LOCK (server); + old = server->socket; + server->socket = g_object_ref (socket); + GST_RTSP_SERVER_UNLOCK (server); - /* create a watch for reads (new connections) and possible errors */ - server->io_watch = g_io_create_watch (channel, G_IO_IN | - G_IO_ERR | G_IO_HUP | G_IO_NVAL); + if (old) + g_object_unref (old); - /* configure the callback */ - g_source_set_callback (server->io_watch, - (GSourceFunc) gst_rtsp_server_io_func, server, NULL); - } - return server->io_watch; + /* create a watch for reads (new connections) and possible errors */ + source = g_socket_create_source (socket, G_IO_IN | + G_IO_ERR | G_IO_HUP | G_IO_NVAL, cancellable); + g_object_unref (socket); -no_channel: + /* configure the callback */ + g_source_set_callback (source, + (GSourceFunc) gst_rtsp_server_io_func, g_object_ref (server), + (GDestroyNotify) watch_destroyed); + + return source; + +no_socket: { - GST_ERROR_OBJECT (server, "failed to create IO channel"); + GST_ERROR_OBJECT (server, "failed to create socket"); return NULL; } } @@ -764,10 +1136,11 @@ no_channel: /** * gst_rtsp_server_attach: * @server: a #GstRTSPServer - * @context: a #GMainContext + * @context: (allow-none): a #GMainContext * * Attaches @server to @context. When the mainloop for @context is run, the - * server will be dispatched. + * server will be dispatched. When @context is NULL, the default context will be + * used). * * This function should be called when the server properties and urls are fully * configured and the server is ready to start. @@ -779,21 +1152,24 @@ gst_rtsp_server_attach (GstRTSPServer * server, GMainContext * context) { guint res; GSource *source; + GError *error = NULL; g_return_val_if_fail (GST_IS_RTSP_SERVER (server), 0); - source = gst_rtsp_server_create_watch (server); + source = gst_rtsp_server_create_source (server, NULL, &error); if (source == NULL) goto no_source; res = g_source_attach (source, context); + g_source_unref (source); return res; /* ERRORS */ no_source: { - GST_ERROR_OBJECT (server, "failed to create watch"); + GST_ERROR_OBJECT (server, "failed to create watch: %s", error->message); + g_error_free (error); return 0; } }