more distcheck fixes
[platform/upstream/glib.git] / gio / gthreadedsocketservice.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright © 2009 Codethink Limited
4  * Copyright © 2009 Red Hat, Inc
5  *
6  * This program is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Lesser General Public License as published
8  * by the Free Software Foundation; either version 2 of the licence or (at
9  * your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Authors: Ryan Lortie <desrt@desrt.ca>
22  *          Alexander Larsson <alexl@redhat.com>
23  */
24
25 /**
26  * SECTION:gthreadedsocketservice
27  * @title: GThreadedSocketService
28  * @short_description: A threaded GSocketService
29  * @include: gio/gio.h
30  * @see_also: #GSocketService.
31  *
32  * A #GThreadedSocketService is a simple subclass of #GSocketService
33  * that handles incoming connections by creating a worker thread and
34  * dispatching the connection to it by emitting the
35  * #GThreadedSocketService::run signal in the new thread.
36  *
37  * The signal handler may perform blocking IO and need not return
38  * until the connection is closed.
39  *
40  * The service is implemented using a thread pool, so there is a
41  * limited amount of threads available to serve incoming requests.
42  * The service automatically stops the #GSocketService from accepting
43  * new connections when all threads are busy.
44  *
45  * As with #GSocketService, you may connect to #GThreadedSocketService::run,
46  * or subclass and override the default handler.
47  */
48
49 #include "config.h"
50 #include "gsocketconnection.h"
51 #include "gthreadedsocketservice.h"
52 #include "glibintl.h"
53
54 struct _GThreadedSocketServicePrivate
55 {
56   GThreadPool *thread_pool;
57   int max_threads;
58   gint job_count;
59 };
60
61 static guint g_threaded_socket_service_run_signal;
62
63 G_DEFINE_TYPE_WITH_PRIVATE (GThreadedSocketService,
64                             g_threaded_socket_service,
65                             G_TYPE_SOCKET_SERVICE)
66
67 enum
68 {
69   PROP_0,
70   PROP_MAX_THREADS
71 };
72
73 G_LOCK_DEFINE_STATIC(job_count);
74
75 typedef struct
76 {
77   GThreadedSocketService *service;
78   GSocketConnection *connection;
79   GObject *source_object;
80 } GThreadedSocketServiceData;
81
82 static void
83 g_threaded_socket_service_func (gpointer _data,
84                                 gpointer user_data)
85 {
86   GThreadedSocketService *threaded = user_data;
87   GThreadedSocketServiceData *data = _data;
88   gboolean result;
89
90   g_signal_emit (data->service, g_threaded_socket_service_run_signal,
91                  0, data->connection, data->source_object, &result);
92
93   g_object_unref (data->service);
94   g_object_unref (data->connection);
95   if (data->source_object)
96     g_object_unref (data->source_object);
97   g_slice_free (GThreadedSocketServiceData, data);
98
99   G_LOCK (job_count);
100   if (threaded->priv->job_count-- == threaded->priv->max_threads)
101     g_socket_service_start (G_SOCKET_SERVICE (threaded));
102   G_UNLOCK (job_count);
103 }
104
105 static gboolean
106 g_threaded_socket_service_incoming (GSocketService    *service,
107                                     GSocketConnection *connection,
108                                     GObject           *source_object)
109 {
110   GThreadedSocketService *threaded;
111   GThreadedSocketServiceData *data;
112
113   threaded = G_THREADED_SOCKET_SERVICE (service);
114
115   data = g_slice_new (GThreadedSocketServiceData);
116   data->service = g_object_ref (service);
117   data->connection = g_object_ref (connection);
118   if (source_object)
119     data->source_object = g_object_ref (source_object);
120   else
121     data->source_object = NULL;
122
123   G_LOCK (job_count);
124   if (++threaded->priv->job_count == threaded->priv->max_threads)
125     g_socket_service_stop (service);
126   G_UNLOCK (job_count);
127
128   g_thread_pool_push (threaded->priv->thread_pool, data, NULL);
129
130
131
132   return FALSE;
133 }
134
135 static void
136 g_threaded_socket_service_init (GThreadedSocketService *service)
137 {
138   service->priv = g_threaded_socket_service_get_instance_private (service);
139   service->priv->max_threads = 10;
140 }
141
142 static void
143 g_threaded_socket_service_constructed (GObject *object)
144 {
145   GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
146
147   service->priv->thread_pool =
148     g_thread_pool_new  (g_threaded_socket_service_func,
149                         service,
150                         service->priv->max_threads,
151                         FALSE,
152                         NULL);
153 }
154
155
156 static void
157 g_threaded_socket_service_finalize (GObject *object)
158 {
159   GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
160
161   g_thread_pool_free (service->priv->thread_pool, FALSE, TRUE);
162
163   G_OBJECT_CLASS (g_threaded_socket_service_parent_class)
164     ->finalize (object);
165 }
166
167 static void
168 g_threaded_socket_service_get_property (GObject    *object,
169                                         guint       prop_id,
170                                         GValue     *value,
171                                         GParamSpec *pspec)
172 {
173   GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
174
175   switch (prop_id)
176     {
177       case PROP_MAX_THREADS:
178         g_value_set_int (value, service->priv->max_threads);
179         break;
180
181       default:
182         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
183     }
184 }
185
186 static void
187 g_threaded_socket_service_set_property (GObject      *object,
188                                         guint         prop_id,
189                                         const GValue *value,
190                                         GParamSpec   *pspec)
191 {
192   GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
193
194   switch (prop_id)
195     {
196       case PROP_MAX_THREADS:
197         service->priv->max_threads = g_value_get_int (value);
198         break;
199
200       default:
201         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
202     }
203 }
204
205
206 static void
207 g_threaded_socket_service_class_init (GThreadedSocketServiceClass *class)
208 {
209   GObjectClass *gobject_class = G_OBJECT_CLASS (class);
210   GSocketServiceClass *ss_class = &class->parent_class;
211
212   gobject_class->constructed = g_threaded_socket_service_constructed;
213   gobject_class->finalize = g_threaded_socket_service_finalize;
214   gobject_class->set_property = g_threaded_socket_service_set_property;
215   gobject_class->get_property = g_threaded_socket_service_get_property;
216
217   ss_class->incoming = g_threaded_socket_service_incoming;
218
219   /**
220    * GThreadedSocketService::run:
221    * @service: the #GThreadedSocketService.
222    * @connection: a new #GSocketConnection object.
223    * @source_object: the source_object passed to g_socket_listener_add_address().
224    *
225    * The ::run signal is emitted in a worker thread in response to an
226    * incoming connection. This thread is dedicated to handling
227    * @connection and may perform blocking IO. The signal handler need
228    * not return until the connection is closed.
229    *
230    * Returns: %TRUE to stop further signal handlers from being called
231    */
232   g_threaded_socket_service_run_signal =
233     g_signal_new ("run", G_TYPE_FROM_CLASS (class), G_SIGNAL_RUN_LAST,
234                   G_STRUCT_OFFSET (GThreadedSocketServiceClass, run),
235                   g_signal_accumulator_true_handled, NULL,
236                   NULL, G_TYPE_BOOLEAN,
237                   2, G_TYPE_SOCKET_CONNECTION, G_TYPE_OBJECT);
238
239   g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
240                                    g_param_spec_int ("max-threads",
241                                                      P_("Max threads"),
242                                                      P_("The max number of threads handling clients for this service"),
243                                                      -1,
244                                                      G_MAXINT,
245                                                      10,
246                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
247 }
248
249 /**
250  * g_threaded_socket_service_new:
251  * @max_threads: the maximal number of threads to execute concurrently
252  *   handling incoming clients, -1 means no limit
253  *
254  * Creates a new #GThreadedSocketService with no listeners. Listeners
255  * must be added with one of the #GSocketListener "add" methods.
256  *
257  * Returns: a new #GSocketService.
258  *
259  * Since: 2.22
260  */
261 GSocketService *
262 g_threaded_socket_service_new (int max_threads)
263 {
264   return g_object_new (G_TYPE_THREADED_SOCKET_SERVICE,
265                        "max-threads", max_threads,
266                        NULL);
267 }