2 * libwebsockets - plugin for raw proxying
4 * Copyright (C) 2010-2018 Andy Green <andy@warmcat.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation:
9 * version 2.1 of the License.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
22 #if !defined (LWS_PLUGIN_STATIC)
25 #include <libwebsockets.h>
29 #include <sys/types.h>
46 * Because both sides of the connection want to share this, we allocate it
47 * during accepted adoption and both sides have a pss that is just a wrapper
50 * The last one of the accepted side and the onward side to close frees it.
51 * This removes any chance of one side or the other having an invalidated
58 /* rings containing unsent rx from accepted and onward sides */
59 struct lws_ring *r[2];
60 uint32_t t[2]; /* ring tail */
63 uint32_t ticket_retired;
74 /* one of these is created for each vhost our protocol is used with */
83 __destroy_packet(void *_pkt)
85 struct packet *pkt = _pkt;
93 destroy_conn(struct raw_vhd *vhd, struct raw_pss *pss)
95 struct conn *conn = pss->conn;
98 lws_ring_destroy(conn->r[ACC]);
100 lws_ring_destroy(conn->r[ONW]);
108 connect_client(struct raw_vhd *vhd, struct raw_pss *pss)
110 struct lws_client_connect_info i;
114 lws_snprintf(host, sizeof(host), "%s:%u", vhd->addr, vhd->port);
116 memset(&i, 0, sizeof(i));
119 i.context = lws_get_context(pss->conn->wsi[ACC]);
121 i.address = vhd->addr;
124 i.ssl_connection = 0;
125 i.vhost = lws_get_vhost(pss->conn->wsi[ACC]);
126 i.local_protocol_name = "raw-proxy";
127 i.protocol = "raw-proxy";
130 * The "onward" client wsi has its own pss but shares the "conn"
131 * created when the inbound connection was accepted. We need to stash
132 * the address of the shared conn and apply it to the client pss
133 * when the client connection completes.
135 i.opaque_user_data = pss->conn;
136 i.pwsi = &pss->conn->wsi[ONW];
138 lwsl_info("%s: onward: %s:%d%s\n", __func__, i.address, i.port, i.path);
140 cwsi = lws_client_connect_via_info(&i);
142 lwsl_err("%s: client connect failed early\n", __func__);
148 flow_control(struct conn *conn, int side, int enable)
150 if (conn->closed[side] ||
151 enable == conn->rx_enabled[side] ||
152 !conn->established[side])
155 if (lws_rx_flow_control(conn->wsi[side], enable))
158 conn->rx_enabled[side] = enable;
159 lwsl_info("%s: %s side: %s\n", __func__, side ? "ONW" : "ACC",
160 enable ? "rx enabled" : "rx flow controlled");
166 callback_raw_proxy(struct lws *wsi, enum lws_callback_reasons reason,
167 void *user, void *in, size_t len)
169 struct raw_pss *pss = (struct raw_pss *)user;
170 struct raw_vhd *vhd = (struct raw_vhd *)lws_protocol_vh_priv_get(
171 lws_get_vhost(wsi), lws_get_protocol(wsi));
172 const struct packet *ppkt;
173 struct conn *conn = NULL;
174 struct lws_tokenize ts;
184 case LWS_CALLBACK_PROTOCOL_INIT:
185 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
186 lws_get_protocol(wsi), sizeof(struct raw_vhd));
187 if (lws_pvo_get_str(in, "onward", &cp)) {
188 lwsl_err("%s: vh %s: pvo 'onward' required\n", __func__,
189 lws_get_vhost_name(lws_get_vhost(wsi)));
193 lws_tokenize_init(&ts, cp, LWS_TOKENIZE_F_DOT_NONTERM |
194 LWS_TOKENIZE_F_MINUS_NONTERM |
195 LWS_TOKENIZE_F_NO_FLOATS);
198 if (lws_tokenize(&ts) != LWS_TOKZE_TOKEN)
200 if (!strncmp(ts.token, "ipv6", ts.token_len))
203 if (strncmp(ts.token, "ipv4", ts.token_len))
207 if (lws_tokenize(&ts) != LWS_TOKZE_DELIMITER)
210 e = lws_tokenize(&ts);
212 if (e != LWS_TOKZE_TOKEN ||
213 ts.token_len + 1 >= (int)sizeof(vhd->addr))
216 lws_strncpy(vhd->addr, ts.token, ts.token_len + 1);
217 e = lws_tokenize(&ts);
218 if (e == LWS_TOKZE_DELIMITER) {
219 /* there should be a port then */
220 e = lws_tokenize(&ts);
221 if (e != LWS_TOKZE_INTEGER)
223 vhd->port = atoi(ts.token);
224 e = lws_tokenize(&ts);
226 if (e != LWS_TOKZE_ENDED)
229 lws_strncpy(vhd->addr, ts.token, sizeof(vhd->addr));
231 lwsl_notice("%s: vh %s: onward %s:%s:%d\n", __func__,
232 lws_get_vhost_name(lws_get_vhost(wsi)),
233 vhd->ipv6 ? "ipv6": "ipv4", vhd->addr, vhd->port);
237 lwsl_err("%s: onward pvo format must be ipv4:addr[:port] "
238 " or ipv6:addr, not '%s'\n", __func__, cp);
241 case LWS_CALLBACK_PROTOCOL_DESTROY:
244 /* callbacks related to client "onward side" */
246 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
247 lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
248 in ? (char *)in : "(null)");
251 case LWS_CALLBACK_RAW_PROXY_CLI_ADOPT:
252 lwsl_debug("LWS_CALLBACK_RAW_CLI_ADOPT: pss %p\n", pss);
255 conn = pss->conn = lws_get_opaque_user_data(wsi);
256 conn->established[ONW] = 1;
257 /* they start enabled */
258 conn->rx_enabled[ACC] = 1;
259 conn->rx_enabled[ONW] = 1;
261 /* he disabled his rx while waiting for us to be established */
262 flow_control(conn, ACC, 1);
264 lws_callback_on_writable(wsi);
265 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
268 case LWS_CALLBACK_RAW_PROXY_CLI_CLOSE:
269 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_CLOSE\n");
273 conn->closed[ONW] = 1;
275 if (conn->closed[ACC])
276 destroy_conn(vhd, pss);
280 case LWS_CALLBACK_RAW_PROXY_CLI_RX:
281 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_RX: %d\n", (int)len);
286 if (!pss || !conn->wsi[ACC] || conn->closed[ACC]) {
287 lwsl_info(" pss %p, wsi[ACC] %p, closed[ACC] %d\n",
288 pss, conn->wsi[ACC], conn->closed[ACC]);
291 pkt.payload = malloc(len);
293 lwsl_notice("OOM: dropping\n");
297 pkt.ticket = conn->ticket_next++;
299 memcpy(pkt.payload, in, len);
300 if (!lws_ring_insert(conn->r[ONW], &pkt, 1)) {
301 __destroy_packet(&pkt);
302 lwsl_notice("dropping!\n");
306 lwsl_debug("After onward RX: acc free: %d...\n",
307 (int)lws_ring_get_count_free_elements(conn->r[ONW]));
309 if (conn->rx_enabled[ONW] &&
310 lws_ring_get_count_free_elements(conn->r[ONW]) < 2)
311 flow_control(conn, ONW, 0);
313 if (!conn->closed[ACC])
314 lws_callback_on_writable(conn->wsi[ACC]);
317 case LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE:
318 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE\n");
323 ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
325 lwsl_info("%s: CLI_WRITABLE had nothing in acc ring\n",
330 if (ppkt->ticket != conn->ticket_retired + 1) {
331 lwsl_info("%s: acc ring has %d but next %d\n", __func__,
332 ppkt->ticket, conn->ticket_retired + 1);
333 lws_callback_on_writable(conn->wsi[ACC]);
337 n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
339 lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
344 conn->ticket_retired = ppkt->ticket;
345 lws_ring_consume(conn->r[ACC], &conn->t[ACC], NULL, 1);
346 lws_ring_update_oldest_tail(conn->r[ACC], conn->t[ACC]);
348 lwsl_debug("acc free: %d...\n",
349 (int)lws_ring_get_count_free_elements(conn->r[ACC]));
351 if (!conn->rx_enabled[ACC] &&
352 lws_ring_get_count_free_elements(conn->r[ACC]) > 2)
353 flow_control(conn, ACC, 1);
355 ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
356 lwsl_debug("%s: CLI_WRITABLE: next acc pkt %p idx %d vs %d\n",
357 __func__, ppkt, ppkt ? ppkt->ticket : 0,
358 conn->ticket_retired + 1);
360 if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
361 lws_callback_on_writable(wsi);
364 * defer checking for accepted side closing until we
365 * have sent everything in the ring to onward
367 if (conn->closed[ACC])
369 * there is never going to be any more... but
370 * we may have some tx still in tx buflist /
373 return lws_raw_transaction_completed(wsi);
375 if (lws_ring_get_element(conn->r[ONW], &conn->t[ONW]))
376 lws_callback_on_writable(conn->wsi[ACC]);
380 /* callbacks related to raw socket descriptor "accepted side" */
382 case LWS_CALLBACK_RAW_PROXY_SRV_ADOPT:
383 lwsl_debug("LWS_CALLBACK_RAW_SRV_ADOPT\n");
386 conn = pss->conn = malloc(sizeof(struct conn));
389 memset(conn, 0, sizeof(*conn));
391 conn->wsi[ACC] = wsi;
392 conn->ticket_next = 1;
394 conn->r[ACC] = lws_ring_create(sizeof(struct packet),
395 RING_DEPTH, __destroy_packet);
397 lwsl_err("%s: OOM\n", __func__);
400 conn->r[ONW] = lws_ring_create(sizeof(struct packet),
401 RING_DEPTH, __destroy_packet);
403 lws_ring_destroy(conn->r[ACC]);
405 lwsl_err("%s: OOM\n", __func__);
410 conn->established[ACC] = 1;
412 /* disable any rx until the client side is up */
413 flow_control(conn, ACC, 0);
415 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
417 /* try to create the onward client connection */
418 connect_client(vhd, pss);
421 case LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:
422 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:\n");
427 conn->closed[ACC] = 1;
428 if (conn->closed[ONW])
429 destroy_conn(vhd, pss);
432 case LWS_CALLBACK_RAW_PROXY_SRV_RX:
433 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_RX: rx %d\n", (int)len);
435 if (!conn || !conn->wsi[ONW]) {
436 lwsl_err("%s: LWS_CALLBACK_RAW_PROXY_SRV_RX: "
437 "conn->wsi[ONW] NULL\n", __func__);
440 if (conn->closed[ONW]) {
441 lwsl_info(" closed[ONW] %d\n", conn->closed[ONW]);
445 pkt.payload = malloc(len);
447 lwsl_notice("OOM: dropping\n");
451 pkt.ticket = conn->ticket_next++;
453 memcpy(pkt.payload, in, len);
454 if (!lws_ring_insert(conn->r[ACC], &pkt, 1)) {
455 __destroy_packet(&pkt);
456 lwsl_notice("dropping!\n");
460 lwsl_debug("After acc RX: acc free: %d...\n",
461 (int)lws_ring_get_count_free_elements(conn->r[ACC]));
463 if (conn->rx_enabled[ACC] &&
464 lws_ring_get_count_free_elements(conn->r[ACC]) <= 2)
465 flow_control(conn, ACC, 0);
467 if (conn->established[ONW] && !conn->closed[ONW])
468 lws_callback_on_writable(conn->wsi[ONW]);
471 case LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE:
472 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE\n");
474 if (!conn || !conn->established[ONW] || conn->closed[ONW])
477 ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
479 lwsl_info("%s: SRV_WRITABLE nothing in onw ring\n",
484 if (ppkt->ticket != conn->ticket_retired + 1) {
485 lwsl_info("%s: onw ring has %d but next %d\n", __func__,
486 ppkt->ticket, conn->ticket_retired + 1);
487 lws_callback_on_writable(conn->wsi[ONW]);
491 n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
493 lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
498 conn->ticket_retired = ppkt->ticket;
499 lws_ring_consume(conn->r[ONW], &conn->t[ONW], NULL, 1);
500 lws_ring_update_oldest_tail(conn->r[ONW], conn->t[ONW]);
502 lwsl_debug("onward free: %d... waiting %d\n",
503 (int)lws_ring_get_count_free_elements(conn->r[ONW]),
504 (int)lws_ring_get_count_waiting_elements(conn->r[ONW],
507 if (!conn->rx_enabled[ONW] &&
508 lws_ring_get_count_free_elements(conn->r[ONW]) > 2)
509 flow_control(conn, ONW, 1);
511 ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
512 lwsl_debug("%s: SRV_WRITABLE: next onw pkt %p idx %d vs %d\n",
513 __func__, ppkt, ppkt ? ppkt->ticket : 0,
514 conn->ticket_retired + 1);
516 if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
517 lws_callback_on_writable(wsi);
520 * defer checking for onward side closing until we
521 * have sent everything in the ring to accepted side
523 if (conn->closed[ONW])
525 * there is never going to be any more... but
526 * we may have some tx still in tx buflist /
529 return lws_raw_transaction_completed(wsi);
531 if (lws_ring_get_element(conn->r[ACC], &conn->t[ACC]))
532 lws_callback_on_writable(conn->wsi[ONW]);
540 return lws_callback_http_dummy(wsi, reason, user, in, len);
543 #define LWS_PLUGIN_PROTOCOL_RAW_PROXY { \
545 callback_raw_proxy, \
546 sizeof(struct raw_pss), \
551 #if !defined (LWS_PLUGIN_STATIC)
553 static const struct lws_protocols protocols[] = {
554 LWS_PLUGIN_PROTOCOL_RAW_PROXY
557 LWS_EXTERN LWS_VISIBLE int
558 init_protocol_lws_raw_proxy(struct lws_context *context,
559 struct lws_plugin_capability *c)
561 if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
562 lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
567 c->protocols = protocols;
568 c->count_protocols = LWS_ARRAY_SIZE(protocols);
569 c->extensions = NULL;
570 c->count_extensions = 0;
575 LWS_EXTERN LWS_VISIBLE int
576 destroy_protocol_lws_raw_proxy(struct lws_context *context)