Change DP termination logic 20/265320/9
authorSeonah Moon <seonah1.moon@samsung.com>
Fri, 15 Oct 2021 01:46:18 +0000 (10:46 +0900)
committerSeonah Moon <seonah1.moon@samsung.com>
Tue, 19 Oct 2021 07:39:50 +0000 (16:39 +0900)
Change-Id: I123c9fd3babf9f37e68b1ad2d444f6b0b5f30cdb

plugins/dns-sd/dns-sd-plugin.cpp
plugins/libwebsockets/libwebsockets-plugin.cpp
src/include/vine-dp.h
src/include/vine-queue.h
src/vine-disc.cpp
src/vine-dp.cpp
src/vine-private.cpp
src/vine-service.cpp

index 7637bb5..0c6291d 100755 (executable)
@@ -164,6 +164,11 @@ void add_new_fd(vine_dns_sd_s *dns_sd_handle,
 static void __remove_service_ref(vine_dns_sd_s *dns_sd_handle, DNSServiceRef service_ref)
 {
        int fd = DNSServiceRefSockFD(service_ref);
+       if (fd < 0) {
+               VINE_LOGE("Invalid fd[%d]", fd);
+               return;
+       }
+
        VINE_LOGD("fd[%d] to be removed", fd);
        if (event_callbacks.fd_removed_cb)
                event_callbacks.fd_removed_cb(fd, dns_sd_handle->user_data);
index 428a445..60933ee 100755 (executable)
@@ -73,6 +73,7 @@ typedef enum {
        WEBSOCKET_OP_WRITE,
        WEBSOCKET_OP_TERMINATE,
        WEBSOCKET_OP_DESTROY,
+       WEBSOCKET_OP_DEINIT,
 } websocket_op_code_e;
 
 typedef struct {
@@ -257,7 +258,6 @@ static int _add_websocket_op_request(websocket_op_code_e code,
        }
 
        op_queue.push(op);
-
        return 0;
 }
 
@@ -284,8 +284,8 @@ static void _process_pending_destroy(struct lws_vhost *vh)
 
        lws_callback_all_protocol_vhost_args(vh, protocol, LWS_CALLBACK_USER, NULL, 0);
        listen_vh_list.erase(vh);
-       lws_vhost_destroy(vh);
-       VINE_LOGI("vh[%p] is destroyed", vh);
+       //lws_vhost_destroy(vh);
+       VINE_LOGI("vh[%p] is erased.", vh);
 
        while (!lws_service_adjust_timeout(g_context, 1, 0))
                lws_service_tsi(g_context, -1, 0);
@@ -318,6 +318,31 @@ static void _destroy_websocket(websocket_s *ws)
        free(ws);
 }
 
+static void _destroy_context(struct lws_context *context)
+{
+       lws_context_destroy(context);
+}
+
+static void _clear_listen_vhosts(void)
+{
+       listen_vh_list.clear();
+}
+
+static void _deinit(void)
+{
+       _destroy_context(g_context);
+       g_context = NULL;
+
+       auto it = g_pollfds.begin();
+       while (it != g_pollfds.end()) {
+               free(it->second);
+               it = g_pollfds.erase(it);
+       }
+
+       _clear_listen_vhosts();
+       VINE_LOGI("-");
+}
+
 static void _process_websocket_op_request(void)
 {
        RET_IF(op_queue.empty(), "operation queue is NULL");
@@ -326,7 +351,7 @@ static void _process_websocket_op_request(void)
        op_queue.pop();
        RET_IF(op == NULL, "op is NULL");
 
-       if (!op->ws) {
+       if (!op->ws && op->code != WEBSOCKET_OP_DEINIT) {
                _del_websocket_op_request(op);
                return;
        }
@@ -347,6 +372,10 @@ static void _process_websocket_op_request(void)
        case WEBSOCKET_OP_DESTROY:
                _destroy_websocket(op->ws);
                break;
+       case WEBSOCKET_OP_DEINIT:
+               _deinit();
+               break;
+
        default:
                break;
        }
@@ -354,6 +383,55 @@ static void _process_websocket_op_request(void)
        _del_websocket_op_request(op);
 }
 
+static void do_func(websocket_op_s *op)
+{
+       if (op == NULL)
+               return;
+
+       VINE_LOGD("op[%p] op->code[%d]", op, op->code);
+       switch (op->code) {
+       case WEBSOCKET_OP_OPEN:
+               VINE_LOGI("Ignore.");
+               break;
+       case WEBSOCKET_OP_CONNECT:
+               VINE_LOGI("Ignore.");
+               break;
+       case WEBSOCKET_OP_WRITE:
+               VINE_LOGI("Ignore.");
+               // Do!
+               break;
+       case WEBSOCKET_OP_TERMINATE:
+               VINE_LOGI("Terminate.");
+               _process_pending_destroy(op->vh);
+               // Do!
+               break;
+       case WEBSOCKET_OP_DESTROY:
+               VINE_LOGI("destroy.");
+               //_destroy_websocket(op->ws);
+               // error
+               break;
+       case WEBSOCKET_OP_DEINIT:
+               VINE_LOGI("This cannot be called here.");
+               break;
+       default:
+               break;
+       }
+       _del_websocket_op_request(op);
+}
+
+static void _flush_op_queue(websocket_s *ws)
+{
+       VINE_LOGD("flush ws[%p]", ws);
+
+       size_t count = op_queue.size();
+
+       while (count-- > 0) {
+               op_queue.do_remove_if([&](websocket_op_s *op) {
+                                       return (ws == NULL || (op && op->ws == ws));
+                               }, do_func);
+       }
+}
+
 static struct lws_context *_create_context(void)
 {
        struct lws_context_creation_info info;
@@ -374,11 +452,6 @@ static struct lws_context *_create_context(void)
        return lws_create_context(&info);
 }
 
-static void _destroy_context(struct lws_context *context)
-{
-       lws_context_destroy(context);
-}
-
 static void _add_websocket_poll_fd(struct lws_pollargs *args)
 {
        RET_IF(args == NULL, "args is NULL");
@@ -457,12 +530,6 @@ static int _websocket_protocol_cb(struct lws *wsi,
                        lws_get_protocol(wsi)); // host private data.
        int n = 0;
 
-       if (ws && ws->close_requested
-                       && reason != LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS) {
-               VINE_LOGD("ws[%p](user: %p) will be closed. Do nothing for it.", ws, ws->user);
-               return -1;
-       }
-
        switch (reason) {
        case LWS_CALLBACK_WSI_DESTROY:
                VINE_LOGI("wsi[%p](ws: %p) is destroyed.", wsi, ws);
@@ -718,13 +785,6 @@ static int websocket_init(void)
        return VINE_DATA_PATH_ERROR_NONE;
 }
 
-static void _clear_listen_vhosts(void)
-{
-       for (auto &vh : listen_vh_list)
-                       lws_vhost_destroy(vh);
-       listen_vh_list.clear();
-}
-
 static void websocket_deinit(void)
 {
        if (__sync_sub_and_fetch(&g_ref_count, 1) > 0) {
@@ -732,16 +792,12 @@ static void websocket_deinit(void)
                return;
        }
 
-       _destroy_context(g_context);
-       g_context = NULL;
-
-       auto it = g_pollfds.begin();
-       while (it != g_pollfds.end()) {
-               free(it->second);
-               it = g_pollfds.erase(it);
-       }
+       _flush_op_queue(NULL);
+       // Run deinit in eventloop thread
+       _add_websocket_op_request(WEBSOCKET_OP_DEINIT,
+                               NULL, NULL, 0, NULL, 0, NULL, 0, NULL);
+       _notify_websocket_op_request();
 
-       _clear_listen_vhosts();
        VINE_LOGD("lws is deinitialized.");
 }
 
@@ -1069,8 +1125,9 @@ static int websocket_close(vine_dp_plugin_h handle)
        VINE_LOGD("ws[%p] will be closed.", ws);
        ws->close_requested = true;
        if (ws->is_server) {
-               _add_websocket_op_request(WEBSOCKET_OP_TERMINATE,
-                               ws, ws->vh, 0, NULL, 0, NULL, 0, NULL);
+               if (_add_websocket_op_request(WEBSOCKET_OP_TERMINATE,
+                               ws, ws->vh, 0, NULL, 0, NULL, 0, NULL) < 0)
+                       return VINE_DATA_PATH_ERROR_OPERATION_FAILED;
                _notify_websocket_op_request();
                return VINE_DATA_PATH_ERROR_NONE;
        }
@@ -1103,7 +1160,7 @@ static void websocket_process_event(int fd, int events)
        lws_service_fd(g_context, pollfd);
 
        // Check for any connection needing forced service.
-       while (!lws_service_adjust_timeout(g_context, 1, 0)) {
+       if (!lws_service_adjust_timeout(g_context, 1, 0)) {
                // Service any pending webscoket activity.
                // Only needed if multiple service threads.
                lws_service_tsi(g_context, -1, 0);
@@ -1140,9 +1197,9 @@ static int websocket_destroy(vine_dp_plugin_h handle)
                return VINE_DATA_PATH_ERROR_NONE;
        }
 
+       _flush_op_queue(ws);
+
        ws->destroy_requested = true;
-       _add_websocket_op_request(WEBSOCKET_OP_DESTROY, ws, NULL, 0, NULL, 0, NULL, 0, NULL);
-       _notify_websocket_op_request();
        return VINE_DATA_PATH_ERROR_NONE;
 }
 
index 2e86615..f0bb97d 100755 (executable)
@@ -42,7 +42,7 @@ public:
        virtual ~DataPath() {}
 
        virtual int open(vine_dp_opened_cb callback, void *user_data) = 0;
-       virtual void close() = 0;
+       virtual int close() = 0;
 
        virtual int send(unsigned char *buf, size_t len) = 0;
        virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len) = 0;
@@ -106,7 +106,7 @@ public:
        virtual ~DPServer();
 
        virtual int open(vine_dp_opened_cb callback, void *user_data);
-       virtual void close();
+       virtual int close();
 
        virtual int send(unsigned char *buf, size_t len);
        virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len);
@@ -156,7 +156,7 @@ public:
        virtual ~DPClient();
 
        virtual int open(vine_dp_opened_cb callback, void *user_data);
-       virtual void close();
+       virtual int close();
        virtual int send(unsigned char *buf, size_t len);
        virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len);
 
@@ -216,7 +216,7 @@ public:
        virtual ~DPPubSub();
 
        virtual int open(vine_dp_opened_cb callback, void *user_data);
-       virtual void close();
+       virtual int close();
        virtual int send(unsigned char *buf, size_t len);
        virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len);
 
index 5b805e8..dd80d46 100755 (executable)
@@ -16,8 +16,9 @@
 */
 #pragma once
 
+#include <functional>
 #include <mutex>
-#include <queue>
+#include <list>
 
 #include "vine-log.h"
 
@@ -37,7 +38,7 @@ public:
        void push(const T &element)
        {
                std::lock_guard<std::mutex> lock_guard(_q_mutex);
-               _queue.push(element);
+               _queue.push_back(element);
        }
 
        void pop()
@@ -45,7 +46,7 @@ public:
                std::lock_guard<std::mutex> lock_guard(_q_mutex);
                if (_queue.empty())
                        return;
-               _queue.pop();
+               _queue.pop_front();
        }
 
        T &front()
@@ -72,7 +73,23 @@ public:
                return _queue.empty();
        }
 
+       typedef void (*do_func)(T t);
+
+       template <class UnaryPredicate> void do_remove_if(UnaryPredicate comp_f, do_func do_f)
+       {
+               std::lock_guard<std::mutex> lock_guard(_q_mutex);
+               for (auto it = _queue.begin(); it != _queue.end();) {
+                       if (comp_f(*it)) {
+                               do_f(*it);
+                               it = _queue.erase(it);
+                       }
+                       else {
+                               ++it;
+                       }
+               }
+       }
+
 private:
-       std::queue<T> _queue;
+       std::list<T> _queue;
        std::mutex _q_mutex;
 };
index 8d092bd..7b0679d 100755 (executable)
@@ -298,7 +298,7 @@ void __vine_disc_epoll_handler(int fd, int events, void *user_data)
 {
        VINE_LOGD("Process event for fd[%d] events[%d] disc_handle[%p]", fd, events, user_data);
        vine_disc_s *disc_handle = (vine_disc_s *)user_data;
-       if (!disc_handle || !disc_handle->plugin_fn || disc_handle->plugin_fn->process_event == NULL) {
+       if (!disc_handle || !disc_handle->plugin_fn || !disc_handle->plugin_fn->process_event) {
                VINE_LOGE("No process_event() defined");
                return;
        }
index c9475ee..e2e04d1 100755 (executable)
@@ -44,6 +44,15 @@ using namespace vine;
                } \
     } while (0)
 
+#define RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(state) \
+       do { \
+        if (state != VINE_DP_OPEN_STATE_DONE) { \
+                       VINE_LOGE("DP isn't opened."); \
+                       return VINE_ERROR_INVALID_OPERATION; \
+               } \
+    } while (0)
+
+
 extern vine_dp_plugin_fn g_dp_plugin_fn;
 
 static bool _check_if_valid_ip(vine_address_family_e addr_family, const char *ip)
@@ -424,13 +433,17 @@ int DataPath::set_iface_name(const std::string &iface_name)
 
 void DataPath::invoke_opened_cb(int result)
 {
+       if (result == VINE_ERROR_NONE)
+               mOpenState = VINE_DP_OPEN_STATE_DONE;
+       else
+               mOpenState = VINE_DP_OPEN_STATE_NONE;
+
        if (mOpenedCb)
                mOpenedCb(static_cast<void *>(this), (vine_error_e)result, mOpenedCbData);
 
        // called only once.
        mOpenedCb = NULL;
        mOpenedCbData = NULL;
-       mOpenState = VINE_DP_OPEN_STATE_DONE;
 }
 
 int DataPath::set_received_cb(vine_dp_received_cb callback, void *user_data)
@@ -685,19 +698,23 @@ int DPServer::open(vine_dp_opened_cb callback, void *user_data)
        return ret;
 }
 
-void DPServer::close()
-{
+int DPServer::close() {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        mOpenState = VINE_DP_OPEN_STATE_NONE;
+       unset_received_cb();
        vine_data_path_close(mDataPath);
+       return VINE_ERROR_NONE;
 }
 
 int DPServer::send(unsigned char *buf, size_t len)
 {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        return vine_data_path_write(mDataPath, buf, len);
 }
 
 int DPServer::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
 {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        return vine_data_path_read(mDataPath, buf, buf_len, read_len);
 }
 
@@ -707,6 +724,7 @@ DPClient::DPClient(void *event_queue)
        mMethod = VINE_DATA_PATH_METHOD_LWS;
        mEventQueue = event_queue;
        mSecurity = NULL;
+       mOpenState = VINE_DP_OPEN_STATE_NONE;
        mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
        mIfaceName = "";
        mDataPath = NULL;
@@ -729,7 +747,7 @@ DPClient::DPClient(void *event_queue, void *datapath)
        VINE_LOGD("DPClient[%p] is created with datapath[%p]", this, datapath);
        mEventQueue = event_queue;
        mSecurity = NULL;
-       mOpenState = VINE_DP_OPEN_STATE_NONE;
+       mOpenState = VINE_DP_OPEN_STATE_DONE;
        mDataPath = datapath;
        mAddrFamily = vine_data_path_get_addr_family(mDataPath);
        mPeerIp = vine_data_path_get_ip(mDataPath);
@@ -929,19 +947,24 @@ int DPClient::open(vine_dp_opened_cb callback, void *user_data)
        return ret;
 }
 
-void DPClient::close()
+int DPClient::close()
 {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        mOpenState = VINE_DP_OPEN_STATE_NONE;
+       unset_received_cb();
        vine_data_path_close(mDataPath);
+       return VINE_ERROR_NONE;
 }
 
 int DPClient::send(unsigned char *buf, size_t len)
 {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        return vine_data_path_write(mDataPath, buf, len);
 }
 
 int DPClient::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
 {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        return vine_data_path_read(mDataPath, buf, buf_len, read_len);
 }
 
@@ -982,7 +1005,6 @@ DPPubSub::~DPPubSub()
 {
        VINE_LOGD("DPPubSub[%p] is deleted.", this);
        _vine_security_destroy(mSecurity);
-       close();
 }
 
 int DPPubSub::set_method(vine_dp_method_e method)
@@ -1316,11 +1338,14 @@ int DPPubSub::open(vine_dp_opened_cb callback, void *user_data)
        return VINE_ERROR_NONE;
 }
 
-void DPPubSub::close()
+int DPPubSub::close()
 {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        vine_disc_stop_publish(mSdPub);
        vine_disc_stop_subscribe(mSdSub);
 
+       unset_received_cb();
+
        vine_disc_destroy(mSdSub);
        mSdSub = NULL;
        vine_disc_destroy(mSdPub);
@@ -1332,10 +1357,12 @@ void DPPubSub::close()
        mServerDataPath = NULL;
 
        mOpenState = VINE_DP_OPEN_STATE_NONE;
+       return VINE_ERROR_NONE;
 }
 
 int DPPubSub::send(unsigned char *buf, size_t len)
 {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        int ret = VINE_ERROR_NONE;
 
        for (auto &peer : mDataPathList) {
@@ -1353,6 +1380,7 @@ int DPPubSub::send(unsigned char *buf, size_t len)
 
 int DPPubSub::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
 {
+       RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
        auto &dp_info = mRecvDataPathList.front();
        size_t bytes = 0;
        int ret = vine_data_path_read(dp_info.first, buf, buf_len, &bytes);
@@ -1781,8 +1809,7 @@ int _vine_dp_close(vine_dp_h dp)
        RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
 
        DataPath *_dp = static_cast<DataPath *>(dp);
-       _dp->close();
-       return VINE_ERROR_NONE;
+       return _dp->close();
 }
 
 int _vine_dp_send(vine_dp_h dp, unsigned char *buf, size_t len)
index c2561f4..7fb97d5 100755 (executable)
@@ -100,10 +100,10 @@ int _vine_deinit()
        }
 
        if (__vine_unref() == 0) {
-               vine_event_loop_stop();
-               vine_event_loop_deinit();
                vine_data_path_deinit();
                vine_disc_deinit();
+               vine_event_loop_stop();
+               vine_event_loop_deinit();
        }
        VINE_UNLOCK(&__vine_mutex);
        return VINE_ERROR_NONE;
@@ -135,4 +135,4 @@ int _vine_set_event_loop(vine_event_loop_e loop)
        }
        VINE_UNLOCK(&__vine_mutex);
        return vine_event_loop_set(loop);
-}
\ No newline at end of file
+}
index a86e371..d5b8258 100755 (executable)
@@ -128,6 +128,7 @@ int _vine_service_clone(vine_service_h origin, vine_service_h *cloned)
        cloned_service->state = origin_service->state;
 
        cloned_service->disc_handle = NULL;
+       cloned_service->ip_resolving_session = NULL;
        cloned_service->ip_resolved_cb = NULL;
        cloned_service->ip_resolved_cb_data = NULL;