lws-plugin: move client close request to eventloop thread 43/265543/2
authorSeonah Moon <seonah1.moon@samsung.com>
Thu, 21 Oct 2021 11:22:32 +0000 (20:22 +0900)
committerSeonah Moon <seonah1.moon@samsung.com>
Fri, 22 Oct 2021 06:32:20 +0000 (15:32 +0900)
Change-Id: I36f2d091a754ed04e55947ff6c2cbf378ae146fa

plugins/ble-gatt/ble-gatt-plugin.cpp
plugins/libwebsockets/libwebsockets-plugin.cpp
src/include/vine-data-path-plugin.h
src/include/vine-map.h
src/include/vine-queue.h
src/vine-data-path.cpp
src/vine-event-loop-epoll.cpp

index b8c8a04..373f65d 100755 (executable)
@@ -323,7 +323,7 @@ static void __gatt_server_write_value_requested_cb(const char *remote_address,
 
 static void __notify_write_event(vine_gatt_s *gatt)
 {
-       g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP, gatt->eventfd, POLLOUT, gatt->user);
+       g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP_ADD, gatt->eventfd, POLLOUT, gatt->user);
 
        uint64_t v = 1;
        if (write(gatt->eventfd, &v, sizeof(v)) == -1)
index feb98d4..061f539 100755 (executable)
@@ -53,6 +53,7 @@ typedef struct {
        struct lws *wsi;
        struct lws_vhost *vh;
        bool is_server;
+       int write_requested;
        bool close_requested;
        bool destroy_requested;
        int curr_conn;
@@ -228,7 +229,7 @@ static void _notify_websocket_op_request(websocket_op_s *op)
        int _eventfd = eventfd(0, 0);
 
        op->fd = _eventfd;
-       g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP, _eventfd, LWS_POLLOUT, NULL);
+       g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP_ADD, _eventfd, LWS_POLLOUT, NULL);
        if (write(_eventfd, &v, sizeof(v)) == -1)
                VINE_LOGE("Write error(%d)", errno);
 }
@@ -261,8 +262,8 @@ static websocket_op_s *_add_websocket_op_request(websocket_op_code_e code,
        }
 
        op_queue.push(op);
+       VINE_LOGI("op[%p] pushed. size[%zd]", op, op_queue.size());
        return op;
-       //return 0;
 }
 
 static void _del_websocket_op_request(websocket_op_s *op)
@@ -275,9 +276,9 @@ static void _del_websocket_op_request(websocket_op_s *op)
        free(op);
 }
 
-static void _process_pending_destroy(struct lws_vhost *vh)
+static void _terminate_server(struct lws_vhost *vh)
 {
-       VINE_LOGD("Process pending destroy");
+       VINE_LOGD("terminate server. vh[%p]", vh);
        if (!vh)
                return;
 
@@ -286,15 +287,26 @@ static void _process_pending_destroy(struct lws_vhost *vh)
        if (!protocol)
                return;
 
-       lws_callback_all_protocol_vhost_args(vh, protocol, LWS_CALLBACK_USER, NULL, 0);
        listen_vh_list.erase(vh);
        VINE_LOGI("vh[%p] will be destroyed.", vh);
        lws_vhost_destroy(vh);
 
-       while (!lws_service_adjust_timeout(g_context, 1, 0))
+       if (!lws_service_adjust_timeout(g_context, 1, 0))
                lws_service_tsi(g_context, -1, 0);
+       VINE_LOGI("-");
 }
 
+static void _terminate_client(websocket_s *ws)
+{
+       VINE_LOGD("terminate client. ws[%p]", ws);
+       if (!ws->wsi) {
+               VINE_LOGE("ws[%p] was already terminated.", ws);
+               return;
+       }
+
+       if (lws_callback_on_writable(ws->wsi) < 1)
+               VINE_LOGI("Failed to request a writable callback for close.");
+}
 
 static void _destroy_websocket(websocket_s *ws)
 {
@@ -370,7 +382,10 @@ static void _do_process_op(websocket_op_s *op)
                _request_write(op->ws);
                break;
        case WEBSOCKET_OP_TERMINATE:
-               _process_pending_destroy(op->vh);
+               if (op->ws->is_server)
+                       _terminate_server(op->vh);
+               else
+                       _terminate_client(op->ws);
                break;
        case WEBSOCKET_OP_DESTROY:
                _destroy_websocket(op->ws);
@@ -414,11 +429,7 @@ static void _do_flush_op(websocket_op_s *op)
                // TODO: do write.
                break;
        case WEBSOCKET_OP_TERMINATE:
-               VINE_LOGI("Terminate. Do not handle here. ws[%p]", op->ws);
-               return;
        case WEBSOCKET_OP_DESTROY:
-               VINE_LOGI("Destroy. Do not handle here. ws[%p]", op->ws);
-               return;
        case WEBSOCKET_OP_DEINIT:
                VINE_LOGE("This cannot be called here.");
                return;
@@ -436,7 +447,11 @@ static void _flush_op_queue(websocket_s *ws)
 
        while (count-- > 0) {
                op_queue.do_remove_if([&](websocket_op_s *op) {
-                                       return (ws == NULL || (op && op->ws == ws));
+                               if (!op || op->code == WEBSOCKET_OP_TERMINATE
+                                               || op->code == WEBSOCKET_OP_DESTROY
+                                               || op->code == WEBSOCKET_OP_DEINIT)
+                                       return false;
+                               return (ws == NULL || (op && op->ws == ws));
                                }, _do_flush_op);
        }
 }
@@ -488,7 +503,8 @@ static void _delete_websocket_poll_fd(struct lws_pollargs *args)
                g_pollfds.erase(it);
                VINE_LOGI("websocket poll fd[%d] is removed.", args->fd);
        } else {
-               VINE_LOGI("Cannot find fd[%d] but remove it from eventloop", args->fd);
+               VINE_LOGI("Cannot find fd[%d]", args->fd);
+               //return;
        }
 
        if (g_callbacks.pollfd_cb)
@@ -542,6 +558,10 @@ static int _websocket_protocol_cb(struct lws *wsi,
        switch (reason) {
        case LWS_CALLBACK_WSI_DESTROY:
                VINE_LOGI("wsi[%p](ws: %p) is destroyed.", wsi, ws);
+               if (ws) {
+                       VINE_LOGI("clear ws->wsi.");
+                       ws->wsi = NULL;
+               }
                break;
 
        case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
@@ -748,17 +768,20 @@ static int _websocket_protocol_cb(struct lws *wsi,
                        return 0;
                }
 
-               if (ws->close_requested) {
+               if (ws->write_requested == 0 && ws->close_requested) {
                        VINE_LOGI("Close websocket.");
                        return -1;
                }
 
                if (ws->destroy_requested) {
-                       VINE_LOGI("ws[%p] will be destroyed.", ws);
-                       return -1;
+                       VINE_LOGI("Destroy requested. Do not send data. ws[%p]", ws);
+                       break;
                }
 
-               VINE_LOGI("Writeable to server.");
+               --ws->write_requested;
+               VINE_LOGI("Writeable to server. write_requested[%d]",
+                               ws->write_requested);
+
                n = _write_data(ws);
                if (g_callbacks.written_cb)
                        g_callbacks.written_cb(n, ws->user);
@@ -1109,6 +1132,12 @@ static int websocket_write(vine_dp_plugin_h handle, unsigned char *buf, size_t l
        RET_VAL_IF(len == 0, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "len is 0");
 
        websocket_s *ws = (websocket_s *)handle;
+
+       if (ws->close_requested) {
+               VINE_LOGE("ws[%p] will be closed.", ws);
+               return VINE_DATA_PATH_ERROR_INVALID_OPERATION;
+       }
+
        websocket_data_s *wd = (websocket_data_s *)calloc(1, sizeof(websocket_data_s));
        unsigned char *data = (unsigned char *)calloc(len, sizeof(unsigned char));
 
@@ -1124,6 +1153,9 @@ static int websocket_write(vine_dp_plugin_h handle, unsigned char *buf, size_t l
        if (!op)
                return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY;
 
+       ++(ws->write_requested);
+       VINE_LOGI("write_requested[%d]", ws->write_requested);
+
        _notify_websocket_op_request(op);
 
        return VINE_DATA_PATH_ERROR_NONE;
@@ -1141,26 +1173,25 @@ static int websocket_close(vine_dp_plugin_h handle)
 
        VINE_LOGD("ws[%p] will be closed.", ws);
        ws->close_requested = true;
-       if (ws->is_server) {
-               websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_TERMINATE,
-                               ws, ws->vh, 0, NULL, 0, NULL, 0, NULL);
-               if (!op)
-                       return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY;
-               _notify_websocket_op_request(op);
-               return VINE_DATA_PATH_ERROR_NONE;
-       }
-
-       if (ws->wsi)
-               lws_callback_on_writable(ws->wsi);
+       websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_TERMINATE,
+                       ws, ws->vh, 0, NULL, 0, NULL, 0, NULL);
+       if (!op)
+               return VINE_DATA_PATH_ERROR_OUT_OF_MEMORY;
+       _notify_websocket_op_request(op);
 
        return VINE_DATA_PATH_ERROR_NONE;
 }
 
 static void websocket_process_event(int fd, int events)
 {
+       if (fd <= 0)
+               return;
+
        auto it = g_pollfds.find(fd);
        if (it == g_pollfds.end()) {
                _process_websocket_op_request(fd);
+               g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_OP_DEL,
+                               fd, LWS_POLLOUT, NULL);
                close(fd);
                return;
        }
@@ -1208,7 +1239,6 @@ static int websocket_destroy(vine_dp_plugin_h handle)
 {
        RET_VAL_IF(handle == NULL, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "handle is NULL");
 
-
        websocket_s *ws = (websocket_s *)handle;
        if (ws->destroy_requested) {
                VINE_LOGD("Already requested.");
@@ -1217,12 +1247,6 @@ static int websocket_destroy(vine_dp_plugin_h handle)
 
        _flush_op_queue(ws);
        ws->destroy_requested = true;
-       websocket_op_s *op = _add_websocket_op_request(WEBSOCKET_OP_DESTROY,
-                               (websocket_s *)handle, NULL, 0, NULL, 0, NULL, 0, NULL);
-       if (!op)
-               return VINE_DATA_PATH_ERROR_OPERATION_FAILED;
-       _notify_websocket_op_request(op);
-
        return VINE_DATA_PATH_ERROR_NONE;
 }
 
index 24a6a23..3d802a8 100755 (executable)
@@ -37,7 +37,8 @@ typedef enum {
        VINE_DATA_PATH_POLLFD_ADD = 0,
        VINE_DATA_PATH_POLLFD_DEL,
        VINE_DATA_PATH_POLLFD_MOD,
-       VINE_DATA_PATH_POLLFD_OP,
+       VINE_DATA_PATH_POLLFD_OP_ADD,
+       VINE_DATA_PATH_POLLFD_OP_DEL,
        VINE_DATA_PATH_POLLFD_LOCK_UNLOCK,
 } vine_data_path_pollfd_op_e;
 
index 12e79a8..6b06157 100755 (executable)
@@ -16,7 +16,6 @@
 */
 #pragma once
 
-#include <functional>
 #include <mutex>
 #include <map>
 
index dd80d46..256a2e1 100755 (executable)
@@ -16,7 +16,6 @@
 */
 #pragma once
 
-#include <functional>
 #include <mutex>
 #include <list>
 
index b11baa1..4a12120 100755 (executable)
@@ -189,9 +189,12 @@ static void __pollfd_cb(vine_data_path_pollfd_op_e op, int fd, int events, void
        case VINE_DATA_PATH_POLLFD_MOD:
                vine_event_loop_mod_io_handler(fd, events, __vine_dp_poll_handler, NULL);
                break;
-       case VINE_DATA_PATH_POLLFD_OP:
+       case VINE_DATA_PATH_POLLFD_OP_ADD:
                vine_event_loop_add_io_handler(fd, events, __vine_dp_op_handler, user_data);
                break;
+       case VINE_DATA_PATH_POLLFD_OP_DEL:
+               vine_event_loop_del_io_handler(fd);
+               break;
        case VINE_DATA_PATH_POLLFD_LOCK_UNLOCK:
                // Do not anything.
                break;
@@ -604,8 +607,6 @@ int vine_data_path_destroy(vine_data_path_h datapath)
        RET_VAL_IF(datapath == NULL, VINE_ERROR_INVALID_PARAMETER, "datapath is NULL");
        vine_data_path_s *dp = (vine_data_path_s *)datapath;
 
-       start_default_state(dp, dp->plugin_handle, *dp->plugin_fn, dp->state);
-
        if (dp->plugin_fn && dp->plugin_fn->destroy)
                dp->plugin_fn->destroy(dp->plugin_handle);
 
index 5e7318a..fac6f80 100755 (executable)
@@ -74,7 +74,7 @@ static void *__vine_event_loop_epoll_run(void *arg)
 
                for (int i = 0; i < n; ++i) {
                        vine_epoll_io_event_handler *h = (vine_epoll_io_event_handler *)events[i].data.ptr;
-                       if (h && h->handler)
+                       if (h && h->fd > 0 && h->handler)
                                h->handler(h->fd, events[i].events, h->user_data);
                }
        } while (!__cleanup);