From 869f6b88eeb9c9fdf0f46c0867c87fa0d51bb02a Mon Sep 17 00:00:00 2001 From: Seonah Moon Date: Thu, 21 Oct 2021 20:22:32 +0900 Subject: [PATCH] lws-plugin: move client close request to eventloop thread Change-Id: I36f2d091a754ed04e55947ff6c2cbf378ae146fa --- plugins/ble-gatt/ble-gatt-plugin.cpp | 2 +- plugins/libwebsockets/libwebsockets-plugin.cpp | 94 ++++++++++++++++---------- src/include/vine-data-path-plugin.h | 3 +- src/include/vine-map.h | 1 - src/include/vine-queue.h | 1 - src/vine-data-path.cpp | 7 +- src/vine-event-loop-epoll.cpp | 2 +- 7 files changed, 67 insertions(+), 43 deletions(-) diff --git a/plugins/ble-gatt/ble-gatt-plugin.cpp b/plugins/ble-gatt/ble-gatt-plugin.cpp index b8c8a04..373f65d 100755 --- a/plugins/ble-gatt/ble-gatt-plugin.cpp +++ b/plugins/ble-gatt/ble-gatt-plugin.cpp @@ -323,7 +323,7 @@ static void __gatt_server_write_value_requested_cb(const char *remote_address, static void __notify_write_event(vine_gatt_s *gatt) { - g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP, gatt->eventfd, POLLOUT, gatt->user); + g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP_ADD, gatt->eventfd, POLLOUT, gatt->user); uint64_t v = 1; if (write(gatt->eventfd, &v, sizeof(v)) == -1) diff --git a/plugins/libwebsockets/libwebsockets-plugin.cpp b/plugins/libwebsockets/libwebsockets-plugin.cpp index feb98d4..061f539 100755 --- a/plugins/libwebsockets/libwebsockets-plugin.cpp +++ b/plugins/libwebsockets/libwebsockets-plugin.cpp @@ -53,6 +53,7 @@ typedef struct { struct lws *wsi; struct lws_vhost *vh; bool is_server; + int write_requested; bool close_requested; bool destroy_requested; int curr_conn; @@ -228,7 +229,7 @@ static void _notify_websocket_op_request(websocket_op_s *op) int _eventfd = eventfd(0, 0); op->fd = _eventfd; - g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP, _eventfd, LWS_POLLOUT, NULL); + g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP_ADD, _eventfd, LWS_POLLOUT, NULL); if (write(_eventfd, &v, sizeof(v)) == -1) VINE_LOGE("Write error(%d)", errno); } @@ -261,8 +262,8 @@ static websocket_op_s *_add_websocket_op_request(websocket_op_code_e code, } op_queue.push(op); + VINE_LOGI("op[%p] pushed. size[%zd]", op, op_queue.size()); return op; - //return 0; } static void _del_websocket_op_request(websocket_op_s *op) @@ -275,9 +276,9 @@ static void _del_websocket_op_request(websocket_op_s *op) free(op); } -static void _process_pending_destroy(struct lws_vhost *vh) +static void _terminate_server(struct lws_vhost *vh) { - VINE_LOGD("Process pending destroy"); + VINE_LOGD("terminate server. vh[%p]", vh); if (!vh) return; @@ -286,15 +287,26 @@ static void _process_pending_destroy(struct lws_vhost *vh) if (!protocol) return; - lws_callback_all_protocol_vhost_args(vh, protocol, LWS_CALLBACK_USER, NULL, 0); listen_vh_list.erase(vh); VINE_LOGI("vh[%p] will be destroyed.", vh); lws_vhost_destroy(vh); - while (!lws_service_adjust_timeout(g_context, 1, 0)) + if (!lws_service_adjust_timeout(g_context, 1, 0)) lws_service_tsi(g_context, -1, 0); + VINE_LOGI("-"); } +static void _terminate_client(websocket_s *ws) +{ + VINE_LOGD("terminate client. ws[%p]", ws); + if (!ws->wsi) { + VINE_LOGE("ws[%p] was already terminated.", ws); + return; + } + + if (lws_callback_on_writable(ws->wsi) < 1) + VINE_LOGI("Failed to request a writable callback for close."); +} static void _destroy_websocket(websocket_s *ws) { @@ -370,7 +382,10 @@ static void _do_process_op(websocket_op_s *op) _request_write(op->ws); break; case WEBSOCKET_OP_TERMINATE: - _process_pending_destroy(op->vh); + if (op->ws->is_server) + _terminate_server(op->vh); + else + _terminate_client(op->ws); break; case WEBSOCKET_OP_DESTROY: _destroy_websocket(op->ws); @@ -414,11 +429,7 @@ static void _do_flush_op(websocket_op_s *op) // TODO: do write. break; case WEBSOCKET_OP_TERMINATE: - VINE_LOGI("Terminate. Do not handle here. ws[%p]", op->ws); - return; case WEBSOCKET_OP_DESTROY: - VINE_LOGI("Destroy. Do not handle here. ws[%p]", op->ws); - return; case WEBSOCKET_OP_DEINIT: VINE_LOGE("This cannot be called here."); return; @@ -436,7 +447,11 @@ static void _flush_op_queue(websocket_s *ws) while (count-- > 0) { op_queue.do_remove_if([&](websocket_op_s *op) { - return (ws == NULL || (op && op->ws == ws)); + if (!op || op->code == WEBSOCKET_OP_TERMINATE + || op->code == WEBSOCKET_OP_DESTROY + || op->code == WEBSOCKET_OP_DEINIT) + return false; + return (ws == NULL || (op && op->ws == ws)); }, _do_flush_op); } } @@ -488,7 +503,8 @@ static void _delete_websocket_poll_fd(struct lws_pollargs *args) g_pollfds.erase(it); VINE_LOGI("websocket poll fd[%d] is removed.", args->fd); } else { - VINE_LOGI("Cannot find fd[%d] but remove it from eventloop", args->fd); + VINE_LOGI("Cannot find fd[%d]", args->fd); + //return; } if (g_callbacks.pollfd_cb) @@ -542,6 +558,10 @@ static int _websocket_protocol_cb(struct lws *wsi, switch (reason) { case LWS_CALLBACK_WSI_DESTROY: VINE_LOGI("wsi[%p](ws: %p) is destroyed.", wsi, ws); + if (ws) { + VINE_LOGI("clear ws->wsi."); + ws->wsi = NULL; + } break; case LWS_CALLBACK_FILTER_NETWORK_CONNECTION: @@ -748,17 +768,20 @@ static int _websocket_protocol_cb(struct lws *wsi, return 0; } - if (ws->close_requested) { + if (ws->write_requested == 0 && ws->close_requested) { VINE_LOGI("Close websocket."); return -1; } if (ws->destroy_requested) { - VINE_LOGI("ws[%p] will be destroyed.", ws); - return -1; + VINE_LOGI("Destroy requested. Do not send data. ws[%p]", ws); + break; } - VINE_LOGI("Writeable to server."); + --ws->write_requested; + VINE_LOGI("Writeable to server. write_requested[%d]", + ws->write_requested); + n = _write_data(ws); if (g_callbacks.written_cb) g_callbacks.written_cb(n, ws->user); @@ -1109,6 +1132,12 @@ static int websocket_write(vine_dp_plugin_h handle, unsigned char *buf, size_t l RET_VAL_IF(len == 0, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "len is 0"); websocket_s *ws = (websocket_s *)handle; + + if (ws->close_requested) { + VINE_LOGE("ws[%p] will be closed.", ws); + return VINE_DATA_PATH_ERROR_INVALID_OPERATION; + } + websocket_data_s *wd = (websocket_data_s *)calloc(1, sizeof(websocket_data_s)); unsigned char *data = (unsigned char *)calloc(len, sizeof(unsigned char)); @@ -1124,6 +1153,9 @@ static int websocket_write(vine_dp_plugin_h handle, unsigned char *buf, size_t l if (!op) return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY; + ++(ws->write_requested); + VINE_LOGI("write_requested[%d]", ws->write_requested); + _notify_websocket_op_request(op); return VINE_DATA_PATH_ERROR_NONE; @@ -1141,26 +1173,25 @@ static int websocket_close(vine_dp_plugin_h handle) VINE_LOGD("ws[%p] will be closed.", ws); ws->close_requested = true; - if (ws->is_server) { - websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_TERMINATE, - ws, ws->vh, 0, NULL, 0, NULL, 0, NULL); - if (!op) - return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY; - _notify_websocket_op_request(op); - return VINE_DATA_PATH_ERROR_NONE; - } - - if (ws->wsi) - lws_callback_on_writable(ws->wsi); + websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_TERMINATE, + ws, ws->vh, 0, NULL, 0, NULL, 0, NULL); + if (!op) + return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY; + _notify_websocket_op_request(op); return VINE_DATA_PATH_ERROR_NONE; } static void websocket_process_event(int fd, int events) { + if (fd <= 0) + return; + auto it = g_pollfds.find(fd); if (it == g_pollfds.end()) { _process_websocket_op_request(fd); + g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP_DEL, + fd, LWS_POLLOUT, NULL); close(fd); return; } @@ -1208,7 +1239,6 @@ static int websocket_destroy(vine_dp_plugin_h handle) { RET_VAL_IF(handle == NULL, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "handle is NULL"); - websocket_s *ws = (websocket_s *)handle; if (ws->destroy_requested) { VINE_LOGD("Already requested."); @@ -1217,12 +1247,6 @@ static int websocket_destroy(vine_dp_plugin_h handle) _flush_op_queue(ws); ws->destroy_requested = true; - websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_DESTROY, - (websocket_s *)handle, NULL, 0, NULL, 0, NULL, 0, NULL); - if (!op) - return VINE_DATA_PATH_ERROR_OPERATION_FAILED; - _notify_websocket_op_request(op); - return VINE_DATA_PATH_ERROR_NONE; } diff --git a/src/include/vine-data-path-plugin.h b/src/include/vine-data-path-plugin.h index 24a6a23..3d802a8 100755 --- a/src/include/vine-data-path-plugin.h +++ b/src/include/vine-data-path-plugin.h @@ -37,7 +37,8 @@ typedef enum { VINE_DATA_PATH_POLLFD_ADD = 0, VINE_DATA_PATH_POLLFD_DEL, VINE_DATA_PATH_POLLFD_MOD, - VINE_DATA_PATH_POLLFD_OP, + VINE_DATA_PATH_POLLFD_OP_ADD, + VINE_DATA_PATH_POLLFD_OP_DEL, VINE_DATA_PATH_POLLFD_LOCK_UNLOCK, } vine_data_path_pollfd_op_e; diff --git a/src/include/vine-map.h b/src/include/vine-map.h index 12e79a8..6b06157 100755 --- a/src/include/vine-map.h +++ b/src/include/vine-map.h @@ -16,7 +16,6 @@ */ #pragma once -#include #include #include diff --git a/src/include/vine-queue.h b/src/include/vine-queue.h index dd80d46..256a2e1 100755 --- a/src/include/vine-queue.h +++ b/src/include/vine-queue.h @@ -16,7 +16,6 @@ */ #pragma once -#include #include #include diff --git a/src/vine-data-path.cpp b/src/vine-data-path.cpp index b11baa1..4a12120 100755 --- a/src/vine-data-path.cpp +++ b/src/vine-data-path.cpp @@ -189,9 +189,12 @@ static void __pollfd_cb(vine_data_path_pollfd_op_e op, int fd, int events, void case VINE_DATA_PATH_POLLFD_MOD: vine_event_loop_mod_io_handler(fd, events, __vine_dp_poll_handler, NULL); break; - case VINE_DATA_PATH_POLLFD_OP: + case VINE_DATA_PATH_POLLFD_OP_ADD: vine_event_loop_add_io_handler(fd, events, __vine_dp_op_handler, user_data); break; + case VINE_DATA_PATH_POLLFD_OP_DEL: + vine_event_loop_del_io_handler(fd); + break; case VINE_DATA_PATH_POLLFD_LOCK_UNLOCK: // Do not anything. break; @@ -604,8 +607,6 @@ int vine_data_path_destroy(vine_data_path_h datapath) RET_VAL_IF(datapath == NULL, VINE_ERROR_INVALID_PARAMETER, "datapath is NULL"); vine_data_path_s *dp = (vine_data_path_s *)datapath; - start_default_state(dp, dp->plugin_handle, *dp->plugin_fn, dp->state); - if (dp->plugin_fn && dp->plugin_fn->destroy) dp->plugin_fn->destroy(dp->plugin_handle); diff --git a/src/vine-event-loop-epoll.cpp b/src/vine-event-loop-epoll.cpp index 5e7318a..fac6f80 100755 --- a/src/vine-event-loop-epoll.cpp +++ b/src/vine-event-loop-epoll.cpp @@ -74,7 +74,7 @@ static void *__vine_event_loop_epoll_run(void *arg) for (int i = 0; i < n; ++i) { vine_epoll_io_event_handler *h = (vine_epoll_io_event_handler *)events[i].data.ptr; - if (h && h->handler) + if (h && h->fd > 0 && h->handler) h->handler(h->fd, events[i].events, h->user_data); } } while (!__cleanup); -- 2.7.4