static void __remove_service_ref(vine_dns_sd_s *dns_sd_handle, DNSServiceRef service_ref)
{
int fd = DNSServiceRefSockFD(service_ref);
+ if (fd < 0) {
+ VINE_LOGE("Invalid fd[%d]", fd);
+ return;
+ }
+
VINE_LOGD("fd[%d] to be removed", fd);
if (event_callbacks.fd_removed_cb)
event_callbacks.fd_removed_cb(fd, dns_sd_handle->user_data);
WEBSOCKET_OP_WRITE,
WEBSOCKET_OP_TERMINATE,
WEBSOCKET_OP_DESTROY,
+ WEBSOCKET_OP_DEINIT,
} websocket_op_code_e;
typedef struct {
}
op_queue.push(op);
-
return 0;
}
lws_callback_all_protocol_vhost_args(vh, protocol, LWS_CALLBACK_USER, NULL, 0);
listen_vh_list.erase(vh);
- lws_vhost_destroy(vh);
- VINE_LOGI("vh[%p] is destroyed", vh);
+ //lws_vhost_destroy(vh);
+ VINE_LOGI("vh[%p] is erased.", vh);
while (!lws_service_adjust_timeout(g_context, 1, 0))
lws_service_tsi(g_context, -1, 0);
free(ws);
}
+static void _destroy_context(struct lws_context *context)
+{
+ lws_context_destroy(context);
+}
+
+static void _clear_listen_vhosts(void)
+{
+ listen_vh_list.clear();
+}
+
+static void _deinit(void)
+{
+ _destroy_context(g_context);
+ g_context = NULL;
+
+ auto it = g_pollfds.begin();
+ while (it != g_pollfds.end()) {
+ free(it->second);
+ it = g_pollfds.erase(it);
+ }
+
+ _clear_listen_vhosts();
+ VINE_LOGI("-");
+}
+
static void _process_websocket_op_request(void)
{
RET_IF(op_queue.empty(), "operation queue is NULL");
op_queue.pop();
RET_IF(op == NULL, "op is NULL");
- if (!op->ws) {
+ if (!op->ws && op->code != WEBSOCKET_OP_DEINIT) {
_del_websocket_op_request(op);
return;
}
case WEBSOCKET_OP_DESTROY:
_destroy_websocket(op->ws);
break;
+ case WEBSOCKET_OP_DEINIT:
+ _deinit();
+ break;
+
default:
break;
}
_del_websocket_op_request(op);
}
+static void do_func(websocket_op_s *op)
+{
+ if (op == NULL)
+ return;
+
+ VINE_LOGD("op[%p] op->code[%d]", op, op->code);
+ switch (op->code) {
+ case WEBSOCKET_OP_OPEN:
+ VINE_LOGI("Ignore.");
+ break;
+ case WEBSOCKET_OP_CONNECT:
+ VINE_LOGI("Ignore.");
+ break;
+ case WEBSOCKET_OP_WRITE:
+ VINE_LOGI("Ignore.");
+ // Do!
+ break;
+ case WEBSOCKET_OP_TERMINATE:
+ VINE_LOGI("Terminate.");
+ _process_pending_destroy(op->vh);
+ // Do!
+ break;
+ case WEBSOCKET_OP_DESTROY:
+ VINE_LOGI("destroy.");
+ //_destroy_websocket(op->ws);
+ // error
+ break;
+ case WEBSOCKET_OP_DEINIT:
+ VINE_LOGI("This cannot be called here.");
+ break;
+ default:
+ break;
+ }
+ _del_websocket_op_request(op);
+}
+
+static void _flush_op_queue(websocket_s *ws)
+{
+ VINE_LOGD("flush ws[%p]", ws);
+
+ size_t count = op_queue.size();
+
+ while (count-- > 0) {
+ op_queue.do_remove_if([&](websocket_op_s *op) {
+ return (ws == NULL || (op && op->ws == ws));
+ }, do_func);
+ }
+}
+
static struct lws_context *_create_context(void)
{
struct lws_context_creation_info info;
return lws_create_context(&info);
}
-static void _destroy_context(struct lws_context *context)
-{
- lws_context_destroy(context);
-}
-
static void _add_websocket_poll_fd(struct lws_pollargs *args)
{
RET_IF(args == NULL, "args is NULL");
lws_get_protocol(wsi)); // host private data.
int n = 0;
- if (ws && ws->close_requested
- && reason != LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS) {
- VINE_LOGD("ws[%p](user: %p) will be closed. Do nothing for it.", ws, ws->user);
- return -1;
- }
-
switch (reason) {
case LWS_CALLBACK_WSI_DESTROY:
VINE_LOGI("wsi[%p](ws: %p) is destroyed.", wsi, ws);
return VINE_DATA_PATH_ERROR_NONE;
}
-static void _clear_listen_vhosts(void)
-{
- for (auto &vh : listen_vh_list)
- lws_vhost_destroy(vh);
- listen_vh_list.clear();
-}
-
static void websocket_deinit(void)
{
if (__sync_sub_and_fetch(&g_ref_count, 1) > 0) {
return;
}
- _destroy_context(g_context);
- g_context = NULL;
-
- auto it = g_pollfds.begin();
- while (it != g_pollfds.end()) {
- free(it->second);
- it = g_pollfds.erase(it);
- }
+ _flush_op_queue(NULL);
+ // Run deinit in eventloop thread
+ _add_websocket_op_request(WEBSOCKET_OP_DEINIT,
+ NULL, NULL, 0, NULL, 0, NULL, 0, NULL);
+ _notify_websocket_op_request();
- _clear_listen_vhosts();
VINE_LOGD("lws is deinitialized.");
}
VINE_LOGD("ws[%p] will be closed.", ws);
ws->close_requested = true;
if (ws->is_server) {
- _add_websocket_op_request(WEBSOCKET_OP_TERMINATE,
- ws, ws->vh, 0, NULL, 0, NULL, 0, NULL);
+ if (_add_websocket_op_request(WEBSOCKET_OP_TERMINATE,
+ ws, ws->vh, 0, NULL, 0, NULL, 0, NULL) < 0)
+ return VINE_DATA_PATH_ERROR_OPERATION_FAILED;
_notify_websocket_op_request();
return VINE_DATA_PATH_ERROR_NONE;
}
lws_service_fd(g_context, pollfd);
// Check for any connection needing forced service.
- while (!lws_service_adjust_timeout(g_context, 1, 0)) {
+ if (!lws_service_adjust_timeout(g_context, 1, 0)) {
// Service any pending webscoket activity.
// Only needed if multiple service threads.
lws_service_tsi(g_context, -1, 0);
return VINE_DATA_PATH_ERROR_NONE;
}
+ _flush_op_queue(ws);
+
ws->destroy_requested = true;
- _add_websocket_op_request(WEBSOCKET_OP_DESTROY, ws, NULL, 0, NULL, 0, NULL, 0, NULL);
- _notify_websocket_op_request();
return VINE_DATA_PATH_ERROR_NONE;
}
virtual ~DataPath() {}
virtual int open(vine_dp_opened_cb callback, void *user_data) = 0;
- virtual void close() = 0;
+ virtual int close() = 0;
virtual int send(unsigned char *buf, size_t len) = 0;
virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len) = 0;
virtual ~DPServer();
virtual int open(vine_dp_opened_cb callback, void *user_data);
- virtual void close();
+ virtual int close();
virtual int send(unsigned char *buf, size_t len);
virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len);
virtual ~DPClient();
virtual int open(vine_dp_opened_cb callback, void *user_data);
- virtual void close();
+ virtual int close();
virtual int send(unsigned char *buf, size_t len);
virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len);
virtual ~DPPubSub();
virtual int open(vine_dp_opened_cb callback, void *user_data);
- virtual void close();
+ virtual int close();
virtual int send(unsigned char *buf, size_t len);
virtual int recv(unsigned char *buf, size_t buf_len, size_t *read_len);
*/
#pragma once
+#include <functional>
#include <mutex>
-#include <queue>
+#include <list>
#include "vine-log.h"
void push(const T &element)
{
std::lock_guard<std::mutex> lock_guard(_q_mutex);
- _queue.push(element);
+ _queue.push_back(element);
}
void pop()
std::lock_guard<std::mutex> lock_guard(_q_mutex);
if (_queue.empty())
return;
- _queue.pop();
+ _queue.pop_front();
}
T &front()
return _queue.empty();
}
+ typedef void (*do_func)(T t);
+
+ template <class UnaryPredicate> void do_remove_if(UnaryPredicate comp_f, do_func do_f)
+ {
+ std::lock_guard<std::mutex> lock_guard(_q_mutex);
+ for (auto it = _queue.begin(); it != _queue.end();) {
+ if (comp_f(*it)) {
+ do_f(*it);
+ it = _queue.erase(it);
+ }
+ else {
+ ++it;
+ }
+ }
+ }
+
private:
- std::queue<T> _queue;
+ std::list<T> _queue;
std::mutex _q_mutex;
};
{
VINE_LOGD("Process event for fd[%d] events[%d] disc_handle[%p]", fd, events, user_data);
vine_disc_s *disc_handle = (vine_disc_s *)user_data;
- if (!disc_handle || !disc_handle->plugin_fn || disc_handle->plugin_fn->process_event == NULL) {
+ if (!disc_handle || !disc_handle->plugin_fn || !disc_handle->plugin_fn->process_event) {
VINE_LOGE("No process_event() defined");
return;
}
} \
} while (0)
+#define RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(state) \
+ do { \
+ if (state != VINE_DP_OPEN_STATE_DONE) { \
+ VINE_LOGE("DP isn't opened."); \
+ return VINE_ERROR_INVALID_OPERATION; \
+ } \
+ } while (0)
+
+
extern vine_dp_plugin_fn g_dp_plugin_fn;
static bool _check_if_valid_ip(vine_address_family_e addr_family, const char *ip)
void DataPath::invoke_opened_cb(int result)
{
+ if (result == VINE_ERROR_NONE)
+ mOpenState = VINE_DP_OPEN_STATE_DONE;
+ else
+ mOpenState = VINE_DP_OPEN_STATE_NONE;
+
if (mOpenedCb)
mOpenedCb(static_cast<void *>(this), (vine_error_e)result, mOpenedCbData);
// called only once.
mOpenedCb = NULL;
mOpenedCbData = NULL;
- mOpenState = VINE_DP_OPEN_STATE_DONE;
}
int DataPath::set_received_cb(vine_dp_received_cb callback, void *user_data)
return ret;
}
-void DPServer::close()
-{
+int DPServer::close() {
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
mOpenState = VINE_DP_OPEN_STATE_NONE;
+ unset_received_cb();
vine_data_path_close(mDataPath);
+ return VINE_ERROR_NONE;
}
int DPServer::send(unsigned char *buf, size_t len)
{
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
return vine_data_path_write(mDataPath, buf, len);
}
int DPServer::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
{
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
return vine_data_path_read(mDataPath, buf, buf_len, read_len);
}
mMethod = VINE_DATA_PATH_METHOD_LWS;
mEventQueue = event_queue;
mSecurity = NULL;
+ mOpenState = VINE_DP_OPEN_STATE_NONE;
mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
mIfaceName = "";
mDataPath = NULL;
VINE_LOGD("DPClient[%p] is created with datapath[%p]", this, datapath);
mEventQueue = event_queue;
mSecurity = NULL;
- mOpenState = VINE_DP_OPEN_STATE_NONE;
+ mOpenState = VINE_DP_OPEN_STATE_DONE;
mDataPath = datapath;
mAddrFamily = vine_data_path_get_addr_family(mDataPath);
mPeerIp = vine_data_path_get_ip(mDataPath);
return ret;
}
-void DPClient::close()
+int DPClient::close()
{
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
mOpenState = VINE_DP_OPEN_STATE_NONE;
+ unset_received_cb();
vine_data_path_close(mDataPath);
+ return VINE_ERROR_NONE;
}
int DPClient::send(unsigned char *buf, size_t len)
{
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
return vine_data_path_write(mDataPath, buf, len);
}
int DPClient::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
{
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
return vine_data_path_read(mDataPath, buf, buf_len, read_len);
}
{
VINE_LOGD("DPPubSub[%p] is deleted.", this);
_vine_security_destroy(mSecurity);
- close();
}
int DPPubSub::set_method(vine_dp_method_e method)
return VINE_ERROR_NONE;
}
-void DPPubSub::close()
+int DPPubSub::close()
{
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
vine_disc_stop_publish(mSdPub);
vine_disc_stop_subscribe(mSdSub);
+ unset_received_cb();
+
vine_disc_destroy(mSdSub);
mSdSub = NULL;
vine_disc_destroy(mSdPub);
mServerDataPath = NULL;
mOpenState = VINE_DP_OPEN_STATE_NONE;
+ return VINE_ERROR_NONE;
}
int DPPubSub::send(unsigned char *buf, size_t len)
{
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
int ret = VINE_ERROR_NONE;
for (auto &peer : mDataPathList) {
int DPPubSub::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
{
+ RET_ERR_IF_DP_OPEN_STATE_ISNT_DONE(mOpenState);
auto &dp_info = mRecvDataPathList.front();
size_t bytes = 0;
int ret = vine_data_path_read(dp_info.first, buf, buf_len, &bytes);
RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
DataPath *_dp = static_cast<DataPath *>(dp);
- _dp->close();
- return VINE_ERROR_NONE;
+ return _dp->close();
}
int _vine_dp_send(vine_dp_h dp, unsigned char *buf, size_t len)
}
if (__vine_unref() == 0) {
- vine_event_loop_stop();
- vine_event_loop_deinit();
vine_data_path_deinit();
vine_disc_deinit();
+ vine_event_loop_stop();
+ vine_event_loop_deinit();
}
VINE_UNLOCK(&__vine_mutex);
return VINE_ERROR_NONE;
}
VINE_UNLOCK(&__vine_mutex);
return vine_event_loop_set(loop);
-}
\ No newline at end of file
+}
cloned_service->state = origin_service->state;
cloned_service->disc_handle = NULL;
+ cloned_service->ip_resolving_session = NULL;
cloned_service->ip_resolved_cb = NULL;
cloned_service->ip_resolved_cb_data = NULL;