2 * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License")
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
21 #include "vine-constants.h"
22 #include "vine-data-path.h"
23 #include "vine-event-loop.h"
25 #include "vine-session.h"
26 #include "vine-security.h"
27 #include "vine-utils.h"
32 #include <arpa/inet.h>
36 #define RET_ERR_IF_DP_OPEN_STATE_ISNT_IDLE(state) \
38 if (state == VINE_DP_OPEN_STATE_WAIT) { \
39 VINE_LOGE("Ignore duplicate request."); \
40 return VINE_ERROR_NOW_IN_PROGRESS; \
41 } else if (state == VINE_DP_OPEN_STATE_DONE) { \
42 VINE_LOGE("Already opened."); \
43 return VINE_ERROR_INVALID_OPERATION; \
47 extern vine_dp_plugin_fn g_dp_plugin_fn;
49 static bool _check_if_valid_ip(vine_address_family_e addr_family, const char *ip)
52 unsigned char buf[sizeof(struct in6_addr)];
54 if (addr_family == VINE_ADDRESS_FAMILY_IPV4) {
55 ret = inet_pton(AF_INET, ip, buf);
56 } else if (addr_family == VINE_ADDRESS_FAMILY_IPV6) {
57 ret = inet_pton(AF_INET6, ip, buf);
59 ret |= inet_pton(AF_INET, ip, buf);
60 ret |= inet_pton(AF_INET6, ip, buf);
66 static bool _check_topic_len(const char * topic)
68 RET_VAL_IF(topic == NULL, false, "topic is NULL");
69 int len = strlen(topic);
70 return len > 0 && len <= VINE_MAX_TOPIC_LEN;
73 static void _received_cb(vine_data_path_h datapath, size_t received_len, void *user_data)
75 if (!datapath || !user_data)
78 static_cast<DataPath *>(user_data)->invoke_received_cb(received_len);
81 static void _terminated_cb(vine_data_path_h datapath, void *user_data)
83 if (!datapath || !user_data)
86 static_cast<DataPath *>(user_data)->invoke_terminated_cb();
89 static void _opened_cb(vine_data_path_h datapath, int result, int port, void *userdata)
91 DataPath *dp = static_cast<DataPath *>(userdata);
93 dp->update_local_address_info();
95 VINE_LOGI("port[%d] result[%d]", port, result);
96 dp->invoke_opened_cb((vine_error_e)result);
99 static void _accepted_cb(vine_data_path_h datapath, void *user_data)
101 if (!datapath || !user_data)
104 void *event_queue = static_cast<DataPath *>(user_data)->get_eventfd();
106 // datapath is created newly. DP class should be needed for it.
107 // event_queue is the same as corresponding DPServer.
108 DPClient *connected_client_dp = new DPClient(event_queue, datapath);
109 vine_data_path_set_received_cb(datapath,
110 _received_cb, static_cast<void *>(connected_client_dp));
111 vine_data_path_set_terminated_cb(datapath,
112 _terminated_cb, static_cast<void *>(connected_client_dp));
114 connected_client_dp->update_local_address_info();
115 static_cast<DPServer *>(user_data)->invoke_accepted_cb(connected_client_dp);
118 static void _connected_cb(vine_data_path_h datapath, int result, void *user_data)
121 vine_data_path_set_received_cb(datapath, _received_cb, user_data);
122 vine_data_path_set_terminated_cb(datapath, _terminated_cb, user_data);
125 DataPath *dp = static_cast<DataPath *>(user_data);
126 dp->update_local_address_info();
127 dp->invoke_opened_cb(result);
130 static void _pubsub_received_cb(vine_data_path_h datapath, size_t received_len, void *user_data)
132 if (!datapath || !user_data || received_len == 0)
135 VINE_LOGD("receive %zd bytes from datapath[%p]", received_len, datapath);
137 static_cast<DPPubSub *>(user_data)->noti_received_peer(datapath, received_len);
138 static_cast<DataPath *>(user_data)->invoke_received_cb(received_len);
141 static void _pubsub_terminated_cb(vine_data_path_h datapath, void *user_data)
143 if (!datapath || !user_data)
146 VINE_LOGD("datapath[%p] is terminated by peer.", datapath);
147 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
149 char *peer_id = STRDUP(dp->get_joined_peer_id(datapath));
150 dp->del_joined_peer(datapath);
152 dp->invoke_peer_left_cb(peer_id);
155 vine_data_path_destroy(datapath);
158 static void _pubsub_opened_cb(vine_data_path_h datapath, int result, int port, void *userdata)
163 DPPubSub *dp = static_cast<DPPubSub *>(userdata);
166 VINE_LOGI("port[%d] result[%d]", port, result);
168 // Notify user that a listen socket cannot be used anymore.
169 if (result != VINE_ERROR_NONE) {
170 static_cast<DataPath *>(userdata)->invoke_opened_cb(result);
175 int ret = dp->publish_service();
176 if (ret != VINE_ERROR_NONE) {
178 static_cast<DataPath *>(userdata)->invoke_opened_cb(ret);
183 static void _pubsub_accepted_cb(vine_data_path_h datapath, void *user_data)
185 if (!datapath || !user_data)
188 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
189 if (dp->get_max_conns() <= dp->get_joined_peer()) {
190 VINE_LOGE("The max connection limit is reached. Ignore [%p].", datapath);
191 vine_data_path_close(datapath);
196 if (vine_data_path_get_token(datapath, &token) != VINE_ERROR_NONE) {
197 VINE_LOGE("Cannot find peer's service name. Ignore[%p]", datapath);
198 vine_data_path_close(datapath);
199 vine_data_path_destroy(datapath);
203 dp->add_joined_peer(token, datapath);
205 vine_data_path_set_received_cb(datapath, _pubsub_received_cb, user_data);
206 vine_data_path_set_terminated_cb(datapath, _pubsub_terminated_cb, user_data);
208 dp->invoke_peer_joined_cb(token);
212 struct pubsub_connect_data {
214 const char *service_name;
217 static void _pubsub_connected_cb(vine_data_path_h datapath, int result, void *user_data)
219 if (!datapath || !user_data || result != 0) {
220 VINE_LOGE("connect failure.");
224 struct pubsub_connect_data *conn_data = (struct pubsub_connect_data *)user_data;
225 if (!conn_data->service_name) {
226 VINE_LOGE("service_name is NULL.");
227 vine_data_path_close(datapath);
228 vine_data_path_destroy(datapath);
232 DPPubSub *dp = static_cast<DPPubSub *>(conn_data->dp);
233 dp->add_joined_peer(conn_data->service_name, datapath);
235 vine_data_path_set_received_cb(datapath, _pubsub_received_cb, conn_data->dp);
236 vine_data_path_set_terminated_cb(datapath, _pubsub_terminated_cb, conn_data->dp);
238 dp->invoke_peer_joined_cb(conn_data->service_name);
239 conn_data->dp = NULL;
244 static int __vine_set_discovered_service(vine_service_h service,
245 const char *service_type, const char *service_name,
246 const char *host_name, int port,
247 const map<string, string> &attr, const char *iface_name)
249 int ret = VINE_ERROR_NONE;
250 ret = _vine_service_set_type(service, service_type);
251 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set service type");
252 ret = _vine_service_set_name(service, service_name);
253 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set service name");
254 ret = _vine_service_set_host_name(service, host_name);
255 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set host name");
256 ret = _vine_service_set_port(service, port);
257 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set port");
259 for (const auto &kv : attr)
260 _vine_service_add_attribute(service, kv.first.c_str(), kv.second.c_str());
262 ret = _vine_service_set_iface_name(service, iface_name);
263 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set iface_name");
265 return VINE_ERROR_NONE;
268 static void _ip_resolved_cb(vine_disc_h disc, vine_service_h service, bool add,
269 const char *ip, vine_address_family_e address_family, void *user_data)
271 if (!user_data || !add) {
272 VINE_LOGD("state: %s", add ? "add" : "remove");
276 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
277 int port = _vine_service_get_port(service);
278 if (dp->is_joined_peer(_vine_service_get_name(service), ip)) {
279 VINE_LOGD("%s:%d was already joined.", ip, port);
283 VINE_LOGD("IP Resolved %s:%d", ip, port);
285 vine_address_family_e supported_addr_family = dp->get_addr_family();
286 if (supported_addr_family != VINE_ADDRESS_FAMILY_DEFAULT
287 && supported_addr_family != address_family) {
288 VINE_LOGD("address family is dismatched. peer type[%d]", address_family);
292 auto attr = _vine_service_get_attributes(service);
293 auto it = attr.find(VINE_DP_PUBSUB_RANK_KEY);
294 if (it == attr.end()) {
295 VINE_LOGE("peer doens't have a rank.");
299 if (dp->get_max_conns() > dp->get_joined_peer()
300 && dp->check_if_connect(it->second.c_str(), address_family, ip, port)) {
301 const char *service_name = _vine_service_get_name(service);
305 VINE_LOGD("Try to connect a peer(%s:%d)", ip, port);
306 dp->connect(service_name, ip, port);
310 static void _service_discovered_cb(vine_disc_h disc, bool available,
311 const char *service_type, const char *service_name,
312 const char *host_name, int port, const map<string, string> &attr,
313 const char *iface_name, int more_coming, void *user_data)
315 VINE_LOGD("%s is discovered. %s",
316 service_name, available ? "available" : "not available");
318 if (!user_data || !available)
321 vine_disc_h disc_handle;
322 int ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &disc_handle);
323 RET_IF(ret != VINE_ERROR_NONE, "Fail to create a disc");
325 vine_service_h service;
326 ret = _vine_service_create(&service, false);
327 if (ret != VINE_ERROR_NONE) {
328 vine_disc_destroy(disc_handle);
332 auto it = attr.find(VINE_DP_PUBSUB_RANK_KEY);
333 if (it == attr.end()) {
334 VINE_LOGE("peer doens't have a rank.");
335 _vine_service_destroy(service);
336 vine_disc_destroy(disc_handle);
340 ret = _vine_service_set_disc_handle(service, disc_handle);
341 if (ret != VINE_ERROR_NONE) {
342 VINE_LOGE("Fail to set disc_handle. error(%d)", ret);
343 _vine_service_destroy(service);
344 vine_disc_destroy(disc_handle);
348 ret = __vine_set_discovered_service(service,
349 service_type, service_name, host_name, port, attr, iface_name);
350 if (ret != VINE_ERROR_NONE) {
351 VINE_LOGE("Fail to set a service. error(%d)", ret);
352 _vine_service_destroy(service);
353 vine_disc_destroy(disc_handle);
357 ret = vine_disc_resolve_ip(disc_handle, service,
358 _ip_resolved_cb, user_data,
359 (vine_event_queue_h)static_cast<DataPath *>(user_data)->get_eventfd());
360 if (ret != VINE_ERROR_NONE) {
361 VINE_LOGE("Fail to resolve IP. error(%d)", ret);
362 _vine_service_destroy(service);
363 vine_disc_destroy(disc_handle);
368 static void _service_published_cb(vine_disc_h disc,
369 const char *service_name, vine_error_e error, void *user_data)
371 VINE_LOGD("%s publish request %s.",
372 service_name, error == VINE_ERROR_NONE ? "succeed" : "failed");
377 DPPubSub *dp = static_cast<DPPubSub *>(user_data);
378 dp->set_id(service_name);
379 int ret = dp->subscribe_service();
380 if (ret != VINE_ERROR_NONE)
383 static_cast<DataPath *>(user_data)->invoke_opened_cb(ret);
386 int DataPath::set_security(void *security)
388 return _vine_security_clone(&mSecurity, security);
391 int DataPath::set_iface_name(const std::string &iface_name)
393 mIfaceName = iface_name;
394 return VINE_ERROR_NONE;
397 void DataPath::invoke_opened_cb(int result)
400 mOpenedCb(static_cast<void *>(this), (vine_error_e)result, mOpenedCbData);
404 mOpenedCbData = NULL;
405 mOpenState = VINE_DP_OPEN_STATE_DONE;
408 int DataPath::set_received_cb(vine_dp_received_cb callback, void *user_data)
410 mReceivedCb = callback;
411 mReceivedCbData = user_data;
412 return VINE_ERROR_NONE;
415 int DataPath::unset_received_cb()
418 mReceivedCbData = NULL;
419 return VINE_ERROR_NONE;
422 void DataPath::invoke_received_cb(size_t received_len)
425 mReceivedCb(static_cast<void *>(this), received_len, mReceivedCbData);
428 int DataPath::set_terminated_cb(vine_dp_terminated_cb callback, void *user_data)
430 mTerminatedCb = callback;
431 mTerminatedCbData = user_data;
432 return VINE_ERROR_NONE;
435 int DataPath::unset_terminated_cb()
437 mTerminatedCb = NULL;
438 mTerminatedCbData = NULL;
439 return VINE_ERROR_NONE;
442 void DataPath::invoke_terminated_cb()
444 mOpenState = VINE_DP_OPEN_STATE_NONE;
446 mTerminatedCb(static_cast<void *>(this), mTerminatedCbData);
449 DPServer::DPServer(void *event_queue)
451 VINE_LOGD("DPServer[%p] is created.", this);
452 mEventQueue = event_queue;
454 mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
456 mOpenState = VINE_DP_OPEN_STATE_NONE;
459 mMaxConnNum = VINE_DP_DEFAULT_CONNECTIONS_NUM;
461 mAcceptedCbData = NULL;
463 mReceivedCbData = NULL;
465 mOpenedCbData = NULL;
466 mTerminatedCb = NULL;
467 mTerminatedCbData = NULL;
470 DPServer::~DPServer()
472 VINE_LOGD("DPServer[%p] is deleted.", this);
473 _vine_security_destroy(mSecurity);
474 vine_data_path_destroy(mDataPath);
477 int DPServer::get_id(char **id)
479 return VINE_ERROR_INVALID_OPERATION;
482 int DPServer::set_addr_family(vine_address_family_e addr_family)
484 mAddrFamily = addr_family;
485 return VINE_ERROR_NONE;
488 int DPServer::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
490 return VINE_ERROR_INVALID_OPERATION;
493 int DPServer::get_remote_ip(vine_address_family_e *addr_family, char **ip)
495 return VINE_ERROR_INVALID_OPERATION;
498 int DPServer::set_remote_port(int port)
500 return VINE_ERROR_INVALID_OPERATION;
503 int DPServer::get_remote_port()
508 int DPServer::set_port(int port)
510 if (port < 0 || port > 65535)
511 return VINE_ERROR_INVALID_PARAMETER;
515 return VINE_ERROR_NONE;
518 int DPServer::get_ip(vine_address_family_e *addr_family, char **ip)
520 *addr_family = mAddrFamily;
521 *ip = STRDUP(mLocalIp.c_str());
522 return VINE_ERROR_NONE;
525 int DPServer::update_local_address_info()
530 int ret = vine_data_path_get_local_address_info(mDataPath, &addr_family, &ip, &port);
531 if (ret != VINE_ERROR_NONE)
534 mLocalIp = string(ip);
537 return VINE_ERROR_NONE;
540 int DPServer::set_topic(std::string topic)
542 return VINE_ERROR_INVALID_OPERATION;
545 int DPServer::set_max_connections(int max_conn)
547 if (max_conn < 1 || max_conn > VINE_DP_MAX_CONNECTIONS_NUM)
548 return VINE_ERROR_INVALID_PARAMETER;
550 mMaxConnNum = max_conn;
551 return VINE_ERROR_NONE;
554 int DPServer::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
556 mAcceptedCb = callback;
557 mAcceptedCbData = user_data;
558 return VINE_ERROR_NONE;
561 int DPServer::unset_accepted_cb()
564 mAcceptedCbData = NULL;
565 return VINE_ERROR_NONE;
568 int DPServer::set_peer_joined_cb(vine_dp_peer_joined_cb callback, void *user_data)
570 return VINE_ERROR_INVALID_OPERATION;
573 int DPServer::unset_peer_joined_cb()
575 return VINE_ERROR_INVALID_OPERATION;
578 int DPServer::set_peer_left_cb(vine_dp_peer_left_cb callback, void *user_data)
580 return VINE_ERROR_INVALID_OPERATION;
583 int DPServer::unset_peer_left_cb()
585 return VINE_ERROR_INVALID_OPERATION;
588 void DPServer::invoke_accepted_cb(vine_dp_h dp)
591 mAcceptedCb(static_cast<void *>(this), dp, mAcceptedCbData);
594 int DPServer::open(vine_dp_opened_cb callback, void *user_data)
596 RET_ERR_IF_DP_OPEN_STATE_ISNT_IDLE(mOpenState);
598 mOpenedCb = callback;
599 mOpenedCbData = user_data;
600 mOpenState = VINE_DP_OPEN_STATE_WAIT;
602 int ret = vine_data_path_open(mAddrFamily, mListenPort,
603 mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
604 mMaxConnNum, mSecurity, NULL,
605 _opened_cb, static_cast<void *>(this),
606 _accepted_cb, static_cast<void *>(this),
607 &mDataPath, mEventQueue);
608 if (ret != VINE_ERROR_NONE)
609 mOpenState = VINE_DP_OPEN_STATE_NONE;
614 void DPServer::close()
616 mOpenState = VINE_DP_OPEN_STATE_NONE;
617 vine_data_path_close(mDataPath);
620 int DPServer::send(unsigned char *buf, size_t len)
622 return vine_data_path_write(mDataPath, buf, len);
625 int DPServer::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
627 return vine_data_path_read(mDataPath, buf, buf_len, read_len);
630 DPClient::DPClient(void *event_queue)
632 VINE_LOGD("DPClient[%p] is created.", this);
633 mEventQueue = event_queue;
635 mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
642 mReceivedCbData = NULL;
644 mOpenedCbData = NULL;
645 mTerminatedCb = NULL;
646 mTerminatedCbData = NULL;
648 isCreatedByServerDp = false;
651 DPClient::DPClient(void *event_queue, void *datapath)
653 VINE_LOGD("DPClient[%p] is created with datapath[%p]", this, datapath);
654 mEventQueue = event_queue;
656 mOpenState = VINE_DP_OPEN_STATE_NONE;
657 mDataPath = datapath;
658 mAddrFamily = vine_data_path_get_addr_family(mDataPath);
659 mPeerIp = vine_data_path_get_ip(mDataPath);
660 mPeerPort = vine_data_path_get_port(mDataPath);
663 mReceivedCbData = NULL;
665 mOpenedCbData = NULL;
666 mTerminatedCb = NULL;
667 mTerminatedCbData = NULL;
669 isCreatedByServerDp = true;
672 DPClient::~DPClient()
674 VINE_LOGD("DPClient[%p] is deleted.", this);
675 _vine_security_destroy(mSecurity);
676 vine_data_path_destroy(mDataPath);
679 int DPClient::get_id(char **id)
681 return VINE_ERROR_INVALID_OPERATION;
684 int DPClient::set_addr_family(vine_address_family_e addr_family)
686 return VINE_ERROR_INVALID_OPERATION;
689 int DPClient::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
691 RET_VAL_IF(isCreatedByServerDp, VINE_ERROR_INVALID_OPERATION, "cannot set remote IP");
692 if (!_check_if_valid_ip(addr_family, ip.c_str()))
693 return VINE_ERROR_INVALID_PARAMETER;
696 mAddrFamily = addr_family;
698 return VINE_ERROR_NONE;
701 int DPClient::get_remote_ip(vine_address_family_e *addr_family, char **ip)
703 *addr_family = mAddrFamily;
704 *ip = STRDUP(mPeerIp.c_str());
705 return VINE_ERROR_NONE;
708 int DPClient::set_remote_port(int port)
710 RET_VAL_IF(isCreatedByServerDp, VINE_ERROR_INVALID_OPERATION, "cannot set port");
711 if (port <= 0 || port > 65535) // Do not allow 0
712 return VINE_ERROR_INVALID_PARAMETER;
716 return VINE_ERROR_NONE;
719 int DPClient::set_port(int port)
721 return VINE_ERROR_INVALID_OPERATION;
724 int DPClient::get_ip(vine_address_family_e *addr_family, char **ip)
726 *addr_family = mAddrFamily;
727 *ip = STRDUP(mLocalIp.c_str());
728 return VINE_ERROR_NONE;
731 int DPClient::update_local_address_info()
736 int ret = vine_data_path_get_local_address_info(mDataPath, &addr_family, &ip, &port);
737 if (ret != VINE_ERROR_NONE)
740 mLocalIp = string(ip);
745 return VINE_ERROR_NONE;
748 int DPClient::set_topic(std::string topic)
750 return VINE_ERROR_INVALID_OPERATION;
753 int DPClient::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
755 return VINE_ERROR_INVALID_OPERATION;
758 int DPClient::unset_accepted_cb()
760 return VINE_ERROR_INVALID_OPERATION;
763 int DPClient::set_peer_joined_cb(vine_dp_peer_joined_cb callback, void *user_data)
765 return VINE_ERROR_INVALID_OPERATION;
768 int DPClient::unset_peer_joined_cb()
770 return VINE_ERROR_INVALID_OPERATION;
773 int DPClient::set_peer_left_cb(vine_dp_peer_left_cb callback, void *user_data)
775 return VINE_ERROR_INVALID_OPERATION;
778 int DPClient::unset_peer_left_cb()
780 return VINE_ERROR_INVALID_OPERATION;
783 int DPClient::open(vine_dp_opened_cb callback, void *user_data)
785 RET_VAL_IF(isCreatedByServerDp, VINE_ERROR_INVALID_OPERATION, "cannot open");
786 RET_ERR_IF_DP_OPEN_STATE_ISNT_IDLE(mOpenState);
788 mOpenedCb = callback;
789 mOpenedCbData = user_data;
790 mOpenState = VINE_DP_OPEN_STATE_WAIT;
792 int ret = vine_data_path_connect(mAddrFamily, mPeerIp.c_str(), mPeerPort,
793 mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
794 mSecurity, NULL, NULL,
795 _connected_cb, static_cast<void *>(this), &mDataPath, mEventQueue);
796 if (ret != VINE_ERROR_NONE)
797 mOpenState = VINE_DP_OPEN_STATE_NONE;
801 void DPClient::close()
803 mOpenState = VINE_DP_OPEN_STATE_NONE;
804 vine_data_path_close(mDataPath);
807 int DPClient::send(unsigned char *buf, size_t len)
809 return vine_data_path_write(mDataPath, buf, len);
812 int DPClient::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
814 return vine_data_path_read(mDataPath, buf, buf_len, read_len);
817 DPPubSub::DPPubSub(void *event_queue)
819 VINE_LOGD("DPPubSub[%p] is created.", this);
820 mEventQueue = event_queue;
823 mOpenState = VINE_DP_OPEN_STATE_NONE;
825 mReceivedCbData = NULL;
827 mOpenedCbData = NULL;
828 mTerminatedCb = NULL;
829 mTerminatedCbData = NULL;
830 mPeerJoinedCb = NULL;
831 mPeerJoinedCbData = NULL;
833 mPeerLeftCbData = NULL;
842 mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
844 mMaxConnNum = VINE_DP_DEFAULT_CONNECTIONS_NUM;
845 mServerDataPath = NULL;
849 DPPubSub::~DPPubSub()
851 VINE_LOGD("DPPubSub[%p] is deleted.", this);
852 _vine_security_destroy(mSecurity);
856 void DPPubSub::set_id(const char *id)
858 if (id == NULL || mId.compare(id) == 0)
861 VINE_LOGD("Id is changed %s -> %s", mId.c_str(), id);
865 int DPPubSub::get_id(char **id)
867 *id = STRDUP(mId.c_str());
868 return VINE_ERROR_NONE;
871 int DPPubSub::set_addr_family(vine_address_family_e addr_family)
873 mAddrFamily = addr_family;
874 return VINE_ERROR_NONE;
877 int DPPubSub::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
879 return VINE_ERROR_INVALID_OPERATION;
882 int DPPubSub::get_remote_ip(vine_address_family_e *addr_family, char **ip)
884 return VINE_ERROR_INVALID_OPERATION;
887 int DPPubSub::set_remote_port(int port)
889 return VINE_ERROR_INVALID_OPERATION;
892 int DPPubSub::get_remote_port()
897 int DPPubSub::set_port(int port)
899 if (port < 0 || port > 65535)
900 return VINE_ERROR_INVALID_PARAMETER;
904 return VINE_ERROR_NONE;
907 int DPPubSub::get_ip(vine_address_family_e *addr_family, char **ip)
909 return VINE_ERROR_INVALID_OPERATION;
912 int DPPubSub::update_local_address_info()
917 int ret = vine_data_path_get_local_address_info(mServerDataPath, &addr_family, &ip, &port);
918 if (ret != VINE_ERROR_NONE)
921 mLocalIp = string(ip);
924 return VINE_ERROR_NONE;
927 int DPPubSub::set_topic(std::string topic)
930 return VINE_ERROR_NONE;
933 int DPPubSub::set_max_connections(int max_conn)
935 if (max_conn < 1 || max_conn > VINE_DP_MAX_CONNECTIONS_NUM)
936 return VINE_ERROR_INVALID_PARAMETER;
938 mMaxConnNum = max_conn;
939 return VINE_ERROR_NONE;
942 int DPPubSub::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
944 return VINE_ERROR_INVALID_OPERATION;
947 int DPPubSub::unset_accepted_cb()
949 return VINE_ERROR_INVALID_OPERATION;
952 int DPPubSub::set_peer_joined_cb(vine_dp_peer_joined_cb callback, void *user_data)
954 mPeerJoinedCb = callback;
955 mPeerJoinedCbData = user_data;
957 return VINE_ERROR_NONE;
960 int DPPubSub::unset_peer_joined_cb()
962 mPeerJoinedCb = NULL;
963 mPeerJoinedCbData = NULL;
965 return VINE_ERROR_NONE;
968 int DPPubSub::set_peer_left_cb(vine_dp_peer_left_cb callback, void *user_data)
970 mPeerLeftCb = callback;
971 mPeerLeftCbData = user_data;
973 return VINE_ERROR_NONE;
976 int DPPubSub::unset_peer_left_cb()
979 mPeerLeftCbData = NULL;
981 return VINE_ERROR_NONE;
984 void DPPubSub::invoke_peer_joined_cb(const char *peer_id)
987 mPeerJoinedCb(static_cast<void *>(this), peer_id, mPeerJoinedCbData);
990 void DPPubSub::invoke_peer_left_cb(const char *peer_id)
993 mPeerLeftCb(static_cast<void *>(this), peer_id, mPeerJoinedCbData);
996 int DPPubSub::connect(const char *service_name, const char *ip, int port)
998 vine_data_path_h datapath = NULL;
999 const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
1000 struct pubsub_connect_data *conn_data
1001 = (struct pubsub_connect_data *)calloc(1, sizeof(struct pubsub_connect_data));
1003 conn_data->dp = static_cast<void *>(this);
1004 conn_data->service_name = STRDUP(service_name);
1006 int ret = vine_data_path_connect(mAddrFamily, ip, port, iface_name,
1007 mSecurity, service_name, mId.c_str(),
1008 _pubsub_connected_cb, (void *)conn_data,
1009 &datapath, mEventQueue);
1013 int DPPubSub::close_server_dp()
1015 int ret = vine_data_path_close(mServerDataPath);
1016 mServerDataPath = NULL;
1020 void DPPubSub::create_id(char id[])
1022 const char *map = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
1023 const size_t map_len = strlen(map);
1027 generate_n(back_inserter(rand_str), 6,
1029 return map[rand() % map_len];
1031 snprintf(id, VINE_MAX_SERVICE_NAME_LEN, "%s-%s",
1032 VINE_DP_PUBSUB_SERVICE_NAME_PREFIX, rand_str.c_str());
1035 int DPPubSub::publish_service()
1037 vine_service_h service;
1039 char rank_str[VINE_DP_PUBSUB_RANK_LEN] = {0 , };
1041 ret = vine_service_create(&service);
1042 if (ret != VINE_ERROR_NONE)
1045 vine_service_set_type(service, mTopic.c_str());
1046 vine_service_set_port(service, mListenPort);
1048 mRank = create_rank();
1049 sprintf(rank_str, "%d", mRank);
1051 vine_service_set_name(service, mId.c_str());
1052 vine_service_add_attribute(service, VINE_DP_PUBSUB_RANK_KEY, (const char *)rank_str);
1054 if (mSdPub == NULL) {
1055 ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &mSdPub);
1056 if (ret != VINE_ERROR_NONE) {
1057 VINE_LOGE("Fail to vine_disc_create");
1058 vine_service_destroy(service);
1062 ret = vine_disc_publish(mSdPub,
1064 _service_published_cb, static_cast<void *>(this),
1066 if (ret != VINE_ERROR_NONE) {
1067 vine_disc_destroy(mSdPub);
1071 vine_service_destroy(service);
1073 VINE_LOGD("Publish %s:%d with rank %d", mTopic.c_str(), mListenPort, mRank);
1077 int DPPubSub::subscribe_service()
1081 if (mSdSub == NULL) {
1082 ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &mSdSub);
1083 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to vine_disc_create");
1086 ret = vine_disc_subscribe(mSdSub,
1087 mTopic.c_str(), NULL,
1088 _service_discovered_cb, static_cast<void *>(this),
1090 if (ret != VINE_ERROR_NONE) {
1091 vine_disc_destroy(mSdSub);
1098 int DPPubSub::create_rank()
1101 return rand() % VINE_DP_PUBSUB_RANK_MAX;
1104 int DPPubSub::open(vine_dp_opened_cb callback, void *user_data)
1106 RET_ERR_IF_DP_OPEN_STATE_ISNT_IDLE(mOpenState);
1108 mOpenState = VINE_DP_OPEN_STATE_WAIT;
1109 mOpenedCb = callback;
1110 mOpenedCbData = user_data;
1112 char service_name[VINE_MAX_SERVICE_NAME_LEN + 1] = {0 , };
1113 create_id(service_name);
1114 set_id(service_name);
1116 int ret = vine_data_path_open(mAddrFamily, mListenPort,
1117 mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
1118 mMaxConnNum, mSecurity, mId.c_str(),
1119 _pubsub_opened_cb, static_cast<void *>(this),
1120 _pubsub_accepted_cb, static_cast<void *>(this),
1121 &mServerDataPath, mEventQueue);
1122 if (ret != VINE_ERROR_NONE) {
1124 mOpenedCbData = NULL;
1125 mOpenState = VINE_DP_OPEN_STATE_NONE;
1129 return VINE_ERROR_NONE;
1132 void DPPubSub::close()
1134 vine_disc_stop_publish(mSdPub);
1135 vine_disc_stop_subscribe(mSdSub);
1137 vine_disc_destroy(mSdSub);
1139 vine_disc_destroy(mSdPub);
1142 clear_joined_peer();
1143 vine_data_path_close(mServerDataPath);
1144 vine_data_path_destroy(mServerDataPath);
1145 mServerDataPath = NULL;
1147 mOpenState = VINE_DP_OPEN_STATE_NONE;
1150 int DPPubSub::send(unsigned char *buf, size_t len)
1152 int ret = VINE_ERROR_NONE;
1154 for (auto &peer : mDataPathList) {
1157 ret = vine_data_path_write(peer.second, buf, len);
1158 if (ret != VINE_ERROR_NONE) {
1159 VINE_LOGE("fail to write to a peer[%p] ", peer.second);
1167 int DPPubSub::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
1169 auto &dp_info = mRecvDataPathList.front();
1171 int ret = vine_data_path_read(dp_info.first, buf, buf_len, &bytes);
1172 if (ret != VINE_ERROR_NONE)
1175 dp_info.second -= bytes;
1176 VINE_LOGD("%zd bytes remained", dp_info.second);
1179 if (dp_info.second == 0)
1180 mRecvDataPathList.pop();
1182 return VINE_ERROR_NONE;
1185 bool DPPubSub::is_joined_peer(const char *service_name, const char *ip)
1187 if (mId.compare(service_name) == 0) {
1188 VINE_LOGD("It's me!");
1193 return (mDataPathList.count(string(service_name)) > 0);
1196 int DPPubSub::get_joined_peer()
1198 return mDataPathList.size();
1201 const char *DPPubSub::get_joined_peer_id(vine_data_path_h datapath)
1206 DPMap::iterator found = std::find_if(mDataPathList.begin(),
1207 mDataPathList.end(),
1208 [datapath](std::pair<std::string, vine_data_path_h> const& item)
1210 VINE_LOGD("item.second[%p]", item.second);
1211 return (item.second == datapath);
1214 if (found == mDataPathList.end()) {
1215 VINE_LOGE("Cannot find the datapath[%p].", datapath);
1219 return found->first.c_str();
1222 void DPPubSub::add_joined_peer(const char *service_name, vine_data_path_h datapath)
1224 if (!service_name || !datapath)
1227 VINE_LOGD("%s, %p is added.", service_name, datapath);
1228 mDataPathList.insert(std::make_pair(string(service_name), datapath));
1231 void DPPubSub::del_joined_peer(vine_data_path_h datapath)
1236 DPMap::iterator found = std::find_if(mDataPathList.begin(),
1237 mDataPathList.end(),
1238 [datapath](std::pair<std::string, vine_data_path_h> const& item)
1240 VINE_LOGD("item.second[%p]", item.second);
1241 return (item.second == datapath);
1244 if (found == mDataPathList.end()) {
1245 VINE_LOGE("Cannot find the datapath[%p].", datapath);
1249 VINE_LOGD("datapath[%p] is deleted from list.", datapath);
1250 mDataPathList.erase(found);
1253 void DPPubSub::clear_joined_peer()
1255 for (auto &peer : mDataPathList) {
1258 vine_data_path_close(peer.second);
1259 vine_data_path_destroy(peer.second);
1262 mDataPathList.clear();
1265 void DPPubSub::noti_received_peer(vine_data_path_h datapath, size_t bytes)
1267 mRecvDataPathList.push(std::make_pair(datapath, bytes));
1271 static int _get_last_two_octets(string ip)
1273 size_t pos = ip.find_last_of(':');
1274 string sub = ip.substr(pos + 1);
1276 if (sub.size() == 0) {
1277 VINE_LOGE("invalid IP.");
1281 return std::stoi(sub, NULL, 16);
1285 // The smaller the last octet, the higher the priority.
1286 // 1: peers priority is higher than me
1287 // 0: same or failure
1288 // -1: peers priority is lower than me
1289 int DPPubSub::compare_ip_priority(const char *peer_ip)
1291 std::string peer_ip_str(peer_ip);
1292 size_t pos = peer_ip_str.find_last_of('.');
1293 std::string subnet = peer_ip_str.substr(0, pos);
1294 std::string found_ip;
1295 int last = 0, peer_last = 0;
1297 if (subnet.size() == 0)
1300 for (auto &ip : mIpList) {
1301 if (ip.find(subnet) != 0)
1308 VINE_LOGD("peer_ip[%s] found_ip[%s] subnet[%s]", peer_ip, found_ip.c_str(), subnet.c_str());
1310 last = std::stoi(found_ip.substr(pos + 1));
1311 peer_last = std::stoi(peer_ip_str.substr(pos + 1));
1313 VINE_LOGD("Failed to convert last octet. IP is invalid.");
1316 if (last == peer_last)
1318 return (last < peer_last ? 1 : -1);
1321 // If true is return, it will connect to a peer.
1322 bool DPPubSub::check_if_connect(const char *peer_rank,
1323 vine_address_family_e ip_type, const char *peer_ip, int peer_port)
1325 // The smaller the rank value, the higher the priority.
1326 // Connect to a peer when both 1 and 2 are satisfied.
1327 // 1. peer has rank key and value
1328 // 2. the rank value is higher than peers value
1329 int prank = std::stoi(peer_rank);
1330 VINE_LOGD("ip_type[%d] rank[%d], Peer rank[%s/%d] peer_ip[%s] peer_port[%d]",
1331 ip_type, mRank, peer_rank, prank, peer_ip, peer_port);
1334 else if (mRank < prank)
1337 // If rank is the same, Need to check the IP address
1339 if (ip_type == VINE_ADDRESS_FAMILY_IPV4) {
1340 is_higher = compare_ip_priority(peer_ip);
1342 // TODO : handle IPv6 address
1348 // If last value of IP is the same, Need to check the port
1349 if (mListenPort > peer_port)
1355 int _vine_dp_create(vine_session_h session, vine_dp_type_e type, vine_dp_h *dp)
1357 RET_VAL_IF(session == NULL, VINE_ERROR_INVALID_PARAMETER, "session is null.");
1358 RET_VAL_IF(type >= VINE_DP_TYPE_UNKNOWN, VINE_ERROR_INVALID_PARAMETER, "invalid type");
1359 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1361 vine_event_queue_h eq = NULL;
1362 int ret = _vine_session_get_event_queue(session, &eq);
1364 if (ret != VINE_ERROR_NONE || !eq)
1365 return VINE_ERROR_INVALID_PARAMETER;
1367 if (type == VINE_DP_TYPE_SERVER) {
1368 *dp = new DPServer((void *)eq);
1369 } else if (type == VINE_DP_TYPE_CLIENT) {
1370 *dp = new DPClient((void *)eq);
1371 } else if (type == VINE_DP_TYPE_PUBSUB) {
1372 *dp = new DPPubSub((void *)eq);
1375 return VINE_ERROR_NONE;
1378 int _vine_dp_destroy(vine_dp_h dp)
1380 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1382 DataPath *_dp = static_cast<DataPath *>(dp);
1385 return VINE_ERROR_NONE;
1388 int _vine_dp_get_id(vine_dp_h dp, char **id)
1390 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1391 RET_VAL_IF(id == NULL, VINE_ERROR_INVALID_PARAMETER, "id is null.");
1393 DataPath *_dp = static_cast<DataPath *>(dp);
1394 return _dp->get_id(id);
1397 int _vine_dp_set_iface_name(vine_dp_h dp, const char *iface_name)
1399 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1400 RET_VAL_IF(iface_name == NULL, VINE_ERROR_INVALID_PARAMETER, "iface_name is null.");
1402 DataPath *_dp = static_cast<DataPath *>(dp);
1403 return _dp->set_iface_name(iface_name);
1406 int _vine_dp_set_addr_family(vine_dp_h dp, vine_address_family_e addr_family)
1408 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1409 RET_VAL_IF(addr_family < VINE_ADDRESS_FAMILY_DEFAULT
1410 || addr_family > VINE_ADDRESS_FAMILY_IPV6,
1411 VINE_ERROR_INVALID_PARAMETER, "addr_family is invalid.");
1413 DataPath *_dp = static_cast<DataPath *>(dp);
1414 return _dp->set_addr_family(addr_family);
1417 int _vine_dp_set_remote_ip(vine_dp_h dp, vine_address_family_e addr_family, const char *ip)
1419 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1420 RET_VAL_IF(ip == NULL, VINE_ERROR_INVALID_PARAMETER, "ip is null.");
1422 DataPath *_dp = static_cast<DataPath *>(dp);
1423 return _dp->set_remote_ip(addr_family, ip);
1426 int _vine_dp_get_remote_ip(vine_dp_h dp, vine_address_family_e *addr_family, char **ip)
1428 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1429 RET_VAL_IF(addr_family == NULL, VINE_ERROR_INVALID_PARAMETER, "addr_family is null.");
1430 RET_VAL_IF(ip == NULL, VINE_ERROR_INVALID_PARAMETER, "ip is null.");
1432 DataPath *_dp = static_cast<DataPath *>(dp);
1433 return _dp->get_remote_ip(addr_family, ip);
1436 int _vine_dp_set_remote_port(vine_dp_h dp, int port)
1438 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1440 DataPath *_dp = static_cast<DataPath *>(dp);
1441 return _dp->set_remote_port(port);
1444 int _vine_dp_get_remote_port(vine_dp_h dp, int *port)
1446 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1447 RET_VAL_IF(port == NULL, VINE_ERROR_INVALID_PARAMETER, "port is null.");
1449 DataPath *_dp = static_cast<DataPath *>(dp);
1450 int remote_port = _dp->get_remote_port();
1451 if (remote_port < 0)
1452 return VINE_ERROR_INVALID_OPERATION;
1453 *port = remote_port;
1454 return VINE_ERROR_NONE;
1457 int _vine_dp_get_ip(vine_dp_h dp, vine_address_family_e *addr_family, char **ip)
1459 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1460 RET_VAL_IF(addr_family == NULL, VINE_ERROR_INVALID_PARAMETER, "addr_family is null.");
1461 RET_VAL_IF(ip == NULL, VINE_ERROR_INVALID_PARAMETER, "ip is null.");
1463 DataPath *_dp = static_cast<DataPath *>(dp);
1464 return _dp->get_ip(addr_family, ip);
1467 int _vine_dp_set_port(vine_dp_h dp, int port)
1469 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1471 DataPath *_dp = static_cast<DataPath *>(dp);
1472 return _dp->set_port(port);
1475 int _vine_dp_get_port(vine_dp_h dp, int *port)
1477 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1478 RET_VAL_IF(port == NULL, VINE_ERROR_INVALID_PARAMETER, "port is null.");
1480 DataPath *_dp = static_cast<DataPath *>(dp);
1481 *port = _dp->get_port();
1482 return VINE_ERROR_NONE;
1485 int _vine_dp_set_topic(vine_dp_h dp, const char *topic)
1487 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1488 RET_VAL_IF(topic == NULL, VINE_ERROR_INVALID_PARAMETER, "topic is null.");
1489 RET_VAL_IF(_check_topic_len(topic) == false,
1490 VINE_ERROR_INVALID_PARAMETER, "invalid length of topic.");
1492 DataPath *_dp = static_cast<DataPath *>(dp);
1493 return _dp->set_topic(topic);
1496 int _vine_dp_set_max_connections(vine_dp_h dp, int max_conn)
1498 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1500 DataPath *_dp = static_cast<DataPath *>(dp);
1501 return _dp->set_max_connections(max_conn);
1504 int _vine_dp_set_security(vine_dp_h dp, vine_security_h security)
1506 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1507 RET_VAL_IF(security == NULL, VINE_ERROR_INVALID_PARAMETER, "security is null.");
1509 DataPath *_dp = static_cast<DataPath *>(dp);
1510 return _dp->set_security(security);
1513 int _vine_dp_set_accepted_cb(vine_dp_h dp, vine_dp_accepted_cb callback, void *user_data)
1515 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1516 RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1518 DataPath *_dp = static_cast<DataPath *>(dp);
1519 return _dp->set_accepted_cb(callback, user_data);
1522 int _vine_dp_unset_accepted_cb(vine_dp_h dp)
1524 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1526 DataPath *_dp = static_cast<DataPath *>(dp);
1527 return _dp->unset_accepted_cb();
1530 int _vine_dp_set_terminated_cb(vine_dp_h dp, vine_dp_terminated_cb callback, void *user_data)
1532 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1533 RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1535 DataPath *_dp = static_cast<DataPath *>(dp);
1536 return _dp->set_terminated_cb(callback, user_data);
1539 int _vine_dp_unset_terminated_cb(vine_dp_h dp)
1541 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1543 DataPath *_dp = static_cast<DataPath *>(dp);
1544 return _dp->unset_terminated_cb();
1547 int _vine_dp_open(vine_dp_h dp, vine_dp_opened_cb callback, void *user_data)
1549 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1550 RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1552 DataPath *_dp = static_cast<DataPath *>(dp);
1553 return _dp->open(callback, user_data);
1556 int _vine_dp_close(vine_dp_h dp)
1558 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1560 DataPath *_dp = static_cast<DataPath *>(dp);
1562 return VINE_ERROR_NONE;
1565 int _vine_dp_send(vine_dp_h dp, unsigned char *buf, size_t len)
1567 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1568 RET_VAL_IF(buf == NULL, VINE_ERROR_INVALID_PARAMETER, "buf is null.");
1570 DataPath *_dp = static_cast<DataPath *>(dp);
1571 return _dp->send(buf, len);
1574 int _vine_dp_recv(vine_dp_h dp, unsigned char *buf, size_t buf_len, size_t *read_len)
1576 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1577 RET_VAL_IF(buf == NULL, VINE_ERROR_INVALID_PARAMETER, "buf is null.");
1578 RET_VAL_IF(read_len == NULL, VINE_ERROR_INVALID_PARAMETER, "read_len is null.");
1580 DataPath *_dp = static_cast<DataPath *>(dp);
1581 return _dp->recv(buf, buf_len, read_len);
1584 int _vine_dp_set_received_cb(vine_dp_h dp, vine_dp_received_cb callback, void *user_data)
1586 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1587 RET_VAL_IF(callback== NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1589 DataPath *_dp = static_cast<DataPath *>(dp);
1590 return _dp->set_received_cb(callback, user_data);
1593 int _vine_dp_unset_received_cb(vine_dp_h dp)
1595 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1597 DataPath *_dp = static_cast<DataPath *>(dp);
1598 return _dp->unset_received_cb();
1601 int _vine_dp_set_peer_joined_cb(vine_dp_h dp, vine_dp_peer_joined_cb callback, void *user_data)
1603 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1604 RET_VAL_IF(callback== NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1606 DataPath *_dp = static_cast<DataPath *>(dp);
1607 return _dp->set_peer_joined_cb(callback, user_data);
1610 int _vine_dp_unset_peer_joined_cb(vine_dp_h dp)
1612 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1614 DataPath *_dp = static_cast<DataPath *>(dp);
1615 return _dp->unset_peer_joined_cb();
1618 int _vine_dp_set_peer_left_cb(vine_dp_h dp, vine_dp_peer_left_cb callback, void *user_data)
1620 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1621 RET_VAL_IF(callback== NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1623 DataPath *_dp = static_cast<DataPath *>(dp);
1624 return _dp->set_peer_left_cb(callback, user_data);
1627 int _vine_dp_unset_peer_left_cb(vine_dp_h dp)
1629 RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1631 DataPath *_dp = static_cast<DataPath *>(dp);
1632 return _dp->unset_peer_left_cb();