%bcond_without lws_static_prebuilt
Name: capi-network-vine
Summary: An service discovery framework
-Version: 1.0.3
+Version: 1.0.5
Release: 0
Group: Network & Connectivity/API
License: Apache-2.0
-DUSE_LIBWEBSOCKETS_STATIC_PREBUILT=OFF \
%endif
-DWITH_UNITTEST=ON \
+ -DENABLE_DATAPATH_PLUGIN_DEBUG=ON \
-DWITH_VINE_TEST=ON
make %{?jobs:-j%jobs}
*/
#include <map>
-
+#include <set>
#include <libwebsockets.h>
#include <openssl/ssl.h>
#include <pthread.h>
#include "vine-log.h"
#include "vine-utils.h"
#include "vine-queue.h"
+#include "vine-set.h"
#ifdef ENABLE_DATAPATH_PLUGIN_DEBUG
#define DEBUG_LEVEL (LLL_USER | LLL_ERR | LLL_WARN | LLL_DEBUG | LLL_NOTICE | LLL_INFO)
typedef struct {
struct lws *wsi;
struct lws_vhost *vh;
+ bool is_server;
bool close_requested;
int curr_conn;
int max_conn;
+ char *host_name;
unsigned char *token;
int token_len;
} websocket_op_s;
static VineQueue<websocket_op_s *> op_queue;
+static VineSet<struct lws_vhost *> listen_vh_list;
static int g_ref_count = 0;
static pthread_mutex_t g_lws_mutex = PTHREAD_MUTEX_INITIALIZER;
g_callbacks.connected_cb(0, user_data);
}
+static int __check_vhost(struct lws_vhost *vh)
+{
+ auto it = listen_vh_list.find(vh);
+ if (it == listen_vh_list.end()) {
+ VINE_LOGE("vh[%p] isn't used. port[%d]",
+ vh, lws_get_vhost_port(vh));
+ return 0;
+ }
+
+ return 1;
+}
+
static int _websocket_protocol_cb(struct lws *wsi,
enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
case LWS_CALLBACK_ESTABLISHED: {
VINE_LOGI("Websocket connection is established.");
websocket_s *client_ws = (websocket_s *)lws_wsi_user(wsi);
+ if (!__check_vhost(lws_get_vhost(wsi))) {
+ VINE_LOGE("Ignore a connection.");
+ return -1;
+ }
vhd->curr_conn++;
if (g_callbacks.accepted_cb) {
vine_dp_addr_family_e addr_family;
char ip[INET6_ADDRSTRLEN];
int port;
_get_peer_network_info(wsi, &addr_family, ip, &port);
+ VINE_LOGE("vh[%p] wsi[%p] vhd->user[%p] port[%d]",
+ lws_get_vhost(wsi),
+ wsi, vhd->user, lws_get_vhost_port(lws_get_vhost(wsi)));
g_callbacks.accepted_cb(addr_family, ip, port, client_ws, vhd->user);
}
break;
}
if (ws->close_requested) {
- VINE_LOGI("Close websocket.");
+ VINE_LOGI("Close websocket. ws[%p] ws->user[%p] wsi[%p]",
+ ws, ws->user, wsi);
return -1;
}
return VINE_DATA_PATH_ERROR_NONE;
}
+static void _clear_listen_vhosts(void)
+{
+ for (auto &vh : listen_vh_list)
+ lws_vhost_destroy(vh);
+ listen_vh_list.clear();
+}
+
static void websocket_deinit(void)
{
if (__sync_sub_and_fetch(&g_ref_count, 1) > 0) {
it = g_pollfds.erase(it);
}
+ _clear_listen_vhosts();
VINE_LOGD("lws is deinitialized.");
}
return options;
}
-static struct lws_vhost *_create_vhost(int addr_family,
+static void __vhost_finalized_cb(struct lws_vhost *vh, void *arg)
+{
+ VINE_LOGE("vh[%p] is finalized", vh);
+ listen_vh_list.erase(vh);
+}
+
+static struct lws_vhost *_create_vhost(const char *host_name, int addr_family,
int port, const char *iface_name, vine_dp_ssl ssl)
{
struct lws_context_creation_info info;
info.ssl_cert_filepath = ssl.cert_path;
info.ssl_private_key_filepath = ssl.key_path;
info.protocols = protocols;
- info.vhost_name = "vine-websocket-server";
+ info.vhost_name = host_name ? STRDUP(host_name) : "vine-websocket-server";
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | LWS_SERVER_OPTION_EXPLICIT_VHOSTS;
-
+ info.finalize = __vhost_finalized_cb;
if (addr_family == VINE_DP_IPV4)
info.options |= LWS_SERVER_OPTION_DISABLE_IPV6;
else if (addr_family == VINE_DP_IPV6)
int port, const char *iface_name, int max_conn, vine_dp_ssl ssl)
{
ws->max_conn = max_conn;
- ws->vh = _create_vhost(addr_family, port, iface_name, ssl);
+ ws->vh = _create_vhost(ws->host_name, addr_family, port, iface_name, ssl);
if (!ws->vh) {
VINE_LOGE("Failed to create vhost.");
if (g_callbacks.opened_cb)
return;
}
+ //listen_vh_list.insert((void *)ws->vh);
+ listen_vh_list.insert(ws->vh);
+
void *user_data = lws_protocol_vh_priv_zalloc(ws->vh, protocols, sizeof(websocket_s));
memcpy(user_data, ws, sizeof(websocket_s));
int vport = lws_get_vhost_port(ws->vh);
if (g_callbacks.opened_cb)
g_callbacks.opened_cb(VINE_DATA_PATH_ERROR_NONE, vport, ws->user);
+ VINE_LOGD("vh[%p] user[%p] port[%d]", ws->vh, ws->user, vport);
}
static int websocket_open(vine_dp_plugin_h handle,
RET_VAL_IF(!handle, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "handle is NULL");
RET_VAL_IF(!g_context, VINE_DATA_PATH_ERROR_INVALID_OPERATION, "g_context is NULL");
+ websocket_s *ws = (websocket_s *)handle;
+ ws->is_server = true;
+ VINE_LOGD("ws[%p]", ws);
if (_add_websocket_op_request(WEBSOCKET_OP_OPEN,
(websocket_s *)handle, addr_family, NULL, port, iface_name, max_conn, &ssl) < 0)
return VINE_DATA_PATH_ERROR_OPERATION_FAILED;
client_conn_info.address = ip;
client_conn_info.iface = iface_name;
client_conn_info.path = "/";
- client_conn_info.host = client_conn_info.address;
+ client_conn_info.host = ws->host_name ? ws->host_name : client_conn_info.address;
client_conn_info.origin = client_conn_info.address;
client_conn_info.protocol = "vine-websocket-protocol";
client_conn_info.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
client_conn_info.userdata = (void *)ws;
- client_conn_info.vhost = _create_vhost(addr_family, CONTEXT_PORT_NO_LISTEN, iface_name, ssl);
+ client_conn_info.vhost = _create_vhost(ws->host_name,
+ addr_family, CONTEXT_PORT_NO_LISTEN, iface_name, ssl);
ws->wsi = lws_client_connect_via_info(&client_conn_info);
}
websocket_s *ws = (websocket_s *)handle;
ws->close_requested = true;
+ if (ws->is_server) {
+ VINE_LOGE("vh destroy[%p]", ws->vh);
+ listen_vh_list.erase(ws->vh);
+ lws_vhost_destroy(ws->vh);
+ ws->vh = NULL;
+ return VINE_DATA_PATH_ERROR_NONE;
+ }
+
if (ws->wsi)
lws_callback_on_writable(ws->wsi);
(websocket_s *)plugin_data : (websocket_s *)calloc(1, sizeof(websocket_s));
RET_VAL_IF(ws == NULL, VINE_DATA_PATH_ERROR_OUT_OF_MEMORY, "Out of memory");
- VINE_LOGD("plugin_data[%p]", plugin_data);
+ VINE_LOGD("ws[%p], user[%p]", ws, user);
ws->close_requested = false;
ws->curr_conn = 0;
{
RET_VAL_IF(handle == NULL, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "handle is NULL");
+ VINE_LOGE("ws[%p] is destroyed", handle);
websocket_s *ws = (websocket_s *)handle;
ws->wsi = NULL;
ws->vh = NULL;
ws->user = NULL;
+ free(ws->host_name);
+ ws->host_name = NULL;
free(ws->token);
ws->token = NULL;
ws->token_len = 0;
return VINE_DATA_PATH_ERROR_NONE;
}
+static int websocket_set_host_name(vine_dp_plugin_h handle, const char *name)
+{
+ RET_VAL_IF(handle == NULL, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "handle is NULL");
+ RET_VAL_IF(name == NULL, VINE_DATA_PATH_ERROR_INVALID_PARAMETER, "name is NULL");
+
+ websocket_s *ws = (websocket_s *)handle;
+ if (ws->host_name)
+ free(ws->host_name);
+ ws->host_name = strdup(name);
+ return VINE_DATA_PATH_ERROR_NONE;
+}
+
void vine_data_path_plugin_init(vine_dp_plugin_fn *fn)
{
fn->init = websocket_init;
fn->set_token = websocket_set_token;
fn->get_token = websocket_get_token;
+ fn->set_host_name = websocket_set_host_name;
}
// DPPubSub uses it to inform own service name to a peer.
int (*set_token)(vine_dp_plugin_h handle, const char *token);
int (*get_token)(vine_dp_plugin_h handle, char **token);
+
+ int (*set_host_name)(vine_dp_plugin_h handle, const char *name);
} vine_dp_plugin_fn;
extern "C" void vine_data_path_plugin_init(vine_dp_plugin_fn *fn);
int vine_data_path_deinit(void);
int vine_data_path_open(vine_address_family_e addr_family, int port,
- const char *iface_name, int max_conn, vine_security_h security,
+ const char *iface_name, int max_conn,
+ vine_security_h security, const char *host_name,
vine_data_path_opened_cb opened_cb, void *opened_cb_data,
vine_data_path_accepted_cb accepted_cb, void *accepted_cb_data,
vine_data_path_h *opened_datapath,
vine_event_queue_h event_fd);
int vine_data_path_connect(vine_address_family_e addr_family,
const char *ip, int port, const char *iface_name,
- vine_security_h security, const char *token,
+ vine_security_h security, const char *host_name, const char *token,
vine_data_path_connected_cb callback, void *user_data,
vine_data_path_h *connected_datapath,
vine_event_queue_h event_fd);
--- /dev/null
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+#pragma once
+
+#include <mutex>
+#include <set>
+#include <iterator>
+
+#include "vine-log.h"
+
+template <typename T>
+class VineSet
+{
+public:
+ using iterator = typename std::set<T>::iterator;
+ VineSet()
+ {
+ VINE_LOGD("New Set");
+ }
+ virtual ~VineSet()
+ {
+ VINE_LOGD("Destroy Set");
+ }
+
+ void insert(const T &element)
+ {
+ std::lock_guard<std::mutex> lock_guard(_s_mutex);
+ _set.insert(element);
+ }
+
+ void erase(const T &element)
+ {
+ std::lock_guard<std::mutex> lock_guard(_s_mutex);
+ if (_set.empty())
+ return;
+ _set.erase(element);
+ }
+
+ void clear()
+ {
+ std::lock_guard<std::mutex> lock_guard(_s_mutex);
+ _set.clear();
+ }
+
+ size_t size()
+ {
+ std::lock_guard<std::mutex> lock_guard(_s_mutex);
+ return _set.size();
+ }
+
+ bool empty()
+ {
+ std::lock_guard<std::mutex> lock_guard(_s_mutex);
+ return _set.empty();
+ }
+
+ iterator find(const T &element)
+ {
+ std::lock_guard<std::mutex> lock_guard(_s_mutex);
+ return _set.find(element);
+ }
+
+ iterator begin()
+ {
+ std::lock_guard<std::mutex> lock_guard(_s_mutex);
+ return _set.begin();
+ }
+
+ iterator end()
+ {
+ std::lock_guard<std::mutex> lock_guard(_s_mutex);
+ return _set.end();
+ }
+
+private:
+ std::set<T> _set;
+ std::mutex _s_mutex;
+};
RET_IF(user_data == NULL, "listen_dp is NULL");
vine_data_path_s *listen_dp = (vine_data_path_s *)user_data;
+
+ VINE_LOGE("listen_dp[%p], security[%p]", listen_dp, listen_dp->security);
vine_data_path_s *connected_dp = _vine_data_path_create(__convert_addr_family(addr_family),
addr, port, listen_dp->security, plugin_data, listen_dp->event_fd);
RET_IF(connected_dp == NULL, "Out of memory");
if (ret != VINE_DATA_PATH_ERROR_NONE) {
free(dp->addr);
_vine_security_destroy(dp->security);
+ dp->security = NULL;
free(dp);
return NULL;
}
dp->state = vine_get_default_state(dp, dp->plugin_handle);
-
VINE_LOGD("datapath[%p] is created.", dp);
-
return dp;
}
dp->plugin_handle = NULL;
free(dp->addr);
dp->addr = NULL;
+ _vine_security_destroy(dp->security);
+ dp->security = NULL;
+ dp->listen_dp = NULL;
delete dp->state;
- free(datapath);
+ VINE_LOGD("data_path[%p] is destroyed", datapath);
+ free(datapath);
return VINE_ERROR_NONE;
}
}
int vine_data_path_open(vine_address_family_e addr_family, int port, const char *iface_name,
- int max_conn, vine_security_h security,
+ int max_conn, vine_security_h security, const char *host_name,
vine_data_path_opened_cb opened_cb, void *opened_cb_data,
vine_data_path_accepted_cb accepted_cb, void *accepted_cb_data,
vine_data_path_h *opened_datapath,
dp->accepted_cb = accepted_cb;
dp->accepted_cb_data = accepted_cb_data;
+ // optional
+ if (host_name) {
+ ret = g_dp_plugin_fn.set_host_name(dp->plugin_handle, host_name);
+ if (ret != VINE_DATA_PATH_ERROR_NONE) {
+ vine_data_path_destroy(dp);
+ return __convert_data_path_error_to_vine_error((vine_data_path_error)ret);
+ }
+ }
+
ret = g_dp_plugin_fn.open(dp->plugin_handle, dp_addr_family, port, iface_name, max_conn, ssl);
_destroy_security_info(&ssl);
int vine_data_path_connect(vine_address_family_e addr_family,
const char *ip, int port, const char *iface_name,
- vine_security_h security, const char *token,
+ vine_security_h security, const char *host_name, const char *token,
vine_data_path_connected_cb callback, void *user_data,
vine_data_path_h *connected_datapath, vine_event_queue_h event_fd)
{
_extract_security_info(security, &ssl);
int ret;
+
+ // optional
+ if (host_name) {
+ ret = g_dp_plugin_fn.set_host_name(dp->plugin_handle, host_name);
+ if (ret != VINE_DATA_PATH_ERROR_NONE) {
+ vine_data_path_destroy(dp);
+ return __convert_data_path_error_to_vine_error((vine_data_path_error)ret);
+ }
+ }
+
if (token) {
ret = g_dp_plugin_fn.set_token(dp->plugin_handle, token);
if (ret != VINE_DATA_PATH_ERROR_NONE) {
int ret = vine_data_path_open(mAddrFamily, mListenPort,
mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
- mMaxConnNum, mSecurity,
+ mMaxConnNum, mSecurity, NULL,
_opened_cb, static_cast<void *>(this),
_accepted_cb, static_cast<void *>(this),
&mDataPath, mEventFd);
int ret = vine_data_path_connect(mAddrFamily, mPeerIp.c_str(), mPeerPort,
mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
- mSecurity, NULL,
+ mSecurity, NULL, NULL,
_connected_cb, static_cast<void *>(this), &mDataPath, mEventFd);
if (ret != VINE_ERROR_NONE)
mOpenState = VINE_DP_OPEN_STATE_NONE;
conn_data->service_name = STRDUP(service_name);
int ret = vine_data_path_connect(mAddrFamily, ip, port, iface_name,
- mSecurity, mId.c_str(),
+ mSecurity, service_name, mId.c_str(),
_pubsub_connected_cb, (void *)conn_data,
&datapath, mEventFd);
return ret;
const size_t map_len = strlen(map);
string rand_str;
+ srand(time(NULL));
generate_n(back_inserter(rand_str), 6,
[&](){
return map[rand() % map_len];
vine_service_h service;
int ret;
char rank_str[VINE_DP_PUBSUB_RANK_LEN] = {0 , };
- char service_name[VINE_MAX_SERVICE_NAME_LEN + 1] = {0 , };
ret = vine_service_create(&service);
if (ret != VINE_ERROR_NONE)
mRank = create_rank();
sprintf(rank_str, "%d", mRank);
- create_id(service_name);
- set_id(service_name);
-
- vine_service_set_name(service, service_name);
+ vine_service_set_name(service, mId.c_str());
vine_service_add_attribute(service, VINE_DP_PUBSUB_RANK_KEY, (const char *)rank_str);
if (mSdPub == NULL) {
mOpenedCb = callback;
mOpenedCbData = user_data;
+ char service_name[VINE_MAX_SERVICE_NAME_LEN + 1] = {0 , };
+ create_id(service_name);
+ set_id(service_name);
+
int ret = vine_data_path_open(mAddrFamily, mListenPort,
mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
- mMaxConnNum, mSecurity,
+ mMaxConnNum, mSecurity, mId.c_str(),
_pubsub_opened_cb, static_cast<void *>(this),
_pubsub_accepted_cb, static_cast<void *>(this),
&mServerDataPath, mEventFd);
* Test: vine-pubsub-open-close-test
* Description:
* a PubSub DP opens and closes repeatedly in one session.
- * The interval is 100ms. (DP_LIFETIME)
+ * The interval is 500ms. (DP_LIFETIME)
*/
#include <stdio.h>
#define MAX_EVENTS 50
#define DEFAULT_REPETITION_TIME 100
-#define DP_LIFETIME 100000000 // nanoseconds
+#define DP_LIFETIME 500000000 // nanoseconds
#define TOPIC "pubsub-test"
static int epollfd = 0;
PRINT_IF_ERROR(vine_dp_open(dp,
[](vine_dp_h dp, vine_error_e result, void *user_data) {
if (result != VINE_ERROR_NONE)
- return;
- PRINT_LOG("dp[%p] is opened.", dp);
+ return;
+ int port = 0;
+ vine_dp_get_port(dp, &port);
+ PRINT_LOG("dp[%p] is opened. port[%d]", dp, port);
_start_timer(dp, DP_LIFETIME);
}, NULL), "vine_dp_open");
}