afac1b6d5e5b0f2316f97673cf630965dfe1cca7
[platform/upstream/libwebsockets.git] / plugins / protocol_lws_mirror.c
1 /*
2  * libwebsockets-test-server - libwebsockets test implementation
3  *
4  * Copyright (C) 2010-2016 Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * The person who associated a work with this deed has dedicated
10  * the work to the public domain by waiving all of his or her rights
11  * to the work worldwide under copyright law, including all related
12  * and neighboring rights, to the extent allowed by law. You can copy,
13  * modify, distribute and perform the work, even for commercial purposes,
14  * all without asking permission.
15  *
16  * The test apps are intended to be adapted for use in your code, which
17  * may be proprietary.  So unlike the library itself, they are licensed
18  * Public Domain.
19  */
20
21 #if !defined (LWS_PLUGIN_STATIC)
22 #define LWS_DLL
23 #define LWS_INTERNAL
24 #include "../lib/libwebsockets.h"
25 #endif
26
27 #include <string.h>
28 #include <stdlib.h>
29
30 /* lws-mirror_protocol */
31
/*
 * Number of slots in each mirror instance's message ringbuffer.
 *
 * The ring index arithmetic in this file masks with (MAX_MESSAGE_QUEUE - 1),
 * so the value needs to stay a power of two.
 */
#ifdef LWS_WITH_ESP8266
#define MAX_MESSAGE_QUEUE 64
#else
#define MAX_MESSAGE_QUEUE 512
#endif

/* Upper bound on the number of named mirror instances per vhost */
#define MAX_MIRROR_INSTANCES 10
39
struct lws_mirror_instance;

/*
 * Per-connection state.  One of these exists for each connection using
 * the mirror protocol.
 */
struct per_session_data__lws_mirror {
        /* the connection this session data belongs to */
        struct lws *wsi;
        /* mirror instance we are bound to, NULL once detached */
        struct lws_mirror_instance *mi;
        /* next session bound to the same mirror instance */
        struct per_session_data__lws_mirror *same_mi_pss_list;
        /* index of the next ringbuffer slot to send on this connection */
        int ringbuffer_tail;
};
48
/* One queued message held in a mirror instance's ringbuffer */
struct a_message {
        /*
         * heap buffer of LWS_PRE + len bytes; the message data itself
         * starts LWS_PRE bytes into the buffer
         */
        void *payload;
        /* length of the message data, not counting the LWS_PRE prefix */
        size_t len;
};
53
54 struct lws_mirror_instance {
55         struct lws_mirror_instance *next;
56         struct per_session_data__lws_mirror *same_mi_pss_list;
57         char name[30];
58         struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
59         int ringbuffer_head;
60 };
61
/* Per-vhost state for this protocol */
struct per_vhost_data__lws_mirror {
        /* head of the list of mirror instances alive on this vhost */
        struct lws_mirror_instance *mi_list;
};
65
66 static int
67 callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
68                     void *user, void *in, size_t len)
69 {
70         struct per_session_data__lws_mirror *pss =
71                         (struct per_session_data__lws_mirror *)user;
72         struct per_vhost_data__lws_mirror *v =
73                         (struct per_vhost_data__lws_mirror *)
74                         lws_protocol_vh_priv_get(lws_get_vhost(wsi),
75                                         lws_get_protocol(wsi));
76         struct lws_mirror_instance *mi = NULL;
77         char name[30];
78         int n, m, count_mi = 0;
79
80         switch (reason) {
81
82         case LWS_CALLBACK_ESTABLISHED:
83                 lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__);
84
85                 /*
86                  * mirror instance name... defaults to "", but if URL includes
87                  * "?mirror=xxx", will be "xxx"
88                  */
89
90                 name[0] = '\0';
91                 lws_get_urlarg_by_name(wsi, "mirror", name, sizeof(name) - 1);
92
93                 lwsl_notice("mirror %s\n", name);
94
95                 /* is there already a mirror instance of this name? */
96
97                 lws_start_foreach_ll(struct lws_mirror_instance *,
98                                      mi1, v->mi_list) {
99                         count_mi++;
100                         if (strcmp(name, mi1->name))
101                                 continue;
102                         /* yes... we will join it */
103                         lwsl_notice("Joining existing mi %p '%s'\n", mi1, name);
104                         mi = mi1;
105                         break;
106                 } lws_end_foreach_ll(mi1, next);
107
108                 if (!mi) {
109
110                         /* no existing mirror instance for name */
111
112                         if (count_mi == MAX_MIRROR_INSTANCES)
113                                 return -1;
114
115                         /* create one with this name, and join it */
116
117                         mi = malloc(sizeof(*mi));
118                         memset(mi, 0, sizeof(*mi));
119                         mi->next = v->mi_list;
120                         v->mi_list = mi;
121                         strcpy(mi->name, name);
122                         mi->ringbuffer_head = 0;
123
124                         lwsl_notice("Created new mi %p '%s'\n", mi, name);
125                 }
126
127                 /* add our pss to list of guys bound to this mi */
128
129                 pss->same_mi_pss_list = mi->same_mi_pss_list;
130                 mi->same_mi_pss_list = pss;
131
132                 /* init the pss */
133
134                 pss->mi = mi;
135                 pss->ringbuffer_tail = mi->ringbuffer_head;
136                 pss->wsi = wsi;
137
138                 break;
139
140         case LWS_CALLBACK_CLOSED:
141
142                 /* detach our pss from the mirror instance */
143
144                 mi = pss->mi;
145                 if (!mi)
146                         break;
147
148                 lws_start_foreach_llp(struct per_session_data__lws_mirror **,
149                         ppss, mi->same_mi_pss_list) {
150                         if (*ppss == pss) {
151
152                                 *ppss = pss->same_mi_pss_list;
153                                 break;
154                         }
155                 } lws_end_foreach_llp(ppss, same_mi_pss_list);
156
157                 pss->mi = NULL;
158
159                 if (mi->same_mi_pss_list)
160                         break;
161
162                 /* last pss unbound from mi... delete mi */
163
164                 lws_start_foreach_llp(struct lws_mirror_instance **,
165                                 pmi, v->mi_list) {
166                         if (*pmi != mi)
167                                 continue;
168
169                         *pmi = (*pmi)->next;
170
171                         lwsl_info("%s: mirror cleaniup %p\n", __func__, v);
172                         for (n = 0; n < ARRAY_SIZE(mi->ringbuffer); n++)
173                                 if (mi->ringbuffer[n].payload) {
174                                         free(mi->ringbuffer[n].payload);
175                                         mi->ringbuffer[n].payload = NULL;
176                                 }
177
178                         free(mi);
179                         break;
180                 } lws_end_foreach_llp(pmi, next);
181
182                 break;
183
184         case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */
185                 lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
186                                 lws_get_protocol(wsi),
187                                 sizeof(struct per_vhost_data__lws_mirror));
188                 break;
189
190         case LWS_CALLBACK_PROTOCOL_DESTROY: /* per vhost */
191                 break;
192
193         case LWS_CALLBACK_SERVER_WRITEABLE:
194                 while (pss->ringbuffer_tail != pss->mi->ringbuffer_head) {
195                         m = pss->mi->ringbuffer[pss->ringbuffer_tail].len;
196                         n = lws_write(wsi, (unsigned char *)
197                                         pss->mi->ringbuffer[pss->ringbuffer_tail].payload +
198                                    LWS_PRE, m, LWS_WRITE_TEXT);
199                         if (n < 0) {
200                                 lwsl_err("ERROR %d writing to mirror socket\n", n);
201                                 return -1;
202                         }
203                         if (n < m)
204                                 lwsl_err("mirror partial write %d vs %d\n", n, m);
205
206                         if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
207                                 pss->ringbuffer_tail = 0;
208                         else
209                                 pss->ringbuffer_tail++;
210
211                         if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
212                             (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
213                                 lws_rx_flow_allow_all_protocol(lws_get_context(wsi),
214                                                lws_get_protocol(wsi));
215
216                         if (lws_send_pipe_choked(wsi)) {
217                                 lws_callback_on_writable(wsi);
218                                 break;
219                         }
220                 }
221                 break;
222
223         case LWS_CALLBACK_RECEIVE:
224                 if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
225                     (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
226                         lwsl_err("dropping!\n");
227                         goto choke;
228                 }
229
230                 if (pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload)
231                         free(pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload);
232
233                 pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload = malloc(LWS_PRE + len);
234                 pss->mi->ringbuffer[pss->mi->ringbuffer_head].len = len;
235                 memcpy((char *)pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload +
236                        LWS_PRE, in, len);
237                 if (pss->mi->ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
238                         pss->mi->ringbuffer_head = 0;
239                 else
240                         pss->mi->ringbuffer_head++;
241
242                 if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
243                     (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2))
244                         goto done;
245
246 choke:
247                 lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi);
248                 lws_rx_flow_control(wsi, 0);
249
250 done:
251                 /*
252                  *  ask for WRITABLE callback for every wsi bound to this
253                  * mirror instance
254                  */
255                 lws_start_foreach_ll(struct per_session_data__lws_mirror *,
256                                         pss1, pss->mi->same_mi_pss_list) {
257                         lws_callback_on_writable(pss1->wsi);
258                 } lws_end_foreach_ll(pss1, same_mi_pss_list);
259                 break;
260
261         default:
262                 break;
263         }
264
265         return 0;
266 }
267
/*
 * Initializer for this protocol's struct lws_protocols table entry:
 * protocol name, callback, per-session data size and rx buffer size.
 */
#define LWS_PLUGIN_PROTOCOL_MIRROR \
        { \
                "lws-mirror-protocol", \
                callback_lws_mirror, \
                sizeof(struct per_session_data__lws_mirror), \
                128, /* rx buf size must be >= permessage-deflate rx size */ \
        }
274
275 #if !defined (LWS_PLUGIN_STATIC)
276
277 static const struct lws_protocols protocols[] = {
278         LWS_PLUGIN_PROTOCOL_MIRROR
279 };
280
281 LWS_EXTERN LWS_VISIBLE int
282 init_protocol_lws_mirror(struct lws_context *context,
283                              struct lws_plugin_capability *c)
284 {
285         if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
286                 lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
287                          c->api_magic);
288                 return 1;
289         }
290
291         c->protocols = protocols;
292         c->count_protocols = ARRAY_SIZE(protocols);
293         c->extensions = NULL;
294         c->count_extensions = 0;
295
296         return 0;
297 }
298
299 LWS_EXTERN LWS_VISIBLE int
300 destroy_protocol_lws_mirror(struct lws_context *context)
301 {
302         return 0;
303 }
304 #endif