Imported Upstream version 3.2.0
[platform/upstream/libwebsockets.git] / minimal-examples / ws-server / minimal-ws-server-threadpool / protocol_lws_minimal_threadpool.c
1 /*
2  * ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool
3  *
4  * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * The main reason some things are as they are is that the task lifecycle may
10  * be unrelated to the wsi lifecycle that queued that task.
11  *
12  * Consider the task may call an external library and run for 30s without
13  * "checking in" to see if it should stop.  The wsi that started the task may
14  * have closed at any time before the 30s are up, with the browser window
15  * closing or whatever.
16  *
17  * So data shared between the asynchronous task and the wsi must have its
18  * lifecycle determined by the task, not the wsi.  That means a separate struct
19  * that can be freed by the task.
20  *
21  * In the case the wsi outlives the task, the tasks do not get destroyed until
22  * the service thread has called lws_threadpool_task_status() on the completed
23  * task.  So there is no danger of the shared task private data getting randomly
24  * freed.
25  */
26
27 #if !defined (LWS_PLUGIN_STATIC)
28 #define LWS_DLL
29 #define LWS_INTERNAL
30 #include <libwebsockets.h>
31 #endif
32
33 #include <string.h>
34
35 struct per_vhost_data__minimal {
36         struct lws_threadpool *tp;
37         const char *config;
38 };
39
40 struct task_data {
41         char result[64];
42
43         uint64_t pos, end;
44 };
45
46 /*
47  * Create the private data for the task
48  *
49  * Notice we hand over responsibility for the cleanup and freeing of the
50  * allocated task_data to the threadpool, because the wsi it was originally
51  * bound to may close while the thread is still running.  So we allocate
52  * something discrete for the task private data that can be definitively owned
53  * and freed by the threadpool, not the wsi... the pss won't do, as it only
54  * exists for the lifecycle of the wsi connection.
55  *
56  * When the task is created, we also tell it how to destroy the private data
57  * by giving it args.cleanup as cleanup_task_private_data() defined below.
58  */
59
60 static struct task_data *
61 create_task_private_data(void)
62 {
63         struct task_data *priv = malloc(sizeof(*priv));
64
65         return priv;
66 }
67
68 /*
69  * Destroy the private data for the task
70  *
71  * Notice the wsi the task was originally bound to may be long gone, in the
72  * case we are destroying the lws context and the thread was doing something
73  * for a long time without checking in.
74  */
75 static void
76 cleanup_task_private_data(struct lws *wsi, void *user)
77 {
78         struct task_data *priv = (struct task_data *)user;
79
80         free(priv);
81 }
82
83 /*
84  * This runs in its own thread, from the threadpool.
85  *
86  * The implementation behind this in lws uses pthreads, but no pthreadisms are
87  * required in the user code.
88  *
89  * The example counts to 10M, "checking in" to see if it should stop after every
90  * 100K and pausing to sync with the service thread to send a ws message every
91  * 1M.  It resumes after the service thread determines the wsi is writable and
92  * the LWS_CALLBACK_SERVER_WRITEABLE indicates the task thread can continue by
93  * calling lws_threadpool_task_sync().
94  */
95
96 static enum lws_threadpool_task_return
97 task_function(void *user, enum lws_threadpool_task_status s)
98 {
99         struct task_data *priv = (struct task_data *)user;
100         int budget = 100 * 1000;
101
102         if (priv->pos == priv->end)
103                 return LWS_TP_RETURN_FINISHED;
104
105         /*
106          * Preferably replace this with ~100ms of your real task, so it
107          * can "check in" at short intervals to see if it has been asked to
108          * stop.
109          *
110          * You can just run tasks atomically here with the thread dedicated
111          * to it, but it will cause odd delays while shutting down etc and
112          * the task will run to completion even if the wsi that started it
113          * has since closed.
114          */
115
116         while (budget--)
117                 priv->pos++;
118
119         usleep(100000);
120
121         if (!(priv->pos % (1000 * 1000))) {
122                 lws_snprintf(priv->result + LWS_PRE,
123                              sizeof(priv->result) - LWS_PRE,
124                              "pos %llu", (unsigned long long)priv->pos);
125
126                 return LWS_TP_RETURN_SYNC;
127         }
128
129         return LWS_TP_RETURN_CHECKING_IN;
130 }
131
132 static int
133 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
134                         void *user, void *in, size_t len)
135 {
136         struct per_vhost_data__minimal *vhd =
137                         (struct per_vhost_data__minimal *)
138                         lws_protocol_vh_priv_get(lws_get_vhost(wsi),
139                                         lws_get_protocol(wsi));
140         const struct lws_protocol_vhost_options *pvo;
141         struct lws_threadpool_create_args cargs;
142         struct lws_threadpool_task_args args;
143         struct lws_threadpool_task *task;
144         struct task_data *priv;
145         int n, m, r = 0;
146         char name[32];
147         void *_user;
148
149         switch (reason) {
150         case LWS_CALLBACK_PROTOCOL_INIT:
151                 /* create our per-vhost struct */
152                 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
153                                 lws_get_protocol(wsi),
154                                 sizeof(struct per_vhost_data__minimal));
155                 if (!vhd)
156                         return 1;
157
158                 /* recover the pointer to the globals struct */
159                 pvo = lws_pvo_search(
160                         (const struct lws_protocol_vhost_options *)in,
161                         "config");
162                 if (!pvo || !pvo->value) {
163                         lwsl_err("%s: Can't find \"config\" pvo\n", __func__);
164                         return 1;
165                 }
166                 vhd->config = pvo->value;
167
168                 memset(&cargs, 0, sizeof(cargs));
169
170                 cargs.max_queue_depth = 8;
171                 cargs.threads = 3;
172                 vhd->tp = lws_threadpool_create(lws_get_context(wsi),
173                                 &cargs, "%s",
174                                 lws_get_vhost_name(lws_get_vhost(wsi)));
175                 if (!vhd->tp)
176                         return 1;
177
178                 lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
179                                                lws_get_protocol(wsi),
180                                                LWS_CALLBACK_USER, 1);
181
182                 break;
183
184         case LWS_CALLBACK_PROTOCOL_DESTROY:
185                 lws_threadpool_finish(vhd->tp);
186                 lws_threadpool_destroy(vhd->tp);
187                 break;
188
189         case LWS_CALLBACK_USER:
190
191                 /*
192                  * in debug mode, dump the threadpool stat to the logs once
193                  * a second
194                  */
195                 lws_threadpool_dump(vhd->tp);
196                 lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
197                                                lws_get_protocol(wsi),
198                                                LWS_CALLBACK_USER, 1);
199                 break;
200
201         case LWS_CALLBACK_ESTABLISHED:
202
203                 memset(&args, 0, sizeof(args));
204                 priv = args.user = create_task_private_data();
205                 if (!args.user)
206                         return 1;
207
208                 priv->pos = 0;
209                 priv->end = 10 * 1000 * 1000;
210
211                 /* queue the task... the task takes on responsibility for
212                  * destroying args.user.  pss->priv just has a copy of it */
213
214                 args.wsi = wsi;
215                 args.task = task_function;
216                 args.cleanup = cleanup_task_private_data;
217
218                 lws_get_peer_simple(wsi, name, sizeof(name));
219
220                 if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) {
221                         lwsl_user("%s: Couldn't enqueue task\n", __func__);
222                         cleanup_task_private_data(wsi, priv);
223                         return 1;
224                 }
225
226                 lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30);
227
228                 /*
229                  * so the asynchronous worker will let us know the next step
230                  * by causing LWS_CALLBACK_SERVER_WRITEABLE
231                  */
232
233                 break;
234
235         case LWS_CALLBACK_CLOSED:
236                 break;
237
238         case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
239                 lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi);
240                 lws_threadpool_dequeue(wsi);
241                 break;
242
243         case LWS_CALLBACK_SERVER_WRITEABLE:
244
245                 /*
246                  * even completed tasks wait in a queue until we call the
247                  * below on them.  Then they may destroy themselves and their
248                  * args.user data (by calling the cleanup callback).
249                  *
250                  * If you need to get things from the still-valid private task
251                  * data, copy it here before calling
252                  * lws_threadpool_task_status() that may free the task and the
253                  * private task data.
254                  */
255
256                 n = lws_threadpool_task_status_wsi(wsi, &task, &_user);
257                 lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n",
258                            __func__, n);
259                 switch(n) {
260
261                 case LWS_TP_STATUS_FINISHED:
262                 case LWS_TP_STATUS_STOPPED:
263                 case LWS_TP_STATUS_QUEUED:
264                 case LWS_TP_STATUS_RUNNING:
265                 case LWS_TP_STATUS_STOPPING:
266                         return 0;
267
268                 case LWS_TP_STATUS_SYNCING:
269                         /* the task has paused for us to do something */
270                         break;
271                 default:
272                         return -1;
273                 }
274
275                 priv = (struct task_data *)_user;
276
277                 lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5);
278
279                 n = strlen(priv->result + LWS_PRE);
280                 m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE,
281                               n, LWS_WRITE_TEXT);
282                 if (m < n) {
283                         lwsl_err("ERROR %d writing to ws socket\n", m);
284                         lws_threadpool_task_sync(task, 1);
285                         return -1;
286                 }
287
288                 /*
289                  * service thread has done whatever it wanted to do with the
290                  * data the task produced: if it's waiting to do more it can
291                  * continue now.
292                  */
293                 lws_threadpool_task_sync(task, 0);
294                 break;
295
296         default:
297                 break;
298         }
299
300         return r;
301 }
302
303 #define LWS_PLUGIN_PROTOCOL_MINIMAL \
304         { \
305                 "lws-minimal", \
306                 callback_minimal, \
307                 0, \
308                 128, \
309                 0, NULL, 0 \
310         }
311
312 #if !defined (LWS_PLUGIN_STATIC)
313
314 /* boilerplate needed if we are built as a dynamic plugin */
315
316 static const struct lws_protocols protocols[] = {
317         LWS_PLUGIN_PROTOCOL_MINIMAL
318 };
319
320 LWS_EXTERN LWS_VISIBLE int
321 init_protocol_minimal(struct lws_context *context,
322                       struct lws_plugin_capability *c)
323 {
324         if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
325                 lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
326                          c->api_magic);
327                 return 1;
328         }
329
330         c->protocols = protocols;
331         c->count_protocols = LWS_ARRAY_SIZE(protocols);
332         c->extensions = NULL;
333         c->count_extensions = 0;
334
335         return 0;
336 }
337
338 LWS_EXTERN LWS_VISIBLE int
339 destroy_protocol_minimal(struct lws_context *context)
340 {
341         return 0;
342 }
343 #endif