mirror: multiple mirror contexts by mirror= url arg
[platform/upstream/libwebsockets.git] / plugins / protocol_lws_mirror.c
1 /*
2  * libwebsockets-test-server - libwebsockets test implementation
3  *
4  * Copyright (C) 2010-2016 Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * The person who associated a work with this deed has dedicated
10  * the work to the public domain by waiving all of his or her rights
11  * to the work worldwide under copyright law, including all related
12  * and neighboring rights, to the extent allowed by law. You can copy,
13  * modify, distribute and perform the work, even for commercial purposes,
14  * all without asking permission.
15  *
16  * The test apps are intended to be adapted for use in your code, which
17  * may be proprietary.  So unlike the library itself, they are licensed
18  * Public Domain.
19  */
20
21 #if !defined (LWS_PLUGIN_STATIC)
22 #define LWS_DLL
23 #define LWS_INTERNAL
24 #include "../lib/libwebsockets.h"
25 #endif
26
27 #include <string.h>
28 #include <stdlib.h>
29
30 /* lws-mirror_protocol */
31
/*
 * Number of slots in each mirror instance's message ringbuffer.
 *
 * The callback computes ring occupancy as
 * (head - tail) & (MAX_MESSAGE_QUEUE - 1), so this value must stay a
 * power of two.
 */
#if defined(LWS_WITH_ESP8266)
#define MAX_MESSAGE_QUEUE 64
#else
#define MAX_MESSAGE_QUEUE 512
#endif

/*
 * Upper bound on distinct mirror instances (distinct "mirror" names) per
 * vhost; a connection asking for a new name once this many exist is refused.
 */
#define MAX_MIRROR_INSTANCES 10

/* forward declaration: per_session_data__lws_mirror points at its instance */
struct lws_mirror_instance;
41
/*
 * Per-connection state for the mirror protocol.  Filled in by the
 * LWS_CALLBACK_ESTABLISHED handler and detached again in LWS_CALLBACK_CLOSED.
 */
struct per_session_data__lws_mirror {
        /* the connection this state belongs to */
        struct lws *wsi;
        /* mirror instance this connection joined (NULL if it never joined) */
        struct lws_mirror_instance *mi;
        /* next connection bound to the same mirror instance */
        struct per_session_data__lws_mirror *same_mi_pss_list;
        /* index of the next ringbuffer slot this connection still has to send */
        int ringbuffer_tail;
};
48
/* One queued message held in a mirror instance's ringbuffer. */
struct a_message {
        /*
         * heap buffer of LWS_PRE + len bytes; the message data starts
         * LWS_PRE bytes into it
         */
        void *payload;
        /* length of the message data, not counting the LWS_PRE headroom */
        size_t len;
};
53
/*
 * One named mirror context.  Every connection bound to the same instance
 * gets sent the messages received from any of them.
 */
struct lws_mirror_instance {
        /* next mirror instance on the owning vhost's list */
        struct lws_mirror_instance *next;
        /* head of the list of connections bound to this instance */
        struct per_session_data__lws_mirror *same_mi_pss_list;
        /* instance name, compared against the "mirror" URL argument */
        char name[30];
        /* queued messages; slots are reused once the head wraps around */
        struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
        /* index of the slot the next received message will be stored in */
        int ringbuffer_head;
};
61
/*
 * Per-vhost state for the mirror protocol; allocated zeroed in the
 * LWS_CALLBACK_PROTOCOL_INIT handler.
 */
struct per_vhost_data__lws_mirror {
        /* head of the list of live mirror instances on this vhost */
        struct lws_mirror_instance *mi_list;
};
65
66 static int
67 callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
68                     void *user, void *in, size_t len)
69 {
70         struct per_session_data__lws_mirror *pss =
71                         (struct per_session_data__lws_mirror *)user, **ppss,
72                         *pss1;
73         struct per_vhost_data__lws_mirror *v =
74                         (struct per_vhost_data__lws_mirror *)
75                         lws_protocol_vh_priv_get(lws_get_vhost(wsi),
76                                         lws_get_protocol(wsi));
77         struct lws_mirror_instance *mi, **pmi;
78         char name[30];
79         int n, m, count_mi = 0;
80
81         switch (reason) {
82
83         case LWS_CALLBACK_ESTABLISHED:
84                 lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__);
85
86                 /*
87                  * mirror instance name... defaults to "", but if URL includes
88                  * "?mirror=xxx", will be "xxx"
89                  */
90
91                 name[0] = '\0';
92                 lws_get_urlarg_by_name(wsi, "mirror", name, sizeof(name) - 1);
93
94                 lwsl_notice("mirror %s\n", name);
95
96                 /* is there already a mirror instance of this name? */
97
98                 mi = v->mi_list;
99                 while (mi) {
100                         if (!strcmp(name, mi->name)) {
101                                 lwsl_notice("Joining existing mi %p '%s'\n",
102                                                 mi, name);
103                                 /* yes... we will join it */
104                                 break;
105                         }
106                         count_mi++;
107                         mi = mi->next;
108                 }
109
110                 if (!mi) {
111
112                         /* no existing mirror instance for name */
113
114                         if (count_mi == MAX_MIRROR_INSTANCES)
115                                 return -1;
116
117                         /* create one with this name, and join it */
118
119                         mi = malloc(sizeof(*mi));
120                         memset(mi, 0, sizeof(*mi));
121                         mi->next = v->mi_list;
122                         v->mi_list = mi;
123                         strcpy(mi->name, name);
124                         mi->ringbuffer_head = 0;
125
126
127                         lwsl_notice("Created new mi %p '%s'\n", mi, name);
128                 }
129
130                 /* add our pss to list of guys bound to this mi */
131
132                 pss->same_mi_pss_list = mi->same_mi_pss_list;
133                 mi->same_mi_pss_list = pss;
134
135                 /* init the pss */
136
137                 pss->mi = mi;
138                 pss->ringbuffer_tail = mi->ringbuffer_head;
139                 pss->wsi = wsi;
140
141                 break;
142
143         case LWS_CALLBACK_CLOSED:
144
145                 /* detach our pss from the mirror instance */
146
147                 mi = pss->mi;
148                 if (!mi)
149                         break;
150                 ppss = &mi->same_mi_pss_list;
151
152                 while (*ppss) {
153                         if (*ppss == pss) {
154                                 *ppss = pss->same_mi_pss_list;
155                                 break;
156                         }
157
158                         ppss = &(*ppss)->same_mi_pss_list;
159                 }
160
161                 if (!mi->same_mi_pss_list) {
162
163                         /* last pss unbound from mi... delete mi */
164
165                         pmi = &v->mi_list;
166                         while (*pmi) {
167                                 if (*pmi == mi) {
168                                         *pmi = (*pmi)->next;
169
170                                         if (!pss->mi)
171                                                 break;
172                                         lwsl_info("%s: mirror protocol cleaning up %p\n", __func__, v);
173                                         for (n = 0; n < ARRAY_SIZE(pss->mi->ringbuffer); n++)
174                                                 if (pss->mi->ringbuffer[n].payload) {
175                                                         free(pss->mi->ringbuffer[n].payload);
176                                                         pss->mi->ringbuffer[n].payload = NULL;
177                                                 }
178
179                                         free(mi);
180                                         break;
181                                 }
182                                 count_mi++;
183                                 pmi = &(*pmi)->next;
184                         }
185                 }
186
187                 break;
188
189         case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */
190                 lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
191                                 lws_get_protocol(wsi),
192                                 sizeof(struct per_vhost_data__lws_mirror));
193                 break;
194
195         case LWS_CALLBACK_PROTOCOL_DESTROY: /* per vhost */
196                 break;
197
198         case LWS_CALLBACK_SERVER_WRITEABLE:
199                 while (pss->ringbuffer_tail != pss->mi->ringbuffer_head) {
200                         m = pss->mi->ringbuffer[pss->ringbuffer_tail].len;
201                         n = lws_write(wsi, (unsigned char *)
202                                         pss->mi->ringbuffer[pss->ringbuffer_tail].payload +
203                                    LWS_PRE, m, LWS_WRITE_TEXT);
204                         if (n < 0) {
205                                 lwsl_err("ERROR %d writing to mirror socket\n", n);
206                                 return -1;
207                         }
208                         if (n < m)
209                                 lwsl_err("mirror partial write %d vs %d\n", n, m);
210
211                         if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
212                                 pss->ringbuffer_tail = 0;
213                         else
214                                 pss->ringbuffer_tail++;
215
216                         if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
217                             (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
218                                 lws_rx_flow_allow_all_protocol(lws_get_context(wsi),
219                                                lws_get_protocol(wsi));
220
221                         if (lws_send_pipe_choked(wsi)) {
222                                 lws_callback_on_writable(wsi);
223                                 break;
224                         }
225                 }
226                 break;
227
228         case LWS_CALLBACK_RECEIVE:
229                 if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
230                     (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
231                         lwsl_err("dropping!\n");
232                         goto choke;
233                 }
234
235                 if (pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload)
236                         free(pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload);
237
238                 pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload = malloc(LWS_PRE + len);
239                 pss->mi->ringbuffer[pss->mi->ringbuffer_head].len = len;
240                 memcpy((char *)pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload +
241                        LWS_PRE, in, len);
242                 if (pss->mi->ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
243                         pss->mi->ringbuffer_head = 0;
244                 else
245                         pss->mi->ringbuffer_head++;
246
247                 if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
248                     (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2))
249                         goto done;
250
251 choke:
252                 lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi);
253                 lws_rx_flow_control(wsi, 0);
254
255 done:
256                 /*
257                  *  ask for WRITABLE callback for every wsi bound to this
258                  * mirror instance
259                  */
260
261                 pss1 = pss->mi->same_mi_pss_list;
262                 while (pss1) {
263                         lws_callback_on_writable(pss1->wsi);
264                         pss1 = pss1->same_mi_pss_list;
265                 }
266                 break;
267
268         default:
269                 break;
270         }
271
272         return 0;
273 }
274
/*
 * Positional initializer for this protocol's struct lws_protocols entry:
 * protocol name, callback, per-session data size, rx buffer size.
 * It is used for the plugin's own protocols[] table when
 * LWS_PLUGIN_STATIC is not defined.
 */
#define LWS_PLUGIN_PROTOCOL_MIRROR { \
                "lws-mirror-protocol", \
                callback_lws_mirror, \
                sizeof(struct per_session_data__lws_mirror), \
                128, /* rx buf size must be >= permessage-deflate rx size */ \
        }
281
282 #if !defined (LWS_PLUGIN_STATIC)
283
/* protocol table handed to lws by init_protocol_lws_mirror() */
static const struct lws_protocols protocols[] = {
        LWS_PLUGIN_PROTOCOL_MIRROR
};
287
288 LWS_EXTERN LWS_VISIBLE int
289 init_protocol_lws_mirror(struct lws_context *context,
290                              struct lws_plugin_capability *c)
291 {
292         if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
293                 lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
294                          c->api_magic);
295                 return 1;
296         }
297
298         c->protocols = protocols;
299         c->count_protocols = ARRAY_SIZE(protocols);
300         c->extensions = NULL;
301         c->count_extensions = 0;
302
303         return 0;
304 }
305
306 LWS_EXTERN LWS_VISIBLE int
307 destroy_protocol_lws_mirror(struct lws_context *context)
308 {
309         return 0;
310 }
311 #endif