gio: GCancellable can be used concurrently
[platform/upstream/glib.git] / gio / gthreadedsocketservice.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright © 2009 Codethink Limited
4  * Copyright © 2009 Red Hat, Inc
5  *
6  * This program is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Lesser General Public License as published
8  * by the Free Software Foundation; either version 2 of the licence or (at
9  * your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Authors: Ryan Lortie <desrt@desrt.ca>
22  *          Alexander Larsson <alexl@redhat.com>
23  */
24
25 /**
26  * SECTION:gthreadedsocketservice
27  * @title: GThreadedSocketService
28  * @short_description: A threaded GSocketService
29  * @see_also: #GSocketService.
30  *
31  * A #GThreadedSocketService is a simple subclass of #GSocketService
32  * that handles incoming connections by creating a worker thread and
33  * dispatching the connection to it by emitting the
34  * #GThreadedSocketService::run signal in the new thread.
35  *
36  * The signal handler may perform blocking IO and need not return
37  * until the connection is closed.
38  *
39  * The service is implemented using a thread pool, so there is a
40  * limited number of threads available to serve incoming requests.
41  * The service automatically stops the #GSocketService from accepting
42  * new connections when all threads are busy.
43  *
44  * As with #GSocketService, you may connect to #GThreadedSocketService::run,
45  * or subclass and override the default handler.
46  */
47
48 #include "config.h"
49 #include "gsocketconnection.h"
50 #include "gthreadedsocketservice.h"
51 #include "glibintl.h"
52
53
54 static guint g_threaded_socket_service_run_signal;
55
56 G_DEFINE_TYPE (GThreadedSocketService,
57                g_threaded_socket_service,
58                G_TYPE_SOCKET_SERVICE);
59
60 enum
61 {
62   PROP_0,
63   PROP_MAX_THREADS
64 };
65
66
67 G_LOCK_DEFINE_STATIC(job_count);
68
69 struct _GThreadedSocketServicePrivate
70 {
71   GThreadPool *thread_pool;
72   int max_threads;
73   gint job_count;
74 };
75
76 typedef struct
77 {
78   GThreadedSocketService *service;
79   GSocketConnection *connection;
80   GObject *source_object;
81 } GThreadedSocketServiceData;
82
83 static void
84 g_threaded_socket_service_func (gpointer _data,
85                                 gpointer user_data)
86 {
87   GThreadedSocketService *threaded = user_data;
88   GThreadedSocketServiceData *data = _data;
89   gboolean result;
90
91   g_signal_emit (data->service, g_threaded_socket_service_run_signal,
92                  0, data->connection, data->source_object, &result);
93
94   g_object_unref (data->service);
95   g_object_unref (data->connection);
96   if (data->source_object)
97     g_object_unref (data->source_object);
98   g_slice_free (GThreadedSocketServiceData, data);
99
100   G_LOCK (job_count);
101   if (threaded->priv->job_count-- == threaded->priv->max_threads)
102     g_socket_service_start (G_SOCKET_SERVICE (threaded));
103   G_UNLOCK (job_count);
104 }
105
106 static gboolean
107 g_threaded_socket_service_incoming (GSocketService    *service,
108                                     GSocketConnection *connection,
109                                     GObject           *source_object)
110 {
111   GThreadedSocketService *threaded;
112   GThreadedSocketServiceData *data;
113
114   threaded = G_THREADED_SOCKET_SERVICE (service);
115
116   data = g_slice_new (GThreadedSocketServiceData);
117   data->service = g_object_ref (service);
118   data->connection = g_object_ref (connection);
119   if (source_object)
120     data->source_object = g_object_ref (source_object);
121   else
122     data->source_object = NULL;
123
124   G_LOCK (job_count);
125   if (++threaded->priv->job_count == threaded->priv->max_threads)
126     g_socket_service_stop (service);
127   G_UNLOCK (job_count);
128
129   g_thread_pool_push (threaded->priv->thread_pool, data, NULL);
130
131
132
133   return FALSE;
134 }
135
136 static void
137 g_threaded_socket_service_init (GThreadedSocketService *service)
138 {
139   service->priv = G_TYPE_INSTANCE_GET_PRIVATE (service,
140                                                G_TYPE_THREADED_SOCKET_SERVICE,
141                                                GThreadedSocketServicePrivate);
142   service->priv->max_threads = 10;
143 }
144
145 static void
146 g_threaded_socket_service_constructed (GObject *object)
147 {
148   GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
149
150   service->priv->thread_pool =
151     g_thread_pool_new  (g_threaded_socket_service_func,
152                         service,
153                         service->priv->max_threads,
154                         FALSE,
155                         NULL);
156 }
157
158
159 static void
160 g_threaded_socket_service_finalize (GObject *object)
161 {
162   GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
163
164   g_thread_pool_free (service->priv->thread_pool, FALSE, TRUE);
165
166   G_OBJECT_CLASS (g_threaded_socket_service_parent_class)
167     ->finalize (object);
168 }
169
170 static void
171 g_threaded_socket_service_get_property (GObject    *object,
172                                         guint       prop_id,
173                                         GValue     *value,
174                                         GParamSpec *pspec)
175 {
176   GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
177
178   switch (prop_id)
179     {
180       case PROP_MAX_THREADS:
181         g_value_set_int (value, service->priv->max_threads);
182         break;
183
184       default:
185         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
186     }
187 }
188
189 static void
190 g_threaded_socket_service_set_property (GObject      *object,
191                                         guint         prop_id,
192                                         const GValue *value,
193                                         GParamSpec   *pspec)
194 {
195   GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
196
197   switch (prop_id)
198     {
199       case PROP_MAX_THREADS:
200         service->priv->max_threads = g_value_get_int (value);
201         break;
202
203       default:
204         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
205     }
206 }
207
208
209 static void
210 g_threaded_socket_service_class_init (GThreadedSocketServiceClass *class)
211 {
212   GObjectClass *gobject_class = G_OBJECT_CLASS (class);
213   GSocketServiceClass *ss_class = &class->parent_class;
214
215   g_type_class_add_private (class, sizeof (GThreadedSocketServicePrivate));
216
217   gobject_class->constructed = g_threaded_socket_service_constructed;
218   gobject_class->finalize = g_threaded_socket_service_finalize;
219   gobject_class->set_property = g_threaded_socket_service_set_property;
220   gobject_class->get_property = g_threaded_socket_service_get_property;
221
222   ss_class->incoming = g_threaded_socket_service_incoming;
223
224   /**
225    * GThreadedSocketService::run:
226    * @service: the #GThreadedSocketService.
227    * @connection: a new #GSocketConnection object.
228    * @source_object: the source_object passed to g_socket_listener_add_address().
229    *
230    * The ::run signal is emitted in a worker thread in response to an
231    * incoming connection. This thread is dedicated to handling
232    * @connection and may perform blocking IO. The signal handler need
233    * not return until the connection is closed.
234    *
235    * Returns: %TRUE to stop further signal handlers from being called
236    */
237   g_threaded_socket_service_run_signal =
238     g_signal_new ("run", G_TYPE_FROM_CLASS (class), G_SIGNAL_RUN_LAST,
239                   G_STRUCT_OFFSET (GThreadedSocketServiceClass, run),
240                   g_signal_accumulator_true_handled, NULL,
241                   NULL, G_TYPE_BOOLEAN,
242                   2, G_TYPE_SOCKET_CONNECTION, G_TYPE_OBJECT);
243
244   g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
245                                    g_param_spec_int ("max-threads",
246                                                      P_("Max threads"),
247                                                      P_("The max number of threads handling clients for this service"),
248                                                      -1,
249                                                      G_MAXINT,
250                                                      10,
251                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
252 }
253
254 /**
255  * g_threaded_socket_service_new:
256  * @max_threads: the maximal number of threads to execute concurrently
257  *   handling incoming clients, -1 means no limit
258  *
259  * Creates a new #GThreadedSocketService with no listeners. Listeners
260  * must be added with one of the #GSocketListener "add" methods.
261  *
262  * Returns: a new #GSocketService.
263  *
264  * Since: 2.22
265  */
266 GSocketService *
267 g_threaded_socket_service_new (int max_threads)
268 {
269   return g_object_new (G_TYPE_THREADED_SOCKET_SERVICE,
270                        "max-threads", max_threads,
271                        NULL);
272 }