2 * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License")
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
21 #include "vine-constants.h"
22 #include "vine-data-path.h"
23 #include "vine-event-loop.h"
25 #include "vine-session.h"
26 #include "vine-security.h"
27 #include "vine-utils.h"
32 #include <arpa/inet.h>
36 extern vine_dp_plugin_fn g_dp_plugin_fn;
38 static bool _check_if_valid_ip(vine_address_family_e addr_family, const char *ip)
41 unsigned char buf[sizeof(struct in6_addr)];
43 if (addr_family == VINE_ADDRESS_FAMILY_IPV4) {
44 ret = inet_pton(AF_INET, ip, buf);
45 } else if (addr_family == VINE_ADDRESS_FAMILY_IPV6) {
46 ret = inet_pton(AF_INET6, ip, buf);
48 ret |= inet_pton(AF_INET, ip, buf);
49 ret |= inet_pton(AF_INET6, ip, buf);
55 static bool _check_topic_len(const char * topic)
57 RET_VAL_IF(topic == NULL, false, "topic is NULL");
58 int len = strlen(topic);
59 return len > 0 && len <= VINE_MAX_TOPIC_LEN;
62 static void _received_cb(vine_data_path_h datapath, size_t received_len, void *user_data)
64 if (!datapath || !user_data)
67 static_cast<DataPath *>(user_data)->invoke_received_cb(received_len);
70 static void _terminated_cb(vine_data_path_h datapath, void *user_data)
72 if (!datapath || !user_data)
75 static_cast<DataPath *>(user_data)->invoke_terminated_cb();
78 static void _opened_cb(vine_data_path_h datapath, int result, int port, void *userdata)
80 DataPath *dp = static_cast<DataPath *>(userdata);
83 VINE_LOGI("port[%d] result[%d]", port, result);
84 dp->invoke_opened_cb((vine_error_e)result);
87 static void _accepted_cb(vine_data_path_h datapath, void *user_data)
89 if (!datapath || !user_data)
92 void *event_fd = static_cast<DataPath *>(user_data)->get_eventfd();
94 // datapath is created newly. DP class should be needed for it.
95 // event_fd is the same as corresponding DPServer.
96 DPClient *connected_client_dp = new DPClient(event_fd, datapath);
97 _vine_data_path_set_received_cb(datapath,
98 _received_cb, static_cast<void *>(connected_client_dp));
99 vine_data_path_set_terminated_cb(datapath,
100 _terminated_cb, static_cast<void *>(connected_client_dp));
101 static_cast<DPServer *>(user_data)->invoke_accepted_cb(connected_client_dp);
104 static void _connected_cb(vine_data_path_h datapath, int result, void *user_data)
106 _vine_data_path_set_received_cb(datapath, _received_cb, user_data);
107 vine_data_path_set_terminated_cb(datapath, _terminated_cb, user_data);
108 static_cast<DataPath *>(user_data)->invoke_opened_cb(result);
111 static void _pubsub_received_cb(vine_data_path_h datapath, size_t received_len, void *user_data)
113 if (!datapath || !user_data || received_len == 0)
116 VINE_LOGD("receive %zd bytes from datapath[%p]", received_len, datapath);
118 static_cast<DPPubSub *>(user_data)->noti_received_peer(datapath, received_len);
119 static_cast<DataPath *>(user_data)->invoke_received_cb(received_len);
122 static void _pubsub_terminated_cb(vine_data_path_h datapath, void *user_data)
124 if (!datapath || !user_data)
127 VINE_LOGD("datapath[%p] is terminated by peer.", datapath);
128 static_cast<DPPubSub *>(user_data)->del_joined_peer(datapath);
129 _vine_data_path_destroy(datapath);
132 static void _pubsub_opened_cb(vine_data_path_h datapath, int result, int port, void *userdata)
137 DPPubSub *dp = static_cast<DPPubSub *>(userdata);
140 VINE_LOGI("port[%d] result[%d]", port, result);
142 // Notify user that a listen socket cannot be used anymore.
143 if (result != VINE_ERROR_NONE) {
144 static_cast<DataPath *>(userdata)->invoke_opened_cb(result);
149 int ret = dp->publish_service();
150 if (ret != VINE_ERROR_NONE) {
152 static_cast<DataPath *>(userdata)->invoke_opened_cb(ret);
156 ret = dp->subscribe_service();
157 if (ret != VINE_ERROR_NONE) {
159 static_cast<DataPath *>(userdata)->invoke_opened_cb(ret);
164 static void _pubsub_accepted_cb(vine_data_path_h datapath, void *user_data)
166 if (!datapath || !user_data)
169 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
170 if (dp->get_max_conns() <= dp->get_joined_peer()) {
171 VINE_LOGE("The max connection limit is reached. Ignore [%p].", datapath);
172 dp->decrease_init_disc_num();
173 _vine_data_path_close(datapath);
177 const char *ip = _vine_data_path_get_ip(datapath);
178 int port = _vine_data_path_get_port(datapath);
179 dp->add_joined_peer(ip, port, datapath);
181 _vine_data_path_set_received_cb(datapath, _pubsub_received_cb, user_data);
182 vine_data_path_set_terminated_cb(datapath, _pubsub_terminated_cb, user_data);
184 if (dp->get_open_state() == VINE_DP_PUBSUB_OPEN_STATE_WAIT
185 && dp->decrease_init_disc_num() <= 0) {
186 dp->set_open_state(VINE_DP_PUBSUB_OPEN_STATE_DONE);
187 static_cast<DataPath *>(user_data)->invoke_opened_cb(VINE_ERROR_NONE);
191 static void _pubsub_connected_cb(vine_data_path_h datapath, int result, void *user_data)
193 if (!datapath || !user_data)
196 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
199 VINE_LOGE("connect failure.");
200 dp->decrease_init_disc_num();
204 const char *ip = _vine_data_path_get_ip(datapath);
205 int port = _vine_data_path_get_port(datapath);
207 dp->add_joined_peer(ip, port, datapath);
209 _vine_data_path_set_received_cb(datapath, _pubsub_received_cb, user_data);
210 vine_data_path_set_terminated_cb(datapath, _pubsub_terminated_cb, user_data);
212 if (dp->get_open_state() == VINE_DP_PUBSUB_OPEN_STATE_WAIT
213 && dp->decrease_init_disc_num() <= 0) {
214 dp->set_open_state(VINE_DP_PUBSUB_OPEN_STATE_DONE);
215 static_cast<DataPath *>(user_data)->invoke_opened_cb(VINE_ERROR_NONE);
220 static int __vine_set_discovered_service(vine_service_h service,
221 const char *service_type, const char *service_name,
222 const char *host_name, int port,
223 const map<string, string> &attr, const char *iface_name)
225 int ret = VINE_ERROR_NONE;
226 ret = _vine_service_set_type(service, service_type);
227 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set service type");
228 ret = _vine_service_set_name(service, service_name);
229 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set service name");
230 ret = _vine_service_set_host_name(service, host_name);
231 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set host name");
232 ret = _vine_service_set_port(service, port);
233 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set port");
235 for (const auto &kv : attr)
236 _vine_service_add_attribute(service, kv.first.c_str(), kv.second.c_str());
238 ret = _vine_service_set_iface_name(service, iface_name);
239 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set iface_name");
241 return VINE_ERROR_NONE;
244 static void _ip_resolved_cb(vine_disc_h disc, vine_service_h service, bool add,
245 const char *ip, vine_address_family_e address_family, void *user_data)
247 if (!user_data || !add) {
248 VINE_LOGD("state: %s", add ? "add" : "remove");
252 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
253 int port = _vine_service_get_port(service);
254 if (dp->is_joined_peer(_vine_service_get_name(service), ip, port)) {
255 VINE_LOGD("%s:%d was already joined.", ip, port);
259 VINE_LOGD("IP Resolved %s:%d", ip, port);
261 vine_address_family_e supported_addr_family = dp->get_addr_family();
262 if (supported_addr_family != VINE_ADDRESS_FAMILY_DEFAULT
263 && supported_addr_family != address_family) {
264 VINE_LOGD("address family is dismatched. peer type[%d]", address_family);
268 auto attr = _vine_service_get_attributes(service);
269 auto it = attr.find(VINE_DP_PUBSUB_RANK_KEY);
270 if (it == attr.end()) {
271 VINE_LOGE("peer doens't have a rank.");
275 if (dp->get_max_conns() > dp->get_joined_peer()
276 && dp->check_if_connect(it->second.c_str(), address_family, ip, port)) {
277 VINE_LOGD("Try to connect a peer(%s:%d)", ip, port);
278 dp->connect(ip, port);
282 static void _service_discovered_cb(vine_disc_h disc, bool available,
283 const char *service_type, const char *service_name,
284 const char *host_name, int port, const map<string, string> &attr,
285 const char *iface_name, int more_coming, void *user_data)
287 VINE_LOGD("%s is discovered. %s",
288 service_name, available ? "available" : "not available");
290 if (!user_data || !available)
293 vine_disc_h disc_handle;
294 int ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &disc_handle);
295 RET_IF(ret != VINE_ERROR_NONE, "Fail to create a disc");
297 vine_service_h service;
298 ret = _vine_service_create(&service, false);
299 if (ret != VINE_ERROR_NONE) {
300 vine_disc_destroy(disc_handle);
304 auto it = attr.find(VINE_DP_PUBSUB_RANK_KEY);
305 if (it == attr.end()) {
306 VINE_LOGE("peer doens't have a rank.");
307 _vine_service_destroy(service);
308 vine_disc_destroy(disc_handle);
312 ret = _vine_service_set_disc_handle(service, disc_handle);
313 if (ret != VINE_ERROR_NONE) {
314 VINE_LOGE("Fail to set disc_handle. error(%d)", ret);
315 _vine_service_destroy(service);
316 vine_disc_destroy(disc_handle);
320 ret = __vine_set_discovered_service(service,
321 service_type, service_name, host_name, port, attr, iface_name);
322 if (ret != VINE_ERROR_NONE) {
323 VINE_LOGE("Fail to set a service. error(%d)", ret);
324 _vine_service_destroy(service);
325 vine_disc_destroy(disc_handle);
329 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
330 if (more_coming && dp->get_open_state() == VINE_DP_PUBSUB_OPEN_STATE_WAIT) {
331 VINE_LOGD("At least one more result is exist.");
332 dp->increase_init_disc_num();
335 ret = vine_disc_resolve_ip(disc_handle, service,
336 _ip_resolved_cb, user_data,
337 (vine_event_queue_h)static_cast<DataPath *>(user_data)->get_eventfd());
338 if (ret != VINE_ERROR_NONE) {
339 VINE_LOGE("Fail to resolve IP. error(%d)", ret);
340 _vine_service_destroy(service);
341 vine_disc_destroy(disc_handle);
346 static void _service_published_cb(vine_disc_h disc,
347 const char *service_name, vine_error_e error, void *user_data)
349 VINE_LOGD("%s publish request %s.",
350 service_name, error == VINE_ERROR_NONE ? "succeed" : "failed");
355 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
356 dp->set_service_name(service_name);
359 int DataPath::set_security(void *security)
361 return _vine_security_clone(&mSecurity, security);
364 int DataPath::set_iface_name(const std::string &iface_name)
366 mIfaceName = iface_name;
367 return VINE_ERROR_NONE;
370 void DataPath::invoke_opened_cb(int result)
373 mOpenedCb(static_cast<void *>(this), (vine_error_e)result, mOpenedCbData);
377 mOpenedCbData = NULL;
380 int DataPath::set_received_cb(vine_dp_received_cb callback, void *user_data)
382 mReceivedCb = callback;
383 mReceivedCbData = user_data;
384 return VINE_ERROR_NONE;
387 int DataPath::unset_received_cb()
390 mReceivedCbData = NULL;
391 return VINE_ERROR_NONE;
394 void DataPath::invoke_received_cb(size_t received_len)
397 mReceivedCb(static_cast<void *>(this), received_len, mReceivedCbData);
400 int DataPath::set_terminated_cb(vine_dp_terminated_cb callback, void *user_data)
402 mTerminatedCb = callback;
403 mTerminatedCbData = user_data;
404 return VINE_ERROR_NONE;
407 int DataPath::unset_terminated_cb()
409 mTerminatedCb = NULL;
410 mTerminatedCbData = NULL;
411 return VINE_ERROR_NONE;
414 void DataPath::invoke_terminated_cb()
417 mTerminatedCb(static_cast<void *>(this), mTerminatedCbData);
420 DPServer::DPServer(void *event_fd)
422 VINE_LOGD("DPServer[%p] is created.", this);
425 mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
429 mMaxConnNum = VINE_DP_DEFAULT_CONNECTIONS_NUM;
431 mAcceptedCbData = NULL;
433 mReceivedCbData = NULL;
435 mOpenedCbData = NULL;
436 mTerminatedCb = NULL;
437 mTerminatedCbData = NULL;
440 DPServer::~DPServer()
442 VINE_LOGD("DPServer[%p] is deleted.", this);
443 _vine_security_destroy(mSecurity);
444 _vine_data_path_destroy(mDataPath);
447 int DPServer::set_addr_family(vine_address_family_e addr_family)
449 mAddrFamily = addr_family;
450 return VINE_ERROR_NONE;
453 int DPServer::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
455 return VINE_ERROR_INVALID_OPERATION;
458 int DPServer::set_port(int port)
460 if (port < 0 || port > 65535)
461 return VINE_ERROR_INVALID_PARAMETER;
465 return VINE_ERROR_NONE;
468 int DPServer::set_topic(std::string topic)
470 return VINE_ERROR_INVALID_OPERATION;
473 int DPServer::set_max_connections(int max_conn)
475 if (max_conn < 1 || max_conn > VINE_DP_MAX_CONNECTIONS_NUM)
476 return VINE_ERROR_INVALID_PARAMETER;
478 mMaxConnNum = max_conn;
479 return VINE_ERROR_NONE;
482 int DPServer::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
484 mAcceptedCb = callback;
485 mAcceptedCbData = user_data;
486 return VINE_ERROR_NONE;
489 int DPServer::unset_accepted_cb()
492 mAcceptedCbData = NULL;
493 return VINE_ERROR_NONE;
496 void DPServer::invoke_accepted_cb(vine_dp_h dp)
499 mAcceptedCb(static_cast<void *>(this), dp, mAcceptedCbData);
502 int DPServer::open(vine_dp_opened_cb callback, void *user_data)
504 const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
506 mOpenedCb = callback;
507 mOpenedCbData = user_data;
509 return vine_data_path_open(mAddrFamily, mListenPort, iface_name, mMaxConnNum, mSecurity,
510 _opened_cb, static_cast<void *>(this),
511 _accepted_cb, static_cast<void *>(this),
512 &mDataPath, mEventFd);
515 void DPServer::close()
517 _vine_data_path_close(mDataPath);
520 int DPServer::send(unsigned char *buf, size_t len)
522 return _vine_data_path_write(mDataPath, buf, len);
525 int DPServer::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
527 return _vine_data_path_read(mDataPath, buf, buf_len, read_len);
530 DPClient::DPClient(void *event_fd)
532 VINE_LOGD("DPClient[%p] is created.", this);
535 mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
541 mReceivedCbData = NULL;
543 mOpenedCbData = NULL;
544 mTerminatedCb = NULL;
545 mTerminatedCbData = NULL;
548 DPClient::DPClient(void *event_fd, void *datapath)
550 VINE_LOGD("DPClient[%p] is created with datapath[%p]", this, datapath);
553 mDataPath = datapath;
554 mAddrFamily = VINE_ADDRESS_FAMILY_IPV4;
558 mReceivedCbData = NULL;
560 mOpenedCbData = NULL;
561 mTerminatedCb = NULL;
562 mTerminatedCbData = NULL;
565 DPClient::~DPClient()
567 VINE_LOGD("DPClient[%p] is deleted.", this);
568 _vine_security_destroy(mSecurity);
569 _vine_data_path_destroy(mDataPath);
572 int DPClient::set_addr_family(vine_address_family_e addr_family)
574 return VINE_ERROR_INVALID_OPERATION;
577 int DPClient::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
579 if (!_check_if_valid_ip(addr_family, ip.c_str()))
580 return VINE_ERROR_INVALID_PARAMETER;
583 mAddrFamily = addr_family;
585 return VINE_ERROR_NONE;
588 int DPClient::set_port(int port)
590 if (port <= 0 || port > 65535) // Do not allow 0
591 return VINE_ERROR_INVALID_PARAMETER;
595 return VINE_ERROR_NONE;
598 int DPClient::set_topic(std::string topic)
600 return VINE_ERROR_INVALID_OPERATION;
603 int DPClient::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
605 return VINE_ERROR_INVALID_OPERATION;
608 int DPClient::unset_accepted_cb()
610 return VINE_ERROR_INVALID_OPERATION;
613 int DPClient::open(vine_dp_opened_cb callback, void *user_data)
615 const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
617 mOpenedCb = callback;
618 mOpenedCbData = user_data;
620 return vine_data_path_connect(mAddrFamily, mServerIp.c_str(), mServerPort,
621 iface_name, mSecurity, _connected_cb, static_cast<void *>(this), &mDataPath, mEventFd);
624 void DPClient::close()
626 _vine_data_path_close(mDataPath);
629 int DPClient::send(unsigned char *buf, size_t len)
631 return _vine_data_path_write(mDataPath, buf, len);
634 int DPClient::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
636 return _vine_data_path_read(mDataPath, buf, buf_len, read_len);
639 DPPubSub::DPPubSub(void *event_fd)
641 VINE_LOGD("DPPubSub[%p] is created.", this);
646 mReceivedCbData = NULL;
648 mOpenedCbData = NULL;
649 mTerminatedCb = NULL;
650 mTerminatedCbData = NULL;
658 mOpenState = VINE_DP_PUBSUB_OPEN_STATE_NONE;
659 mSdPubSubState = VINE_DP_PUBSUB_SD_STATE_NONE;
662 mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
664 mMaxConnNum = VINE_DP_DEFAULT_CONNECTIONS_NUM;
665 mServerDataPath = NULL;
669 DPPubSub::~DPPubSub()
671 VINE_LOGD("DPPubSub[%p] is deleted.", this);
672 _vine_security_destroy(mSecurity);
676 int DPPubSub::set_addr_family(vine_address_family_e addr_family)
678 mAddrFamily = addr_family;
679 return VINE_ERROR_NONE;
682 int DPPubSub::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
684 return VINE_ERROR_INVALID_OPERATION;
687 int DPPubSub::set_port(int port)
689 if (port < 0 || port > 65535)
690 return VINE_ERROR_INVALID_PARAMETER;
694 return VINE_ERROR_NONE;
697 int DPPubSub::set_topic(std::string topic)
700 return VINE_ERROR_NONE;
703 int DPPubSub::set_max_connections(int max_conn)
705 if (max_conn < 1 || max_conn > VINE_DP_MAX_CONNECTIONS_NUM)
706 return VINE_ERROR_INVALID_PARAMETER;
708 mMaxConnNum = max_conn;
709 return VINE_ERROR_NONE;
712 int DPPubSub::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
714 return VINE_ERROR_INVALID_OPERATION;
717 int DPPubSub::unset_accepted_cb()
719 return VINE_ERROR_INVALID_OPERATION;
722 int DPPubSub::connect(const char *ip, int port)
724 vine_data_path_h datapath;
725 const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
726 int ret = vine_data_path_connect(mAddrFamily, ip, port, iface_name, mSecurity,
727 _pubsub_connected_cb, static_cast<void *>(this), &datapath, mEventFd);
731 int DPPubSub::close_server_dp()
733 int ret = _vine_data_path_close(mServerDataPath);
734 mServerDataPath = NULL;
738 int DPPubSub::publish_service()
740 vine_service_h service;
742 char rank_str[VINE_DP_PUBSUB_RANK_LEN] = {0 , };
743 char service_name[VINE_MAX_SERVICE_NAME_LEN + 1] = {0 , };
745 ret = vine_service_create(&service);
746 if (ret != VINE_ERROR_NONE)
749 vine_service_set_type(service, mTopic.c_str());
750 vine_service_set_port(service, mListenPort);
752 mRank = create_rank();
753 sprintf(rank_str, "%d", mRank);
754 snprintf(service_name, VINE_MAX_SERVICE_NAME_LEN,
755 "%s-%d", VINE_DP_PUBSUB_SERVICE_NAME_PREFIX, mRank);
757 vine_service_set_name(service, service_name);
758 vine_service_add_attribute(service, VINE_DP_PUBSUB_RANK_KEY, (const char *)rank_str);
760 if (mSdPub == NULL) {
761 ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &mSdPub);
762 if (ret != VINE_ERROR_NONE) {
763 VINE_LOGE("Fail to vine_disc_create");
764 vine_service_destroy(service);
768 ret = vine_disc_publish(mSdPub,
770 _service_published_cb, static_cast<void *>(this),
772 if (ret != VINE_ERROR_NONE) {
773 vine_disc_destroy(mSdPub);
776 mSdPubSubState |= VINE_DP_PUBSUB_SD_STATE_PUBLISH;
778 vine_service_destroy(service);
780 VINE_LOGD("Publish %s:%d with rank %d", mTopic.c_str(), mListenPort, mRank);
784 int DPPubSub::subscribe_service()
788 if (mSdSub == NULL) {
789 ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &mSdSub);
790 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to vine_disc_create");
793 ret = vine_disc_subscribe(mSdSub,
794 mTopic.c_str(), NULL,
795 _service_discovered_cb, static_cast<void *>(this),
797 if (ret != VINE_ERROR_NONE) {
798 vine_disc_destroy(mSdSub);
801 mSdPubSubState |= VINE_DP_PUBSUB_SD_STATE_SUBSCRIBE;
807 int DPPubSub::create_rank()
810 return rand() % VINE_DP_PUBSUB_RANK_MAX;
813 int DPPubSub::open(vine_dp_opened_cb callback, void *user_data)
815 if (mOpenState == VINE_DP_PUBSUB_OPEN_STATE_WAIT) {
816 VINE_LOGE("Ignore duplicate request.");
817 return VINE_ERROR_NOW_IN_PROGRESS;
818 } else if (mOpenState == VINE_DP_PUBSUB_OPEN_STATE_DONE) {
819 VINE_LOGE("Already opened.");
820 return VINE_ERROR_INVALID_OPERATION;
823 mOpenState = VINE_DP_PUBSUB_OPEN_STATE_WAIT;
824 mOpenTimer.start(VINE_DP_PUBSUB_OPEN_TIMEOUT_MS, _open_timer, static_cast<void *>(this));
826 mOpenedCb = callback;
827 mOpenedCbData = user_data;
829 const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
830 int ret = vine_data_path_open(mAddrFamily, mListenPort, iface_name, mMaxConnNum, mSecurity,
831 _pubsub_opened_cb, static_cast<void *>(this),
832 _pubsub_accepted_cb, static_cast<void *>(this),
833 &mServerDataPath, mEventFd);
834 if (ret != VINE_ERROR_NONE) {
836 mOpenedCbData = NULL;
841 return VINE_ERROR_NONE;
844 void DPPubSub::close()
847 vine_disc_stop_publish(mSdPub);
848 vine_disc_stop_subscribe(mSdSub);
850 vine_disc_destroy(mSdSub);
852 vine_disc_destroy(mSdPub);
856 _vine_data_path_close(mServerDataPath);
857 _vine_data_path_destroy(mServerDataPath);
858 mServerDataPath = NULL;
860 mOpenState = VINE_DP_PUBSUB_OPEN_STATE_NONE;
861 mSdPubSubState = VINE_DP_PUBSUB_SD_STATE_NONE;
864 int DPPubSub::send(unsigned char *buf, size_t len)
866 int ret = VINE_ERROR_NONE;
868 for (auto &peer : mDataPathList) {
871 ret = _vine_data_path_write(peer.second, buf, len);
872 if (ret != VINE_ERROR_NONE) {
873 VINE_LOGE("fail to write to a peer[%p] ", peer.second);
881 int DPPubSub::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
883 auto &dp_info = mRecvDataPathList.front();
885 int ret = _vine_data_path_read(dp_info.first, buf, buf_len, &bytes);
886 if (ret != VINE_ERROR_NONE)
889 dp_info.second -= bytes;
890 VINE_LOGD("%zd bytes remained", dp_info.second);
893 if (dp_info.second == 0)
894 mRecvDataPathList.erase();
896 return VINE_ERROR_NONE;
899 bool DPPubSub::is_joined_peer(const char *service_name, const char *ip, int port)
901 if (mServiceName.compare(service_name) == 0) {
902 VINE_LOGD("It's me!");
907 // TODO: compare the service name.
908 // service name might be unique.
910 DPKey key = std::make_pair(ip, port);
911 return (mDataPathList.count(key) > 0);
914 int DPPubSub::get_joined_peer()
916 return mDataPathList.size();
919 void DPPubSub::add_joined_peer(const char *ip, int port, vine_data_path_h datapath)
921 if (!ip || !datapath)
924 VINE_LOGD("%s:%d, %p is added.", ip, port, datapath);
925 DPKey key = std::make_pair(ip, port);
926 mDataPathList.insert(std::make_pair(key, datapath));
929 void DPPubSub::del_joined_peer(vine_data_path_h datapath)
934 DPMap::iterator found = std::find_if(mDataPathList.begin(),
936 [datapath](std::pair<DPKey, vine_data_path_h> const& item)
938 VINE_LOGD("item.second[%p]", item.second);
939 return (item.second == datapath);
942 if (found == mDataPathList.end()) {
943 VINE_LOGE("Cannot find the datapath[%p].", datapath);
947 VINE_LOGD("datapath[%p] is deleted from list.", datapath);
948 mDataPathList.erase(found);
951 void DPPubSub::clear_joined_peer()
953 for (auto &peer : mDataPathList) {
956 _vine_data_path_close(peer.second);
957 _vine_data_path_destroy(peer.second);
960 mDataPathList.clear();
963 void DPPubSub::noti_received_peer(vine_data_path_h datapath, size_t bytes)
965 mRecvDataPathList.push(std::make_pair(datapath, bytes));
969 static int _get_last_two_octets(string ip)
971 size_t pos = ip.find_last_of(':');
972 string sub = ip.substr(pos + 1);
974 if (sub.size() == 0) {
975 VINE_LOGE("invalid IP.");
979 return std::stoi(sub, NULL, 16);
983 // The smaller the last octet, the higher the priority.
984 // 1: peers priority is higher than me
985 // 0: same or failure
986 // -1: peers priority is lower than me
987 int DPPubSub::compare_ip_priority(const char *peer_ip)
989 std::string peer_ip_str(peer_ip);
990 size_t pos = peer_ip_str.find_last_of('.');
991 std::string subnet = peer_ip_str.substr(0, pos);
992 std::string found_ip;
993 int last = 0, peer_last = 0;
995 if (subnet.size() == 0)
998 for (auto &ip : mIpList) {
999 if (ip.find(subnet) != 0)
1006 VINE_LOGD("peer_ip[%s] found_ip[%s] subnet[%s]", peer_ip, found_ip.c_str(), subnet.c_str());
1008 last = std::stoi(found_ip.substr(pos + 1));
1009 peer_last = std::stoi(peer_ip_str.substr(pos + 1));
1011 VINE_LOGD("Failed to convert last octet. IP is invalid.");
1014 if (last == peer_last)
1016 return (last < peer_last ? 1 : -1);
// Decides whether this node should initiate a connection to a discovered
// peer, comparing ranks first, then (for IPv4) the result of
// compare_ip_priority(), then the listen ports.
// NOTE(review): several short lines of this function (braces, `return`
// statements, the declaration of `is_higher` and the statement that consumes
// it) are absent from this extract, so the exact tie-break outcome cannot be
// confirmed from here. The code lines below are left untouched.
1019 // If true is returned, the caller will connect to the peer.
1020 bool DPPubSub::check_if_connect(const char *peer_rank,
1021 vine_address_family_e ip_type, const char *peer_ip, int peer_port)
1023 // The smaller the rank value, the higher the priority.
1024 // Connect to a peer when both 1 and 2 are satisfied.
1025 // 1. the peer has a rank key and value
1026 // 2. our rank value is higher than the peer's value
// NOTE(review): std::stoi throws on non-numeric or out-of-range text, and
// peer_rank originates from a discovered peer's service attribute (see
// _ip_resolved_cb). No try/catch is visible in this extract; confirm.
1027 int prank = std::stoi(peer_rank);
1029 VINE_LOGD("ip_type[%d] rank[%d], Peer rank[%d] peer_ip[%s] peer_port[%d]",
1030 ip_type, mRank, prank, peer_ip, peer_port);
1033 else if (mRank < prank)
1036 // If the ranks are equal, the IP addresses need to be compared.
1038 if (ip_type == VINE_ADDRESS_FAMILY_IPV4) {
1039 is_higher = compare_ip_priority(peer_ip);
1041 // TODO : handle IPv6 address
1047 // If the last value of the IP is also equal, the ports need to be compared.
1048 if (mListenPort > peer_port)
1054 void DPPubSub::_open_timer(void *user_data)
1056 VINE_LOGD("open timeout reached.");
1060 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
1061 int sd_state = dp->get_sd_pubsub_state();
1062 if ((sd_state & VINE_DP_PUBSUB_SD_STATE_PUBLISH)
1063 && (sd_state & VINE_DP_PUBSUB_SD_STATE_SUBSCRIBE)) {
1064 dp->set_open_state(VINE_DP_PUBSUB_OPEN_STATE_DONE);
1065 static_cast<DataPath *>(user_data)->invoke_opened_cb(VINE_ERROR_NONE);
1068 VINE_LOGD("invoke opened_cb with an error. sd_state[%d]", sd_state);
1069 static_cast<DataPath *>(user_data)->invoke_opened_cb(VINE_ERROR_OPERATION_FAILED);
1072 int _vine_dp_create(vine_session_h session, vine_dp_type_e type, vine_dp_h *dp)
1074 RET_VAL_IF(session == NULL, VINE_ERROR_INVALID_PARAMETER, "session is null.");
1075 RET_VAL_IF(type >= VINE_DP_TYPE_UNKNOWN, VINE_ERROR_INVALID_PARAMETER, "invalid type");
1076 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1078 vine_event_queue_h eq = NULL;
1079 int ret = _vine_session_get_event_queue(session, &eq);
1081 if (ret != VINE_ERROR_NONE || !eq)
1082 return VINE_ERROR_INVALID_PARAMETER;
1084 if (type == VINE_DP_TYPE_SERVER) {
1085 *dp = new DPServer((void *)eq);
1086 } else if (type == VINE_DP_TYPE_CLIENT) {
1087 *dp = new DPClient((void *)eq);
1088 } else if (type == VINE_DP_TYPE_PUBSUB) {
1089 *dp = new DPPubSub((void *)eq);
1092 return VINE_ERROR_NONE;
1095 int _vine_dp_destroy(vine_dp_h dp)
1097 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1099 DataPath *_dp = static_cast<DataPath *>(dp);
1102 return VINE_ERROR_NONE;
1105 int _vine_dp_set_iface_name(vine_dp_h dp, const char *iface_name)
1107 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1108 RET_VAL_IF(iface_name == NULL, VINE_ERROR_INVALID_PARAMETER, "iface_name is null.");
1110 DataPath *_dp = static_cast<DataPath *>(dp);
1111 return _dp->set_iface_name(iface_name);
1114 int _vine_dp_set_addr_family(vine_dp_h dp, vine_address_family_e addr_family)
1116 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1117 RET_VAL_IF(addr_family < VINE_ADDRESS_FAMILY_DEFAULT
1118 || addr_family > VINE_ADDRESS_FAMILY_IPV6,
1119 VINE_ERROR_INVALID_PARAMETER, "addr_family is invalid.");
1121 DataPath *_dp = static_cast<DataPath *>(dp);
1122 return _dp->set_addr_family(addr_family);
1125 int _vine_dp_set_remote_ip(vine_dp_h dp, vine_address_family_e addr_family, const char *ip)
1127 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1128 RET_VAL_IF(ip == NULL, VINE_ERROR_INVALID_PARAMETER, "ip is null.");
1130 DataPath *_dp = static_cast<DataPath *>(dp);
1131 return _dp->set_remote_ip(addr_family, ip);
1134 int _vine_dp_set_port(vine_dp_h dp, int port)
1136 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1138 DataPath *_dp = static_cast<DataPath *>(dp);
1139 return _dp->set_port(port);
1142 int _vine_dp_get_port(vine_dp_h dp, int *port)
1144 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1145 RET_VAL_IF(port == NULL, VINE_ERROR_INVALID_PARAMETER, "port is null.");
1147 DataPath *_dp = static_cast<DataPath *>(dp);
1148 *port = _dp->get_port();
1149 return VINE_ERROR_NONE;
1152 int _vine_dp_set_topic(vine_dp_h dp, const char *topic)
1154 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1155 RET_VAL_IF(topic == NULL, VINE_ERROR_INVALID_PARAMETER, "topic is null.");
1156 RET_VAL_IF(_check_topic_len(topic) == false,
1157 VINE_ERROR_INVALID_PARAMETER, "invalid length of topic.");
1159 DataPath *_dp = static_cast<DataPath *>(dp);
1160 return _dp->set_topic(topic);
1163 int _vine_dp_set_max_connections(vine_dp_h dp, int max_conn)
1165 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1167 DataPath *_dp = static_cast<DataPath *>(dp);
1168 return _dp->set_max_connections(max_conn);
1171 int _vine_dp_set_security(vine_dp_h dp, vine_security_h security)
1173 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1174 RET_VAL_IF(security == NULL, VINE_ERROR_INVALID_PARAMETER, "security is null.");
1176 DataPath *_dp = static_cast<DataPath *>(dp);
1177 return _dp->set_security(security);
1180 int _vine_dp_set_accepted_cb(vine_dp_h dp, vine_dp_accepted_cb callback, void *user_data)
1182 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1183 RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1185 DataPath *_dp = static_cast<DataPath *>(dp);
1186 return _dp->set_accepted_cb(callback, user_data);
1189 int _vine_dp_unset_accepted_cb(vine_dp_h dp)
1191 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1193 DataPath *_dp = static_cast<DataPath *>(dp);
1194 return _dp->unset_accepted_cb();
1197 int _vine_dp_set_terminated_cb(vine_dp_h dp, vine_dp_terminated_cb callback, void *user_data)
1199 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1200 RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1202 DataPath *_dp = static_cast<DataPath *>(dp);
1203 return _dp->set_terminated_cb(callback, user_data);
1206 int _vine_dp_unset_terminated_cb(vine_dp_h dp)
1208 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1210 DataPath *_dp = static_cast<DataPath *>(dp);
1211 return _dp->unset_terminated_cb();
1214 int _vine_dp_open(vine_dp_h dp, vine_dp_opened_cb callback, void *user_data)
1216 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1217 RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1219 DataPath *_dp = static_cast<DataPath *>(dp);
1220 return _dp->open(callback, user_data);
1223 int _vine_dp_close(vine_dp_h dp)
1225 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1227 DataPath *_dp = static_cast<DataPath *>(dp);
1229 return VINE_ERROR_NONE;
1232 int _vine_dp_send(vine_dp_h dp, unsigned char *buf, size_t len)
1234 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1235 RET_VAL_IF(buf == NULL, VINE_ERROR_INVALID_PARAMETER, "buf is null.");
1237 DataPath *_dp = static_cast<DataPath *>(dp);
1238 return _dp->send(buf, len);
1241 int _vine_dp_recv(vine_dp_h dp, unsigned char *buf, size_t buf_len, size_t *read_len)
1243 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1244 RET_VAL_IF(buf == NULL, VINE_ERROR_INVALID_PARAMETER, "buf is null.");
1245 RET_VAL_IF(read_len == NULL, VINE_ERROR_INVALID_PARAMETER, "read_len is null.");
1247 DataPath *_dp = static_cast<DataPath *>(dp);
1248 return _dp->recv(buf, buf_len, read_len);
1251 int _vine_dp_set_received_cb(vine_dp_h dp, vine_dp_received_cb callback, void *user_data)
1253 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1254 RET_VAL_IF(callback== NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1256 DataPath *_dp = static_cast<DataPath *>(dp);
1257 return _dp->set_received_cb(callback, user_data);
1260 int _vine_dp_unset_received_cb(vine_dp_h dp)
1262 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1264 DataPath *_dp = static_cast<DataPath *>(dp);
1265 return _dp->unset_received_cb();