From: Seonah Moon Date: Fri, 15 Oct 2021 01:46:18 +0000 (+0900) Subject: Change DP termination logic X-Git-Tag: submit/tizen/20211028.034129~10 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=7619acc31ed099b8e78f291c59100d10e36e0b8e;p=platform%2Fcore%2Fapi%2Fvine.git Change DP termination logic Change-Id: I123c9fd3babf9f37e68b1ad2d444f6b0b5f30cdb --- diff --git a/plugins/dns-sd/dns-sd-plugin.cpp b/plugins/dns-sd/dns-sd-plugin.cpp index 7637bb5..0c6291d 100755 --- a/plugins/dns-sd/dns-sd-plugin.cpp +++ b/plugins/dns-sd/dns-sd-plugin.cpp @@ -164,6 +164,11 @@ void add_new_fd(vine_dns_sd_s *dns_sd_handle, static void __remove_service_ref(vine_dns_sd_s *dns_sd_handle, DNSServiceRef service_ref) { int fd = DNSServiceRefSockFD(service_ref); + if (fd < 0) { + VINE_LOGE("Invalid fd[%d]", fd); + return; + } + VINE_LOGD("fd[%d] to be removed", fd); if (event_callbacks.fd_removed_cb) event_callbacks.fd_removed_cb(fd, dns_sd_handle->user_data); diff --git a/plugins/libwebsockets/libwebsockets-plugin.cpp b/plugins/libwebsockets/libwebsockets-plugin.cpp index 428a445..60933ee 100755 --- a/plugins/libwebsockets/libwebsockets-plugin.cpp +++ b/plugins/libwebsockets/libwebsockets-plugin.cpp @@ -73,6 +73,7 @@ typedef enum { WEBSOCKET_OP_WRITE, WEBSOCKET_OP_TERMINATE, WEBSOCKET_OP_DESTROY, + WEBSOCKET_OP_DEINIT, } websocket_op_code_e; typedef struct { @@ -257,7 +258,6 @@ static int _add_websocket_op_request(websocket_op_code_e code, } op_queue.push(op); - return 0; } @@ -284,8 +284,8 @@ static void _process_pending_destroy(struct lws_vhost *vh) lws_callback_all_protocol_vhost_args(vh, protocol, LWS_CALLBACK_USER, NULL, 0); listen_vh_list.erase(vh); - lws_vhost_destroy(vh); - VINE_LOGI("vh[%p] is destroyed", vh); + //lws_vhost_destroy(vh); + VINE_LOGI("vh[%p] is erased.", vh); while (!lws_service_adjust_timeout(g_context, 1, 0)) lws_service_tsi(g_context, -1, 0); @@ -318,6 +318,31 @@ static void _destroy_websocket(websocket_s *ws) free(ws); } +static void _destroy_context(struct lws_context *context) +{ + lws_context_destroy(context); +} + +static void _clear_listen_vhosts(void) +{ + listen_vh_list.clear(); +} + +static void _deinit(void) +{ + _destroy_context(g_context); + g_context = NULL; + + auto it = g_pollfds.begin(); + while (it != g_pollfds.end()) { + free(it->second); + it = g_pollfds.erase(it); + } + + _clear_listen_vhosts(); + VINE_LOGI("-"); +} + static void _process_websocket_op_request(void) { RET_IF(op_queue.empty(), "operation queue is NULL"); @@ -326,7 +351,7 @@ static void _process_websocket_op_request(void) op_queue.pop(); RET_IF(op == NULL, "op is NULL"); - if (!op->ws) { + if (!op->ws && op->code != WEBSOCKET_OP_DEINIT) { _del_websocket_op_request(op); return; } @@ -347,6 +372,10 @@ static void _process_websocket_op_request(void) case WEBSOCKET_OP_DESTROY: _destroy_websocket(op->ws); break; + case WEBSOCKET_OP_DEINIT: + _deinit(); + break; + default: break; } @@ -354,6 +383,55 @@ static void _process_websocket_op_request(void) _del_websocket_op_request(op); } +static void do_func(websocket_op_s *op) +{ + if (op == NULL) + return; + + VINE_LOGD("op[%p] op->code[%d]", op, op->code); + switch (op->code) { + case WEBSOCKET_OP_OPEN: + VINE_LOGI("Ignore."); + break; + case WEBSOCKET_OP_CONNECT: + VINE_LOGI("Ignore."); + break; + case WEBSOCKET_OP_WRITE: + VINE_LOGI("Ignore."); + // Do! + break; + case WEBSOCKET_OP_TERMINATE: + VINE_LOGI("Terminate."); + _process_pending_destroy(op->vh); + // Do! + break; + case WEBSOCKET_OP_DESTROY: + VINE_LOGI("destroy."); + //_destroy_websocket(op->ws); + // error + break; + case WEBSOCKET_OP_DEINIT: + VINE_LOGI("This cannot be called here."); + break; + default: + break; + } + _del_websocket_op_request(op); +} + +static void _flush_op_queue(websocket_s *ws) +{ + VINE_LOGD("flush ws[%p]", ws); + + size_t count = op_queue.size(); + + while (count-- > 0) { + op_queue.do_remove_if([&](websocket_op_s *op) { + return (ws == NULL || (op && op->ws == ws)); + }, do_func); + } +} + static struct lws_context *_create_context(void) { struct lws_context_creation_info info; @@ -374,11 +452,6 @@ static struct lws_context *_create_context(void) return lws_create_context(&info); } -static void _destroy_context(struct lws_context *context) -{ - lws_context_destroy(context); -} - static void _add_websocket_poll_fd(struct lws_pollargs *args) { RET_IF(args == NULL, "args is NULL"); @@ -457,12 +530,6 @@ static int _websocket_protocol_cb(struct lws *wsi, lws_get_protocol(wsi)); // host private data. int n = 0; - if (ws && ws->close_requested - && reason != LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS) { - VINE_LOGD("ws[%p](user: %p) will be closed. Do nothing for it.", ws, ws->user); - return -1; - } - switch (reason) { case LWS_CALLBACK_WSI_DESTROY: VINE_LOGI("wsi[%p](ws: %p) is destroyed.", wsi, ws); @@ -718,13 +785,6 @@ static int websocket_init(void) return VINE_DATA_PATH_ERROR_NONE; } -static void _clear_listen_vhosts(void) -{ - for (auto &vh : listen_vh_list) - lws_vhost_destroy(vh); - listen_vh_list.clear(); -} - static void websocket_deinit(void) { if (__sync_sub_and_fetch(&g_ref_count, 1) > 0) { @@ -732,16 +792,12 @@ static void websocket_deinit(void) return; } - _destroy_context(g_context); - g_context = NULL; - - auto it = g_pollfds.begin(); - while (it != g_pollfds.end()) { - free(it->second); - it = g_pollfds.erase(it); - } + _flush_op_queue(NULL); + // Run deinit in eventloop thread + _add_websocket_op_request(WEBSOCKET_OP_DEINIT, + NULL, NULL, 0, NULL, 0, NULL, 0, NULL); + _notify_websocket_op_request(); - _clear_listen_vhosts(); VINE_LOGD("lws is deinitialized."); } @@ -1069,8 +1125,9 @@ static int websocket_close(vine_dp_plugin_h handle) VINE_LOGD("ws[%p] will be closed.", ws); ws->close_requested = true; if (ws->is_server) { - _add_websocket_op_request(WEBSOCKET_OP_TERMINATE, - ws, ws->vh, 0, NULL, 0, NULL, 0, NULL); + if (_add_websocket_op_request(WEBSOCKET_OP_TERMINATE, + ws, ws->vh, 0, NULL, 0, NULL, 0, NULL) < 0) + return VINE_DATA_PATH_ERROR_OPERATION_FAILED; _notify_websocket_op_request(); return VINE_DATA_PATH_ERROR_NONE; } @@ -1103,7 +1160,7 @@ static void websocket_process_event(int fd, int events) lws_service_fd(g_context, pollfd); // Check for any connection needing forced service. - while (!lws_service_adjust_timeout(g_context, 1, 0)) { + if (!lws_service_adjust_timeout(g_context, 1, 0)) { // Service any pending webscoket activity. // Only needed if multiple service threads. lws_service_tsi(g_context, -1, 0); @@ -1140,9 +1197,9 @@ static int websocket_destroy(vine_dp_plugin_h handle) return VINE_DATA_PATH_ERROR_NONE; } + _flush_op_queue(ws); + ws->destroy_requested = true; - _add_websocket_op_request(WEBSOCKET_OP_DESTROY, ws, NULL, 0, NULL, 0, NULL, 0, NULL); - _notify_websocket_op_request(); return VINE_DATA_PATH_ERROR_NONE; } diff --git a/src/include/vine-dp.h b/src/include/vine-dp.h index 2e86615..f0bb97d 100755 --- a/src/include/vine-dp.h +++ b/src/include/vine-dp.h @@ -42,7 +42,7 @@ public: virtual ~DataPath() {} virtual int open(vine_dp_opened_cb callback, void *user_data) = 0; - virtual void close() = 0; + virtual int close() = 0; virtual int send(unsigned char *buf, size_t len) = 0; virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len) = 0; @@ -106,7 +106,7 @@ public: virtual ~DPServer(); virtual int open(vine_dp_opened_cb callback, void *user_data); - virtual void close(); + virtual int close(); virtual int send(unsigned char *buf, size_t len); virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len); @@ -156,7 +156,7 @@ public: virtual ~DPClient(); virtual int open(vine_dp_opened_cb callback, void *user_data); - virtual void close(); + virtual int close(); virtual int send(unsigned char *buf, size_t len); virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len); @@ -216,7 +216,7 @@ public: virtual ~DPPubSub(); virtual int open(vine_dp_opened_cb callback, void *user_data); - virtual void close(); + virtual int close(); virtual int send(unsigned char *buf, size_t len); virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len); diff --git a/src/include/vine-queue.h b/src/include/vine-queue.h index 5b805e8..dd80d46 100755 --- a/src/include/vine-queue.h +++ b/src/include/vine-queue.h @@ -16,8 +16,9 @@ */ #pragma once +#include #include -#include +#include #include "vine-log.h" @@ -37,7 +38,7 @@ public: void push(const T &element) { std::lock_guard lock_guard(_q_mutex); - _queue.push(element); + _queue.push_back(element); } void pop() @@ -45,7 +46,7 @@ public: std::lock_guard lock_guard(_q_mutex); if (_queue.empty()) return; - _queue.pop(); + _queue.pop_front(); } T &front() @@ -72,7 +73,23 @@ public: return _queue.empty(); } + typedef void (*do_func)(T t); + + template void do_remove_if(UnaryPredicate comp_f, do_func do_f) + { + std::lock_guard lock_guard(_q_mutex); + for (auto it = _queue.begin(); it != _queue.end();) { + if (comp_f(*it)) { + do_f(*it); + it = _queue.erase(it); + } + else { + ++it; + } + } + } + private: - std::queue _queue; + std::list _queue; std::mutex _q_mutex; }; diff --git a/src/vine-disc.cpp b/src/vine-disc.cpp index 8d092bd..7b0679d 100755 --- a/src/vine-disc.cpp +++ b/src/vine-disc.cpp @@ -298,7 +298,7 @@ void __vine_disc_epoll_handler(int fd, int events, void *user_data) { VINE_LOGD("Process event for fd[%d] events[%d] disc_handle[%p]", fd, events, user_data); vine_disc_s *disc_handle = (vine_disc_s *)user_data; - if (!disc_handle || !disc_handle->plugin_fn || disc_handle->plugin_fn->process_event == NULL) { + if (!disc_handle || !disc_handle->plugin_fn || !disc_handle->plugin_fn->process_event) { VINE_LOGE("No process_event() defined"); return; } diff --git a/src/vine-dp.cpp b/src/vine-dp.cpp index c9475ee..e2e04d1 100755 --- a/src/vine-dp.cpp +++ b/src/vine-dp.cpp @@ -44,6 +44,15 @@ using namespace vine; } \ } while (0) +#define RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(state) \ + do { \ + if (state != VINE_DP_OPEN_STATE_DONE) { \ + VINE_LOGE("DP isn't opened."); \ + return VINE_ERROR_INVALID_OPERATION; \ + } \ + } while (0) + + extern vine_dp_plugin_fn g_dp_plugin_fn; static bool _check_if_valid_ip(vine_address_family_e addr_family, const char *ip) @@ -424,13 +433,17 @@ int DataPath::set_iface_name(const std::string &iface_name) void DataPath::invoke_opened_cb(int result) { + if (result == VINE_ERROR_NONE) + mOpenState = VINE_DP_OPEN_STATE_DONE; + else + mOpenState = VINE_DP_OPEN_STATE_NONE; + if (mOpenedCb) mOpenedCb(static_cast(this), (vine_error_e)result, mOpenedCbData); // called only once. mOpenedCb = NULL; mOpenedCbData = NULL; - mOpenState = VINE_DP_OPEN_STATE_DONE; } int DataPath::set_received_cb(vine_dp_received_cb callback, void *user_data) @@ -685,19 +698,23 @@ int DPServer::open(vine_dp_opened_cb callback, void *user_data) return ret; } -void DPServer::close() -{ +int DPServer::close() { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); mOpenState = VINE_DP_OPEN_STATE_NONE; + unset_received_cb(); vine_data_path_close(mDataPath); + return VINE_ERROR_NONE; } int DPServer::send(unsigned char *buf, size_t len) { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); return vine_data_path_write(mDataPath, buf, len); } int DPServer::recv(unsigned char *buf, size_t buf_len, size_t *read_len) { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); return vine_data_path_read(mDataPath, buf, buf_len, read_len); } @@ -707,6 +724,7 @@ DPClient::DPClient(void *event_queue) mMethod = VINE_DATA_PATH_METHOD_LWS; mEventQueue = event_queue; mSecurity = NULL; + mOpenState = VINE_DP_OPEN_STATE_NONE; mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT; mIfaceName = ""; mDataPath = NULL; @@ -729,7 +747,7 @@ DPClient::DPClient(void *event_queue, void *datapath) VINE_LOGD("DPClient[%p] is created with datapath[%p]", this, datapath); mEventQueue = event_queue; mSecurity = NULL; - mOpenState = VINE_DP_OPEN_STATE_NONE; + mOpenState = VINE_DP_OPEN_STATE_DONE; mDataPath = datapath; mAddrFamily = vine_data_path_get_addr_family(mDataPath); mPeerIp = vine_data_path_get_ip(mDataPath); @@ -929,19 +947,24 @@ int DPClient::open(vine_dp_opened_cb callback, void *user_data) return ret; } -void DPClient::close() +int DPClient::close() { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); mOpenState = VINE_DP_OPEN_STATE_NONE; + unset_received_cb(); vine_data_path_close(mDataPath); + return VINE_ERROR_NONE; } int DPClient::send(unsigned char *buf, size_t len) { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); return vine_data_path_write(mDataPath, buf, len); } int DPClient::recv(unsigned char *buf, size_t buf_len, size_t *read_len) { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); return vine_data_path_read(mDataPath, buf, buf_len, read_len); } @@ -982,7 +1005,6 @@ DPPubSub::~DPPubSub() { VINE_LOGD("DPPubSub[%p] is deleted.", this); _vine_security_destroy(mSecurity); - close(); } int DPPubSub::set_method(vine_dp_method_e method) @@ -1316,11 +1338,14 @@ int DPPubSub::open(vine_dp_opened_cb callback, void *user_data) return VINE_ERROR_NONE; } -void DPPubSub::close() +int DPPubSub::close() { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); vine_disc_stop_publish(mSdPub); vine_disc_stop_subscribe(mSdSub); + unset_received_cb(); + vine_disc_destroy(mSdSub); mSdSub = NULL; vine_disc_destroy(mSdPub); @@ -1332,10 +1357,12 @@ void DPPubSub::close() mServerDataPath = NULL; mOpenState = VINE_DP_OPEN_STATE_NONE; + return VINE_ERROR_NONE; } int DPPubSub::send(unsigned char *buf, size_t len) { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); int ret = VINE_ERROR_NONE; for (auto &peer : mDataPathList) { @@ -1353,6 +1380,7 @@ int DPPubSub::send(unsigned char *buf, size_t len) int DPPubSub::recv(unsigned char *buf, size_t buf_len, size_t *read_len) { + RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState); auto &dp_info = mRecvDataPathList.front(); size_t bytes = 0; int ret = vine_data_path_read(dp_info.first, buf, buf_len, &bytes); @@ -1781,8 +1809,7 @@ int _vine_dp_close(vine_dp_h dp) RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null."); DataPath *_dp = static_cast(dp); - _dp->close(); - return VINE_ERROR_NONE; + return _dp->close(); } int _vine_dp_send(vine_dp_h dp, unsigned char *buf, size_t len) diff --git a/src/vine-private.cpp b/src/vine-private.cpp index c2561f4..7fb97d5 100755 --- a/src/vine-private.cpp +++ b/src/vine-private.cpp @@ -100,10 +100,10 @@ int _vine_deinit() } if (__vine_unref() == 0) { - vine_event_loop_stop(); - vine_event_loop_deinit(); vine_data_path_deinit(); vine_disc_deinit(); + vine_event_loop_stop(); + vine_event_loop_deinit(); } VINE_UNLOCK(&__vine_mutex); return VINE_ERROR_NONE; @@ -135,4 +135,4 @@ int _vine_set_event_loop(vine_event_loop_e loop) } VINE_UNLOCK(&__vine_mutex); return vine_event_loop_set(loop); -} \ No newline at end of file +} diff --git a/src/vine-service.cpp b/src/vine-service.cpp index a86e371..d5b8258 100755 --- a/src/vine-service.cpp +++ b/src/vine-service.cpp @@ -128,6 +128,7 @@ int _vine_service_clone(vine_service_h origin, vine_service_h *cloned) cloned_service->state = origin_service->state; cloned_service->disc_handle = NULL; + cloned_service->ip_resolving_session = NULL; cloned_service->ip_resolved_cb = NULL; cloned_service->ip_resolved_cb_data = NULL;