Imported Upstream version 3.2
[platform/upstream/libwebsockets.git] / plugins / raw-proxy / protocol_lws_raw_proxy.c
1 /*
2  * libwebsockets - plugin for raw proxying
3  *
4  * Copyright (C) 2010-2018 Andy Green <andy@warmcat.com>
5  *
6  *  This library is free software; you can redistribute it and/or
7  *  modify it under the terms of the GNU Lesser General Public
8  *  License as published by the Free Software Foundation:
9  *  version 2.1 of the License.
10  *
11  *  This library is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  *  Lesser General Public License for more details.
15  *
16  *  You should have received a copy of the GNU Lesser General Public
17  *  License along with this library; if not, write to the Free Software
18  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  *  MA  02110-1301  USA
20  */
21
22 #if !defined (LWS_PLUGIN_STATIC)
23 #define LWS_DLL
24 #define LWS_INTERNAL
25 #include <libwebsockets.h>
26 #endif
27
28 #include <string.h>
29 #include <sys/types.h>
30 #include <fcntl.h>
31
/* element count passed to lws_ring_create() for each direction's ring */
#define RING_DEPTH 8
33
/*
 * One chunk of received data waiting in a ring to be written to the
 * other side of the proxied connection.
 */
struct packet {
        void *payload;   /* heap copy of the rx data; freed with free() */
        uint32_t len;    /* number of bytes at payload */
        uint32_t ticket; /* sequence number, taken from conn->ticket_next */
};
39
/* index of each side of the proxied connection in the struct conn arrays */
enum {
        ACC = 0, /* the accepted (inbound) side */
        ONW = 1  /* the onward (client) side */
};
44
45 /*
46  * Because both sides of the connection want to share this, we allocate it
47  * during accepted adoption and both sides have a pss that is just a wrapper
48  * pointing to this.
49  *
50  * The last one of the accepted side and the onward side to close frees it.
51  * This removes any chance of one side or the other having an invalidated
52  * pointer to the pss.
53  */
54
struct conn {
        /* the wsi for each side, indexed by ACC / ONW */
        struct lws *wsi[2];

        /* rings containing unsent rx from accepted and onward sides */
        struct lws_ring *r[2];
        uint32_t t[2]; /* ring tail */

        /* ticket given to the next queued packet (either ring) */
        uint32_t ticket_next;
        /* ticket of the last packet written; only retired + 1 may go next */
        uint32_t ticket_retired;

        /* per-side flags, indexed by ACC / ONW */
        char rx_enabled[2];  /* last rx flow control state we recorded */
        char closed[2];      /* set when that side's close callback ran */
        char established[2]; /* set when that side's adopt callback ran */
};
69
/* per-session data: just a reference to the conn shared by both sides */
struct raw_pss {
        struct conn *conn;
};
73
74 /* one of these is created for each vhost our protocol is used with */
75
/* onward destination, filled in from the "onward" pvo at protocol init */
struct raw_vhd {
        char addr[128]; /* onward address string */
        uint16_t port;  /* onward port; left at 0 if the pvo gave none */
        char ipv6;      /* nonzero if the pvo selected "ipv6" */
};
81
82 static void
83 __destroy_packet(void *_pkt)
84 {
85         struct packet *pkt = _pkt;
86
87         free(pkt->payload);
88         pkt->payload = NULL;
89         pkt->len = 0;
90 }
91
92 static void
93 destroy_conn(struct raw_vhd *vhd, struct raw_pss *pss)
94 {
95         struct conn *conn = pss->conn;
96
97         if (conn->r[ACC])
98                 lws_ring_destroy(conn->r[ACC]);
99         if (conn->r[ONW])
100                 lws_ring_destroy(conn->r[ONW]);
101
102         pss->conn = NULL;
103
104         free(conn);
105 }
106
107 static int
108 connect_client(struct raw_vhd *vhd, struct raw_pss *pss)
109 {
110         struct lws_client_connect_info i;
111         char host[128];
112         struct lws *cwsi;
113
114         lws_snprintf(host, sizeof(host), "%s:%u", vhd->addr, vhd->port);
115
116         memset(&i, 0, sizeof(i));
117
118         i.method = "RAW";
119         i.context = lws_get_context(pss->conn->wsi[ACC]);
120         i.port = vhd->port;
121         i.address = vhd->addr;
122         i.host = host;
123         i.origin = host;
124         i.ssl_connection = 0;
125         i.vhost = lws_get_vhost(pss->conn->wsi[ACC]);
126         i.local_protocol_name = "raw-proxy";
127         i.protocol = "raw-proxy";
128         i.path = "/";
129         /*
130          * The "onward" client wsi has its own pss but shares the "conn"
131          * created when the inbound connection was accepted.  We need to stash
132          * the address of the shared conn and apply it to the client psss
133          * when the client connection completes.
134          */
135         i.opaque_user_data = pss->conn;
136         i.pwsi = &pss->conn->wsi[ONW];
137
138         lwsl_info("%s: onward: %s:%d%s\n", __func__, i.address, i.port, i.path);
139
140         cwsi = lws_client_connect_via_info(&i);
141         if (!cwsi)
142                 lwsl_err("%s: client connect failed early\n", __func__);
143
144         return !cwsi;
145 }
146
147 static int
148 flow_control(struct conn *conn, int side, int enable)
149 {
150         if (conn->closed[side] ||
151             enable == conn->rx_enabled[side] ||
152             !conn->established[side])
153                 return 0;
154
155         if (lws_rx_flow_control(conn->wsi[side], enable))
156                 return 1;
157
158         conn->rx_enabled[side] = enable;
159         lwsl_info("%s: %s side: %s\n", __func__, side ? "ONW" : "ACC",
160                   enable ? "rx enabled" : "rx flow controlled");
161
162         return 0;
163 }
164
165 static int
166 callback_raw_proxy(struct lws *wsi, enum lws_callback_reasons reason,
167                    void *user, void *in, size_t len)
168 {
169         struct raw_pss *pss = (struct raw_pss *)user;
170         struct raw_vhd *vhd = (struct raw_vhd *)lws_protocol_vh_priv_get(
171                                      lws_get_vhost(wsi), lws_get_protocol(wsi));
172         const struct packet *ppkt;
173         struct conn *conn = NULL;
174         struct lws_tokenize ts;
175         lws_tokenize_elem e;
176         struct packet pkt;
177         const char *cp;
178         int n;
179
180         if (pss)
181                 conn = pss->conn;
182
183         switch (reason) {
184         case LWS_CALLBACK_PROTOCOL_INIT:
185                 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
186                                 lws_get_protocol(wsi), sizeof(struct raw_vhd));
187                 if (lws_pvo_get_str(in, "onward", &cp)) {
188                         lwsl_err("%s: vh %s: pvo 'onward' required\n", __func__,
189                                  lws_get_vhost_name(lws_get_vhost(wsi)));
190
191                         return -1;
192                 }
193                 lws_tokenize_init(&ts, cp, LWS_TOKENIZE_F_DOT_NONTERM |
194                                            LWS_TOKENIZE_F_MINUS_NONTERM |
195                                            LWS_TOKENIZE_F_NO_FLOATS);
196                 ts.len = strlen(cp);
197
198                 if (lws_tokenize(&ts) != LWS_TOKZE_TOKEN)
199                         goto bad_onward;
200                 if (!strncmp(ts.token, "ipv6", ts.token_len))
201                         vhd->ipv6 = 1;
202                 else
203                         if (strncmp(ts.token, "ipv4", ts.token_len))
204                                 goto bad_onward;
205
206                 /* then the colon */
207                 if (lws_tokenize(&ts) != LWS_TOKZE_DELIMITER)
208                         goto bad_onward;
209
210                 e = lws_tokenize(&ts);
211                 if (!vhd->ipv6) {
212                         if (e != LWS_TOKZE_TOKEN ||
213                             ts.token_len + 1 >= (int)sizeof(vhd->addr))
214                                 goto bad_onward;
215
216                         lws_strncpy(vhd->addr, ts.token, ts.token_len + 1);
217                         e = lws_tokenize(&ts);
218                         if (e == LWS_TOKZE_DELIMITER) {
219                                 /* there should be a port then */
220                                 e = lws_tokenize(&ts);
221                                 if (e != LWS_TOKZE_INTEGER)
222                                         goto bad_onward;
223                                 vhd->port = atoi(ts.token);
224                                 e = lws_tokenize(&ts);
225                         }
226                         if (e != LWS_TOKZE_ENDED)
227                                 goto bad_onward;
228                 } else
229                         lws_strncpy(vhd->addr, ts.token, sizeof(vhd->addr));
230
231                 lwsl_notice("%s: vh %s: onward %s:%s:%d\n", __func__,
232                             lws_get_vhost_name(lws_get_vhost(wsi)),
233                             vhd->ipv6 ? "ipv6": "ipv4", vhd->addr, vhd->port);
234                 break;
235
236 bad_onward:
237                 lwsl_err("%s: onward pvo format must be ipv4:addr[:port] "
238                          " or ipv6:addr, not '%s'\n", __func__, cp);
239                 return -1;
240
241         case LWS_CALLBACK_PROTOCOL_DESTROY:
242                 break;
243
244         /* callbacks related to client "onward side" */
245
246         case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
247                 lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
248                          in ? (char *)in : "(null)");
249                 break;
250
251         case LWS_CALLBACK_RAW_PROXY_CLI_ADOPT:
252                 lwsl_debug("LWS_CALLBACK_RAW_CLI_ADOPT: pss %p\n", pss);
253                 if (conn || !pss)
254                         break;
255                 conn = pss->conn = lws_get_opaque_user_data(wsi);
256                 conn->established[ONW] = 1;
257                 /* they start enabled */
258                 conn->rx_enabled[ACC] = 1;
259                 conn->rx_enabled[ONW] = 1;
260
261                 /* he disabled his rx while waiting for use to be established */
262                 flow_control(conn, ACC, 1);
263
264                 lws_callback_on_writable(wsi);
265                 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
266                 break;
267
268         case LWS_CALLBACK_RAW_PROXY_CLI_CLOSE:
269                 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_CLOSE\n");
270                 if (!conn)
271                         break;
272
273                 conn->closed[ONW] = 1;
274
275                 if (conn->closed[ACC])
276                         destroy_conn(vhd, pss);
277
278                 break;
279
280         case LWS_CALLBACK_RAW_PROXY_CLI_RX:
281                 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_RX: %d\n", (int)len);
282
283                 if (!conn)
284                         return 0;
285
286                 if (!pss || !conn->wsi[ACC] || conn->closed[ACC]) {
287                         lwsl_info(" pss %p, wsi[ACC] %p, closed[ACC] %d\n",
288                                   pss, conn->wsi[ACC], conn->closed[ACC]);
289                         return -1;
290                 }
291                 pkt.payload = malloc(len);
292                 if (!pkt.payload) {
293                         lwsl_notice("OOM: dropping\n");
294                         return -1;
295                 }
296                 pkt.len = len;
297                 pkt.ticket = conn->ticket_next++;
298
299                 memcpy(pkt.payload, in, len);
300                 if (!lws_ring_insert(conn->r[ONW], &pkt, 1)) {
301                         __destroy_packet(&pkt);
302                         lwsl_notice("dropping!\n");
303                         return -1;
304                 }
305
306                 lwsl_debug("After onward RX: acc free: %d...\n",
307                            (int)lws_ring_get_count_free_elements(conn->r[ONW]));
308
309                 if (conn->rx_enabled[ONW] &&
310                     lws_ring_get_count_free_elements(conn->r[ONW]) < 2)
311                         flow_control(conn, ONW, 0);
312
313                 if (!conn->closed[ACC])
314                         lws_callback_on_writable(conn->wsi[ACC]);
315                 break;
316
317         case LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE:
318                 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE\n");
319
320                 if (!conn)
321                         break;
322
323                 ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
324                 if (!ppkt) {
325                         lwsl_info("%s: CLI_WRITABLE had nothing in acc ring\n",
326                                   __func__);
327                         break;
328                 }
329
330                 if (ppkt->ticket != conn->ticket_retired + 1) {
331                         lwsl_info("%s: acc ring has %d but next %d\n", __func__,
332                                   ppkt->ticket, conn->ticket_retired + 1);
333                         lws_callback_on_writable(conn->wsi[ACC]);
334                         break;
335                 }
336
337                 n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
338                 if (n < 0) {
339                         lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
340
341                         return -1;
342                 }
343
344                 conn->ticket_retired = ppkt->ticket;
345                 lws_ring_consume(conn->r[ACC], &conn->t[ACC], NULL, 1);
346                 lws_ring_update_oldest_tail(conn->r[ACC], conn->t[ACC]);
347
348                 lwsl_debug("acc free: %d...\n",
349                           (int)lws_ring_get_count_free_elements(conn->r[ACC]));
350
351                 if (!conn->rx_enabled[ACC] &&
352                     lws_ring_get_count_free_elements(conn->r[ACC]) > 2)
353                         flow_control(conn, ACC, 1);
354
355                 ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
356                 lwsl_debug("%s: CLI_WRITABLE: next acc pkt %p idx %d vs %d\n",
357                            __func__, ppkt, ppkt ? ppkt->ticket : 0,
358                                            conn->ticket_retired + 1);
359
360                 if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
361                         lws_callback_on_writable(wsi);
362                 else {
363                         /*
364                          * defer checking for accepted side closing until we
365                          * sent everything in the ring to onward
366                          */
367                         if (conn->closed[ACC])
368                                 /*
369                                  * there is never going to be any more... but
370                                  * we may have some tx still in tx buflist /
371                                  * partial
372                                  */
373                                 return lws_raw_transaction_completed(wsi);
374
375                         if (lws_ring_get_element(conn->r[ONW], &conn->t[ONW]))
376                                 lws_callback_on_writable(conn->wsi[ACC]);
377                 }
378                 break;
379
380         /* callbacks related to raw socket descriptor "accepted side" */
381
382         case LWS_CALLBACK_RAW_PROXY_SRV_ADOPT:
383                 lwsl_debug("LWS_CALLBACK_RAW_SRV_ADOPT\n");
384                 if (!pss)
385                         return -1;
386                 conn = pss->conn = malloc(sizeof(struct conn));
387                 if (!pss->conn)
388                         return -1;
389                 memset(conn, 0, sizeof(*conn));
390
391                 conn->wsi[ACC] = wsi;
392                 conn->ticket_next = 1;
393
394                 conn->r[ACC] = lws_ring_create(sizeof(struct packet),
395                                                RING_DEPTH, __destroy_packet);
396                 if (!conn->r[ACC]) {
397                         lwsl_err("%s: OOM\n", __func__);
398                         return -1;
399                 }
400                 conn->r[ONW] = lws_ring_create(sizeof(struct packet),
401                                                RING_DEPTH, __destroy_packet);
402                 if (!conn->r[ONW]) {
403                         lws_ring_destroy(conn->r[ACC]);
404                         conn->r[ACC] = NULL;
405                         lwsl_err("%s: OOM\n", __func__);
406
407                         return -1;
408                 }
409
410                 conn->established[ACC] = 1;
411
412                 /* disable any rx until the client side is up */
413                 flow_control(conn, ACC, 0);
414
415                 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
416
417                 /* try to create the onward client connection */
418                 connect_client(vhd, pss);
419                 break;
420
421         case LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:
422                 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:\n");
423
424                 if (!conn)
425                         break;
426
427                 conn->closed[ACC] = 1;
428                 if (conn->closed[ONW])
429                         destroy_conn(vhd, pss);
430                 break;
431
432         case LWS_CALLBACK_RAW_PROXY_SRV_RX:
433                 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_RX: rx %d\n", (int)len);
434
435                 if (!conn || !conn->wsi[ONW]) {
436                         lwsl_err("%s: LWS_CALLBACK_RAW_PROXY_SRV_RX: "
437                                  "conn->wsi[ONW] NULL\n", __func__);
438                         return -1;
439                 }
440                 if (conn->closed[ONW]) {
441                         lwsl_info(" closed[ONW] %d\n", conn->closed[ONW]);
442                         return -1;
443                 }
444
445                 pkt.payload = malloc(len);
446                 if (!pkt.payload) {
447                         lwsl_notice("OOM: dropping\n");
448                         return -1;
449                 }
450                 pkt.len = len;
451                 pkt.ticket = conn->ticket_next++;
452
453                 memcpy(pkt.payload, in, len);
454                 if (!lws_ring_insert(conn->r[ACC], &pkt, 1)) {
455                         __destroy_packet(&pkt);
456                         lwsl_notice("dropping!\n");
457                         return -1;
458                 }
459
460                 lwsl_debug("After acc RX: acc free: %d...\n",
461                            (int)lws_ring_get_count_free_elements(conn->r[ACC]));
462
463                 if (conn->rx_enabled[ACC] &&
464                     lws_ring_get_count_free_elements(conn->r[ACC]) <= 2)
465                         flow_control(conn, ACC, 0);
466
467                 if (conn->established[ONW] && !conn->closed[ONW])
468                         lws_callback_on_writable(conn->wsi[ONW]);
469                 break;
470
471         case LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE:
472                 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE\n");
473
474                 if (!conn || !conn->established[ONW] || conn->closed[ONW])
475                         break;
476
477                 ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
478                 if (!ppkt) {
479                         lwsl_info("%s: SRV_WRITABLE nothing in onw ring\n",
480                                   __func__);
481                         break;
482                 }
483
484                 if (ppkt->ticket != conn->ticket_retired + 1) {
485                         lwsl_info("%s: onw ring has %d but next %d\n", __func__,
486                                   ppkt->ticket, conn->ticket_retired + 1);
487                         lws_callback_on_writable(conn->wsi[ONW]);
488                         break;
489                 }
490
491                 n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
492                 if (n < 0) {
493                         lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
494
495                         return -1;
496                 }
497
498                 conn->ticket_retired = ppkt->ticket;
499                 lws_ring_consume(conn->r[ONW], &conn->t[ONW], NULL, 1);
500                 lws_ring_update_oldest_tail(conn->r[ONW], conn->t[ONW]);
501
502                 lwsl_debug("onward free: %d... waiting %d\n",
503                           (int)lws_ring_get_count_free_elements(conn->r[ONW]),
504                           (int)lws_ring_get_count_waiting_elements(conn->r[ONW],
505                                                                 &conn->t[ONW]));
506
507                 if (!conn->rx_enabled[ONW] &&
508                     lws_ring_get_count_free_elements(conn->r[ONW]) > 2)
509                         flow_control(conn, ONW, 1);
510
511                 ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
512                 lwsl_debug("%s: SRV_WRITABLE: next onw pkt %p idx %d vs %d\n",
513                            __func__, ppkt, ppkt ? ppkt->ticket : 0,
514                                            conn->ticket_retired + 1);
515
516                 if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
517                         lws_callback_on_writable(wsi);
518                 else {
519                         /*
520                          * defer checking for onward side closing until we
521                          * sent everything in the ring to accepted side
522                          */
523                         if (conn->closed[ONW])
524                                 /*
525                                  * there is never going to be any more... but
526                                  * we may have some tx still in tx buflist /
527                                  * partial
528                                  */
529                                 return lws_raw_transaction_completed(wsi);
530
531                 if (lws_ring_get_element(conn->r[ACC], &conn->t[ACC]))
532                         lws_callback_on_writable(conn->wsi[ONW]);
533                 }
534                 break;
535
536         default:
537                 break;
538         }
539
540         return lws_callback_http_dummy(wsi, reason, user, in, len);
541 }
542
/*
 * Positional struct lws_protocols initializer for the "raw-proxy"
 * protocol: name, callback, per-session data size, then the remaining
 * members in declaration order (see struct lws_protocols in
 * libwebsockets.h for what each of the trailing values means).
 */
#define LWS_PLUGIN_PROTOCOL_RAW_PROXY \
        { \
                "raw-proxy", \
                callback_raw_proxy, \
                sizeof(struct raw_pss), \
                8192, \
                8192, NULL, 0 \
        }
550
551 #if !defined (LWS_PLUGIN_STATIC)
552
553 static const struct lws_protocols protocols[] = {
554         LWS_PLUGIN_PROTOCOL_RAW_PROXY
555 };
556
557 LWS_EXTERN LWS_VISIBLE int
558 init_protocol_lws_raw_proxy(struct lws_context *context,
559                             struct lws_plugin_capability *c)
560 {
561         if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
562                 lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
563                          c->api_magic);
564                 return 1;
565         }
566
567         c->protocols = protocols;
568         c->count_protocols = LWS_ARRAY_SIZE(protocols);
569         c->extensions = NULL;
570         c->count_extensions = 0;
571
572         return 0;
573 }
574
575 LWS_EXTERN LWS_VISIBLE int
576 destroy_protocol_lws_raw_proxy(struct lws_context *context)
577 {
578         return 0;
579 }
580 #endif
581
582