Allow to set event loop type with api
[platform/core/api/vine.git] / src / vine-dp.cpp
1 /*
2  * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 #include "vine-dp.h"
20
21 #include "vine-constants.h"
22 #include "vine-data-path.h"
23 #include "vine-event-loop.h"
24 #include "vine-log.h"
25 #include "vine-session.h"
26 #include "vine-security.h"
27 #include "vine-utils.h"
28
29 #include <algorithm>
30 #include <string>
31 #include <utility>
32 #include <arpa/inet.h>
33
34 using namespace vine;
35
// Return from the enclosing function unless the given open state is idle:
//   VINE_DP_OPEN_STATE_WAIT -> VINE_ERROR_NOW_IN_PROGRESS
//   VINE_DP_OPEN_STATE_DONE -> VINE_ERROR_INVALID_OPERATION
// The argument is parenthesized so that any expression can be passed safely.
#define RET_ERR_IF_DP_OPEN_STATE_ISNT_IDLE(state) \
	do { \
		if ((state) == VINE_DP_OPEN_STATE_WAIT) { \
			VINE_LOGE("Ignore duplicate request."); \
			return VINE_ERROR_NOW_IN_PROGRESS; \
		} else if ((state) == VINE_DP_OPEN_STATE_DONE) { \
			VINE_LOGE("Already opened."); \
			return VINE_ERROR_INVALID_OPERATION; \
		} \
	} while (0)
46
47 extern vine_dp_plugin_fn g_dp_plugin_fn;
48
49 static bool _check_if_valid_ip(vine_address_family_e addr_family, const char *ip)
50 {
51         int ret = 0;
52         unsigned char buf[sizeof(struct in6_addr)];
53
54         if (addr_family == VINE_ADDRESS_FAMILY_IPV4) {
55                 ret = inet_pton(AF_INET, ip, buf);
56         } else if (addr_family == VINE_ADDRESS_FAMILY_IPV6) {
57                 ret = inet_pton(AF_INET6, ip, buf);
58         } else {
59                 ret |= inet_pton(AF_INET, ip, buf);
60                 ret |= inet_pton(AF_INET6, ip, buf);
61         }
62
63         return (ret == 1);
64 }
65
66 static bool _check_topic_len(const char * topic)
67 {
68         RET_VAL_IF(topic == NULL, false, "topic is NULL");
69         int len = strlen(topic);
70         return len > 0 && len <= VINE_MAX_TOPIC_LEN;
71 }
72
73 static void _received_cb(vine_data_path_h datapath, size_t received_len, void *user_data)
74 {
75         if (!datapath || !user_data)
76                 return;
77
78         static_cast<DataPath *>(user_data)->invoke_received_cb(received_len);
79 }
80
81 static void _terminated_cb(vine_data_path_h datapath, void *user_data)
82 {
83         if (!datapath || !user_data)
84                 return;
85
86         static_cast<DataPath *>(user_data)->invoke_terminated_cb();
87 }
88
89 static void _opened_cb(vine_data_path_h datapath, int result, int port, void *userdata)
90 {
91         DataPath *dp = static_cast<DataPath *>(userdata);
92         dp->set_port(port);
93         dp->update_local_address_info();
94
95         VINE_LOGI("port[%d] result[%d]", port, result);
96         dp->invoke_opened_cb((vine_error_e)result);
97 }
98
99 static void _accepted_cb(vine_data_path_h datapath, void *user_data)
100 {
101         if (!datapath || !user_data)
102                 return;
103
104         void *event_queue = static_cast<DataPath *>(user_data)->get_eventfd();
105
106         // datapath is created newly. DP class should be needed for it.
107         // event_queue is the same as corresponding DPServer.
108         DPClient *connected_client_dp = new DPClient(event_queue, datapath);
109         vine_data_path_set_received_cb(datapath,
110                         _received_cb, static_cast<void *>(connected_client_dp));
111         vine_data_path_set_terminated_cb(datapath,
112                         _terminated_cb, static_cast<void *>(connected_client_dp));
113
114         connected_client_dp->update_local_address_info();
115         static_cast<DPServer *>(user_data)->invoke_accepted_cb(connected_client_dp);
116 }
117
118 static void _connected_cb(vine_data_path_h datapath, int result, void *user_data)
119 {
120         if (result == 0) {
121                 vine_data_path_set_received_cb(datapath, _received_cb, user_data);
122                 vine_data_path_set_terminated_cb(datapath, _terminated_cb, user_data);
123         }
124
125         DataPath *dp = static_cast<DataPath *>(user_data);
126         dp->update_local_address_info();
127         dp->invoke_opened_cb(result);
128 }
129
130 static void _pubsub_received_cb(vine_data_path_h datapath, size_t received_len, void *user_data)
131 {
132         if (!datapath || !user_data || received_len == 0)
133                 return;
134
135         VINE_LOGD("receive %zd bytes from datapath[%p]", received_len, datapath);
136
137         static_cast<DPPubSub *>(user_data)->noti_received_peer(datapath, received_len);
138         static_cast<DataPath *>(user_data)->invoke_received_cb(received_len);
139 }
140
141 static void _pubsub_terminated_cb(vine_data_path_h datapath, void *user_data)
142 {
143         if (!datapath || !user_data)
144                 return;
145
146         VINE_LOGD("datapath[%p] is terminated by peer.", datapath);
147         DPPubSub *dp =  static_cast<DPPubSub *>(user_data);
148
149         char *peer_id = STRDUP(dp->get_joined_peer_id(datapath));
150         dp->del_joined_peer(datapath);
151         if (peer_id) {
152                 dp->invoke_peer_left_cb(peer_id);
153                 free(peer_id);
154         }
155         vine_data_path_destroy(datapath);
156 }
157
158 static void _pubsub_opened_cb(vine_data_path_h datapath, int result, int port, void *userdata)
159 {
160         if (!userdata)
161                 return;
162
163         DPPubSub *dp = static_cast<DPPubSub *>(userdata);
164         dp->set_port(port);
165
166         VINE_LOGI("port[%d] result[%d]", port, result);
167
168         // Notify user that a listen socket cannot be used anymore.
169         if (result != VINE_ERROR_NONE) {
170                 static_cast<DataPath *>(userdata)->invoke_opened_cb(result);
171                 dp->close();
172                 return;
173         }
174
175         int ret = dp->publish_service();
176         if (ret != VINE_ERROR_NONE) {
177                 dp->close();
178                 static_cast<DataPath *>(userdata)->invoke_opened_cb(ret);
179                 return;
180         }
181 }
182
183 static void _pubsub_accepted_cb(vine_data_path_h datapath, void *user_data)
184 {
185         if (!datapath || !user_data)
186                 return;
187
188         DPPubSub *dp = static_cast<DPPubSub *>(user_data);
189         if (dp->get_max_conns() <= dp->get_joined_peer()) {
190                 VINE_LOGE("The max connection limit is reached. Ignore [%p].", datapath);
191                 vine_data_path_close(datapath);
192                 return;
193         }
194
195         char *token = NULL;
196         if (vine_data_path_get_token(datapath, &token) != VINE_ERROR_NONE) {
197                 VINE_LOGE("Cannot find peer's service name. Ignore[%p]", datapath);
198                 vine_data_path_close(datapath);
199                 vine_data_path_destroy(datapath);
200                 return;
201         }
202
203         dp->add_joined_peer(token, datapath);
204
205         vine_data_path_set_received_cb(datapath, _pubsub_received_cb, user_data);
206         vine_data_path_set_terminated_cb(datapath, _pubsub_terminated_cb, user_data);
207
208         dp->invoke_peer_joined_cb(token);
209         free(token);
210 }
211
// Context passed as user_data to _pubsub_connected_cb() for one outgoing
// pubsub connection.
struct pubsub_connect_data {
	// DPPubSub object that requested the connection.
	void *dp;
	// Service name of the peer being connected (a STRDUP'ed copy made in
	// DPPubSub::connect()).
	const char *service_name;
};
216
217 static void _pubsub_connected_cb(vine_data_path_h datapath, int result, void *user_data)
218 {
219         if (!datapath || !user_data || result != 0) {
220                 VINE_LOGE("connect failure.");
221                 return;
222         }
223
224         struct pubsub_connect_data *conn_data = (struct pubsub_connect_data *)user_data;
225         if (!conn_data->service_name) {
226                 VINE_LOGE("service_name is NULL.");
227                 vine_data_path_close(datapath);
228                 vine_data_path_destroy(datapath);
229                 return;
230         }
231
232         DPPubSub *dp = static_cast<DPPubSub *>(conn_data->dp);
233         dp->add_joined_peer(conn_data->service_name, datapath);
234
235         vine_data_path_set_received_cb(datapath, _pubsub_received_cb, conn_data->dp);
236         vine_data_path_set_terminated_cb(datapath, _pubsub_terminated_cb, conn_data->dp);
237
238         dp->invoke_peer_joined_cb(conn_data->service_name);
239         conn_data->dp = NULL;
240         free(conn_data);
241 }
242
243 // for pubsub
244 static int __vine_set_discovered_service(vine_service_h service,
245                 const char *service_type, const char *service_name,
246                 const char *host_name, int port,
247                 const map<string, string> &attr, const char *iface_name)
248 {
249         int ret = VINE_ERROR_NONE;
250         ret = _vine_service_set_type(service, service_type);
251         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set service type");
252         ret = _vine_service_set_name(service, service_name);
253         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set service name");
254         ret = _vine_service_set_host_name(service, host_name);
255         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set host name");
256         ret = _vine_service_set_port(service, port);
257         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set port");
258
259         for (const auto &kv : attr)
260                 _vine_service_add_attribute(service, kv.first.c_str(), kv.second.c_str());
261
262         ret = _vine_service_set_iface_name(service, iface_name);
263         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set iface_name");
264
265         return VINE_ERROR_NONE;
266 }
267
268 static void _ip_resolved_cb(vine_disc_h disc, vine_service_h service, bool add,
269                                 const char *ip, vine_address_family_e address_family, void *user_data)
270 {
271         if (!user_data || !add) {
272                 VINE_LOGD("state: %s", add ? "add" : "remove");
273                 return;
274         }
275
276         DPPubSub *dp = static_cast<DPPubSub *>(user_data);
277         int port = _vine_service_get_port(service);
278         if (dp->is_joined_peer(_vine_service_get_name(service), ip)) {
279                 VINE_LOGD("%s:%d was already joined.", ip, port);
280                 return;
281         }
282
283         VINE_LOGD("IP Resolved %s:%d", ip, port);
284
285         vine_address_family_e supported_addr_family = dp->get_addr_family();
286         if (supported_addr_family != VINE_ADDRESS_FAMILY_DEFAULT
287                         && supported_addr_family != address_family) {
288                 VINE_LOGD("address family is dismatched. peer type[%d]", address_family);
289                 return;
290         }
291
292         auto attr = _vine_service_get_attributes(service);
293         auto it = attr.find(VINE_DP_PUBSUB_RANK_KEY);
294         if (it == attr.end()) {
295                 VINE_LOGE("peer doens't have a rank.");
296                 return;
297         }
298
299         if (dp->get_max_conns() > dp->get_joined_peer()
300                         && dp->check_if_connect(it->second.c_str(), address_family, ip, port)) {
301                 const char *service_name = _vine_service_get_name(service);
302                 if (!service_name)
303                         return;
304
305                 VINE_LOGD("Try to connect a peer(%s:%d)", ip, port);
306                 dp->connect(service_name, ip, port);
307         }
308 }
309
310 static void _service_discovered_cb(vine_disc_h disc, bool available,
311                 const char *service_type, const char *service_name,
312                 const char *host_name, int port, const map<string, string> &attr,
313                 const char *iface_name, int more_coming, void *user_data)
314 {
315         VINE_LOGD("%s is discovered. %s",
316                         service_name, available ? "available" : "not available");
317
318         if (!user_data || !available)
319                 return;
320
321         vine_disc_h disc_handle;
322         int ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &disc_handle);
323         RET_IF(ret != VINE_ERROR_NONE, "Fail to create a disc");
324
325         vine_service_h service;
326         ret = _vine_service_create(&service, false);
327         if (ret != VINE_ERROR_NONE) {
328                 vine_disc_destroy(disc_handle);
329                 return;
330         }
331
332         auto it = attr.find(VINE_DP_PUBSUB_RANK_KEY);
333         if (it == attr.end()) {
334                 VINE_LOGE("peer doens't have a rank.");
335                 _vine_service_destroy(service);
336                 vine_disc_destroy(disc_handle);
337                 return;
338         }
339
340         ret = _vine_service_set_disc_handle(service, disc_handle);
341         if (ret != VINE_ERROR_NONE) {
342                 VINE_LOGE("Fail to set disc_handle. error(%d)", ret);
343                 _vine_service_destroy(service);
344                 vine_disc_destroy(disc_handle);
345                 return;
346         }
347
348         ret = __vine_set_discovered_service(service,
349                         service_type, service_name, host_name, port, attr, iface_name);
350         if (ret != VINE_ERROR_NONE) {
351                 VINE_LOGE("Fail to set a service. error(%d)", ret);
352                 _vine_service_destroy(service);
353                 vine_disc_destroy(disc_handle);
354                 return;
355         }
356
357         ret = vine_disc_resolve_ip(disc_handle, service,
358                         _ip_resolved_cb, user_data,
359                         (vine_event_queue_h)static_cast<DataPath *>(user_data)->get_eventfd());
360         if (ret != VINE_ERROR_NONE) {
361                 VINE_LOGE("Fail to resolve IP. error(%d)", ret);
362                 _vine_service_destroy(service);
363                 vine_disc_destroy(disc_handle);
364                 return;
365         }
366 }
367
368 static void _service_published_cb(vine_disc_h disc,
369                                 const char *service_name, vine_error_e error, void *user_data)
370 {
371         VINE_LOGD("%s publish request %s.",
372                         service_name, error == VINE_ERROR_NONE ? "succeed" : "failed");
373
374         if (!user_data)
375                 return;
376
377         DPPubSub *dp = static_cast<DPPubSub *>(user_data);
378         dp->set_id(service_name);
379         int ret = dp->subscribe_service();
380         if (ret != VINE_ERROR_NONE)
381                 dp->close();
382
383         static_cast<DataPath *>(user_data)->invoke_opened_cb(ret);
384 }
385
386 int DataPath::set_security(void *security)
387 {
388         return _vine_security_clone(&mSecurity, security);
389 }
390
391 int DataPath::set_iface_name(const std::string &iface_name)
392 {
393         mIfaceName = iface_name;
394         return VINE_ERROR_NONE;
395 }
396
397 void DataPath::invoke_opened_cb(int result)
398 {
399         if (mOpenedCb)
400                 mOpenedCb(static_cast<void *>(this), (vine_error_e)result, mOpenedCbData);
401
402         // called only once.
403         mOpenedCb = NULL;
404         mOpenedCbData = NULL;
405         mOpenState = VINE_DP_OPEN_STATE_DONE;
406 }
407
408 int DataPath::set_received_cb(vine_dp_received_cb callback, void *user_data)
409 {
410         mReceivedCb = callback;
411         mReceivedCbData = user_data;
412         return VINE_ERROR_NONE;
413 }
414
415 int DataPath::unset_received_cb()
416 {
417         mReceivedCb = NULL;
418         mReceivedCbData = NULL;
419         return VINE_ERROR_NONE;
420 }
421
422 void DataPath::invoke_received_cb(size_t received_len)
423 {
424         if (mReceivedCb)
425                 mReceivedCb(static_cast<void *>(this), received_len, mReceivedCbData);
426 }
427
428 int DataPath::set_terminated_cb(vine_dp_terminated_cb callback, void *user_data)
429 {
430         mTerminatedCb = callback;
431         mTerminatedCbData = user_data;
432         return VINE_ERROR_NONE;
433 }
434
435 int DataPath::unset_terminated_cb()
436 {
437         mTerminatedCb = NULL;
438         mTerminatedCbData = NULL;
439         return VINE_ERROR_NONE;
440 }
441
442 void DataPath::invoke_terminated_cb()
443 {
444         mOpenState = VINE_DP_OPEN_STATE_NONE;
445         if (mTerminatedCb)
446                 mTerminatedCb(static_cast<void *>(this), mTerminatedCbData);
447 }
448
449 DPServer::DPServer(void *event_queue)
450 {
451         VINE_LOGD("DPServer[%p] is created.", this);
452         mEventQueue = event_queue;
453         mSecurity = NULL;
454         mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
455         mIfaceName = "";
456         mOpenState = VINE_DP_OPEN_STATE_NONE;
457         mDataPath = NULL;
458         mListenPort = 0;
459         mMaxConnNum = VINE_DP_DEFAULT_CONNECTIONS_NUM;
460         mAcceptedCb = NULL;
461         mAcceptedCbData = NULL;
462         mReceivedCb = NULL;
463         mReceivedCbData = NULL;
464         mOpenedCb = NULL;
465         mOpenedCbData = NULL;
466         mTerminatedCb = NULL;
467         mTerminatedCbData = NULL;
468 }
469
470 DPServer::~DPServer()
471 {
472         VINE_LOGD("DPServer[%p] is deleted.", this);
473         _vine_security_destroy(mSecurity);
474         vine_data_path_destroy(mDataPath);
475 }
476
477 int DPServer::get_id(char **id)
478 {
479         return VINE_ERROR_INVALID_OPERATION;
480 }
481
482 int DPServer::set_addr_family(vine_address_family_e addr_family)
483 {
484         mAddrFamily = addr_family;
485         return VINE_ERROR_NONE;
486 }
487
488 int DPServer::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
489 {
490         return VINE_ERROR_INVALID_OPERATION;
491 }
492
493 int DPServer::get_remote_ip(vine_address_family_e *addr_family, char **ip)
494 {
495         return VINE_ERROR_INVALID_OPERATION;
496 }
497
498 int DPServer::set_remote_port(int port)
499 {
500         return VINE_ERROR_INVALID_OPERATION;
501 }
502
503 int DPServer::get_remote_port()
504 {
505         return -1;
506 }
507
508 int DPServer::set_port(int port)
509 {
510         if (port < 0 || port > 65535)
511                 return VINE_ERROR_INVALID_PARAMETER;
512
513         mListenPort = port;
514
515         return VINE_ERROR_NONE;
516 }
517
518 int DPServer::get_ip(vine_address_family_e *addr_family, char **ip)
519 {
520         *addr_family = mAddrFamily;
521         *ip = STRDUP(mLocalIp.c_str());
522         return VINE_ERROR_NONE;
523 }
524
525 int DPServer::update_local_address_info()
526 {
527         char *ip;
528         int addr_family;
529         int port;
530         int ret = vine_data_path_get_local_address_info(mDataPath, &addr_family, &ip, &port);
531         if (ret != VINE_ERROR_NONE)
532                 return ret;
533
534         mLocalIp = string(ip);
535         free(ip);
536
537         return VINE_ERROR_NONE;
538 }
539
540 int DPServer::set_topic(std::string topic)
541 {
542         return VINE_ERROR_INVALID_OPERATION;
543 }
544
545 int DPServer::set_max_connections(int max_conn)
546 {
547         if (max_conn < 1 || max_conn > VINE_DP_MAX_CONNECTIONS_NUM)
548                 return VINE_ERROR_INVALID_PARAMETER;
549
550         mMaxConnNum = max_conn;
551         return VINE_ERROR_NONE;
552 }
553
554 int DPServer::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
555 {
556         mAcceptedCb = callback;
557         mAcceptedCbData = user_data;
558         return VINE_ERROR_NONE;
559 }
560
561 int DPServer::unset_accepted_cb()
562 {
563         mAcceptedCb = NULL;
564         mAcceptedCbData = NULL;
565         return VINE_ERROR_NONE;
566 }
567
568 int DPServer::set_peer_joined_cb(vine_dp_peer_joined_cb callback, void *user_data)
569 {
570         return VINE_ERROR_INVALID_OPERATION;
571 }
572
573 int DPServer::unset_peer_joined_cb()
574 {
575         return VINE_ERROR_INVALID_OPERATION;
576 }
577
578 int DPServer::set_peer_left_cb(vine_dp_peer_left_cb callback, void *user_data)
579 {
580         return VINE_ERROR_INVALID_OPERATION;
581 }
582
583 int DPServer::unset_peer_left_cb()
584 {
585         return VINE_ERROR_INVALID_OPERATION;
586 }
587
588 void DPServer::invoke_accepted_cb(vine_dp_h dp)
589 {
590         if (mAcceptedCb)
591                 mAcceptedCb(static_cast<void *>(this), dp, mAcceptedCbData);
592 }
593
594 int DPServer::open(vine_dp_opened_cb callback, void *user_data)
595 {
596         RET_ERR_IF_DP_OPEN_STATE_ISNT_IDLE(mOpenState);
597
598         mOpenedCb = callback;
599         mOpenedCbData = user_data;
600         mOpenState = VINE_DP_OPEN_STATE_WAIT;
601
602         int ret = vine_data_path_open(mAddrFamily, mListenPort,
603                         mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
604                         mMaxConnNum, mSecurity, NULL,
605                         _opened_cb, static_cast<void *>(this),
606                         _accepted_cb, static_cast<void *>(this),
607                         &mDataPath, mEventQueue);
608         if (ret != VINE_ERROR_NONE)
609                 mOpenState = VINE_DP_OPEN_STATE_NONE;
610
611         return ret;
612 }
613
614 void DPServer::close()
615 {
616         mOpenState = VINE_DP_OPEN_STATE_NONE;
617         vine_data_path_close(mDataPath);
618 }
619
620 int DPServer::send(unsigned char *buf, size_t len)
621 {
622         return vine_data_path_write(mDataPath, buf, len);
623 }
624
625 int DPServer::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
626 {
627         return vine_data_path_read(mDataPath, buf, buf_len, read_len);
628 }
629
630 DPClient::DPClient(void *event_queue)
631 {
632         VINE_LOGD("DPClient[%p] is created.", this);
633         mEventQueue = event_queue;
634         mSecurity = NULL;
635         mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
636         mIfaceName = "";
637         mDataPath = NULL;
638         mPeerIp = "";
639         mPeerPort = 0;
640         mPort = 0;
641         mReceivedCb = NULL;
642         mReceivedCbData = NULL;
643         mOpenedCb = NULL;
644         mOpenedCbData = NULL;
645         mTerminatedCb = NULL;
646         mTerminatedCbData = NULL;
647
648         isCreatedByServerDp = false;
649 }
650
651 DPClient::DPClient(void *event_queue, void *datapath)
652 {
653         VINE_LOGD("DPClient[%p] is created with datapath[%p]", this, datapath);
654         mEventQueue = event_queue;
655         mSecurity = NULL;
656         mOpenState = VINE_DP_OPEN_STATE_NONE;
657         mDataPath = datapath;
658         mAddrFamily = vine_data_path_get_addr_family(mDataPath);
659         mPeerIp = vine_data_path_get_ip(mDataPath);
660         mPeerPort = vine_data_path_get_port(mDataPath);
661         mPort = 0;
662         mReceivedCb = NULL;
663         mReceivedCbData = NULL;
664         mOpenedCb = NULL;
665         mOpenedCbData = NULL;
666         mTerminatedCb = NULL;
667         mTerminatedCbData = NULL;
668
669         isCreatedByServerDp = true;
670 }
671
672 DPClient::~DPClient()
673 {
674         VINE_LOGD("DPClient[%p] is deleted.", this);
675         _vine_security_destroy(mSecurity);
676         vine_data_path_destroy(mDataPath);
677 }
678
679 int DPClient::get_id(char **id)
680 {
681         return VINE_ERROR_INVALID_OPERATION;
682 }
683
684 int DPClient::set_addr_family(vine_address_family_e addr_family)
685 {
686         return VINE_ERROR_INVALID_OPERATION;
687 }
688
689 int DPClient::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
690 {
691         RET_VAL_IF(isCreatedByServerDp, VINE_ERROR_INVALID_OPERATION, "cannot set remote IP");
692         if (!_check_if_valid_ip(addr_family, ip.c_str()))
693                 return VINE_ERROR_INVALID_PARAMETER;
694
695         mPeerIp = ip;
696         mAddrFamily = addr_family;
697
698         return VINE_ERROR_NONE;
699 }
700
701 int DPClient::get_remote_ip(vine_address_family_e *addr_family, char **ip)
702 {
703         *addr_family = mAddrFamily;
704         *ip = STRDUP(mPeerIp.c_str());
705         return VINE_ERROR_NONE;
706 }
707
708 int DPClient::set_remote_port(int port)
709 {
710         RET_VAL_IF(isCreatedByServerDp, VINE_ERROR_INVALID_OPERATION, "cannot set port");
711         if (port <= 0 || port > 65535) // Do not allow 0
712                 return VINE_ERROR_INVALID_PARAMETER;
713
714         mPeerPort = port;
715
716         return VINE_ERROR_NONE;
717 }
718
719 int DPClient::set_port(int port)
720 {
721         return VINE_ERROR_INVALID_OPERATION;
722 }
723
724 int DPClient::get_ip(vine_address_family_e *addr_family, char **ip)
725 {
726         *addr_family = mAddrFamily;
727         *ip = STRDUP(mLocalIp.c_str());
728         return VINE_ERROR_NONE;
729 }
730
731 int DPClient::update_local_address_info()
732 {
733         char *ip;
734         int addr_family;
735         int port;
736         int ret = vine_data_path_get_local_address_info(mDataPath, &addr_family, &ip, &port);
737         if (ret != VINE_ERROR_NONE)
738                 return ret;
739
740         mLocalIp = string(ip);
741         mPort  = port;
742
743         free(ip);
744
745         return VINE_ERROR_NONE;
746 }
747
748 int DPClient::set_topic(std::string topic)
749 {
750         return VINE_ERROR_INVALID_OPERATION;
751 }
752
753 int DPClient::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
754 {
755         return VINE_ERROR_INVALID_OPERATION;
756 }
757
758 int DPClient::unset_accepted_cb()
759 {
760         return VINE_ERROR_INVALID_OPERATION;
761 }
762
763 int DPClient::set_peer_joined_cb(vine_dp_peer_joined_cb callback, void *user_data)
764 {
765         return VINE_ERROR_INVALID_OPERATION;
766 }
767
768 int DPClient::unset_peer_joined_cb()
769 {
770         return VINE_ERROR_INVALID_OPERATION;
771 }
772
773 int DPClient::set_peer_left_cb(vine_dp_peer_left_cb callback, void *user_data)
774 {
775         return VINE_ERROR_INVALID_OPERATION;
776 }
777
778 int DPClient::unset_peer_left_cb()
779 {
780         return VINE_ERROR_INVALID_OPERATION;
781 }
782
783 int DPClient::open(vine_dp_opened_cb callback, void *user_data)
784 {
785         RET_VAL_IF(isCreatedByServerDp, VINE_ERROR_INVALID_OPERATION, "cannot open");
786         RET_ERR_IF_DP_OPEN_STATE_ISNT_IDLE(mOpenState);
787
788         mOpenedCb = callback;
789         mOpenedCbData = user_data;
790         mOpenState = VINE_DP_OPEN_STATE_WAIT;
791
792         int ret = vine_data_path_connect(mAddrFamily, mPeerIp.c_str(), mPeerPort,
793                         mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
794                         mSecurity, NULL, NULL,
795                         _connected_cb, static_cast<void *>(this), &mDataPath, mEventQueue);
796         if (ret != VINE_ERROR_NONE)
797                 mOpenState = VINE_DP_OPEN_STATE_NONE;
798         return ret;
799 }
800
801 void DPClient::close()
802 {
803         mOpenState = VINE_DP_OPEN_STATE_NONE;
804         vine_data_path_close(mDataPath);
805 }
806
807 int DPClient::send(unsigned char *buf, size_t len)
808 {
809         return vine_data_path_write(mDataPath, buf, len);
810 }
811
812 int DPClient::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
813 {
814         return vine_data_path_read(mDataPath, buf, buf_len, read_len);
815 }
816
817 DPPubSub::DPPubSub(void *event_queue)
818 {
819         VINE_LOGD("DPPubSub[%p] is created.", this);
820         mEventQueue = event_queue;
821         mSecurity = NULL;
822         mIfaceName = "";
823         mOpenState = VINE_DP_OPEN_STATE_NONE;
824         mReceivedCb = NULL;
825         mReceivedCbData = NULL;
826         mOpenedCb = NULL;
827         mOpenedCbData = NULL;
828         mTerminatedCb = NULL;
829         mTerminatedCbData = NULL;
830         mPeerJoinedCb = NULL;
831         mPeerJoinedCbData = NULL;
832         mPeerLeftCb = NULL;
833         mPeerLeftCbData = NULL;
834
835         // Discovery Info
836         mTopic = "";
837         mSdSub = NULL;
838         mSdPub = NULL;
839         mId = "";
840
841         // Network Info
842         mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
843         mListenPort = 0;
844         mMaxConnNum = VINE_DP_DEFAULT_CONNECTIONS_NUM;
845         mServerDataPath = NULL;
846         mRank = 0;
847 }
848
849 DPPubSub::~DPPubSub()
850 {
851         VINE_LOGD("DPPubSub[%p] is deleted.", this);
852         _vine_security_destroy(mSecurity);
853         close();
854 }
855
856 void DPPubSub::set_id(const char *id)
857 {
858         if (id == NULL || mId.compare(id) == 0)
859                 return;
860
861         VINE_LOGD("Id is changed %s -> %s", mId.c_str(), id);
862         mId = id;
863 }
864
865 int DPPubSub::get_id(char **id)
866 {
867         *id = STRDUP(mId.c_str());
868         return VINE_ERROR_NONE;
869 }
870
871 int DPPubSub::set_addr_family(vine_address_family_e addr_family)
872 {
873         mAddrFamily = addr_family;
874         return VINE_ERROR_NONE;
875 }
876
877 int DPPubSub::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
878 {
879         return VINE_ERROR_INVALID_OPERATION;
880 }
881
882 int DPPubSub::get_remote_ip(vine_address_family_e *addr_family, char **ip)
883 {
884         return VINE_ERROR_INVALID_OPERATION;
885 }
886
887 int DPPubSub::set_remote_port(int port)
888 {
889         return VINE_ERROR_INVALID_OPERATION;
890 }
891
892 int DPPubSub::get_remote_port()
893 {
894         return -1;
895 }
896
897 int DPPubSub::set_port(int port)
898 {
899         if (port < 0 || port > 65535)
900                 return VINE_ERROR_INVALID_PARAMETER;
901
902         mListenPort = port;
903
904         return VINE_ERROR_NONE;
905 }
906
907 int DPPubSub::get_ip(vine_address_family_e *addr_family, char **ip)
908 {
909         return VINE_ERROR_INVALID_OPERATION;
910 }
911
912 int DPPubSub::update_local_address_info()
913 {
914         char *ip;
915         int addr_family;
916         int port;
917         int ret = vine_data_path_get_local_address_info(mServerDataPath, &addr_family, &ip, &port);
918         if (ret != VINE_ERROR_NONE)
919                 return ret;
920
921         mLocalIp = string(ip);
922         free(ip);
923
924         return VINE_ERROR_NONE;
925 }
926
927 int DPPubSub::set_topic(std::string topic)
928 {
929         mTopic = topic;
930         return VINE_ERROR_NONE;
931 }
932
933 int DPPubSub::set_max_connections(int max_conn)
934 {
935         if (max_conn < 1 || max_conn > VINE_DP_MAX_CONNECTIONS_NUM)
936                 return VINE_ERROR_INVALID_PARAMETER;
937
938         mMaxConnNum = max_conn;
939         return VINE_ERROR_NONE;
940 }
941
942 int DPPubSub::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
943 {
944         return VINE_ERROR_INVALID_OPERATION;
945 }
946
947 int DPPubSub::unset_accepted_cb()
948 {
949         return VINE_ERROR_INVALID_OPERATION;
950 }
951
952 int DPPubSub::set_peer_joined_cb(vine_dp_peer_joined_cb callback, void *user_data)
953 {
954         mPeerJoinedCb = callback;
955         mPeerJoinedCbData = user_data;
956
957         return VINE_ERROR_NONE;
958 }
959
960 int DPPubSub::unset_peer_joined_cb()
961 {
962         mPeerJoinedCb = NULL;
963         mPeerJoinedCbData = NULL;
964
965         return VINE_ERROR_NONE;
966 }
967
968 int DPPubSub::set_peer_left_cb(vine_dp_peer_left_cb callback, void *user_data)
969 {
970         mPeerLeftCb = callback;
971         mPeerLeftCbData = user_data;
972
973         return VINE_ERROR_NONE;
974 }
975
976 int DPPubSub::unset_peer_left_cb()
977 {
978         mPeerLeftCb = NULL;
979         mPeerLeftCbData = NULL;
980
981         return VINE_ERROR_NONE;
982 }
983
984 void DPPubSub::invoke_peer_joined_cb(const char *peer_id)
985 {
986         if (mPeerJoinedCb)
987                 mPeerJoinedCb(static_cast<void *>(this), peer_id, mPeerJoinedCbData);
988 }
989
990 void DPPubSub::invoke_peer_left_cb(const char *peer_id)
991 {
992         if (mPeerLeftCb)
993                 mPeerLeftCb(static_cast<void *>(this), peer_id, mPeerJoinedCbData);
994 }
995
996 int DPPubSub::connect(const char *service_name, const char *ip, int port)
997 {
998         vine_data_path_h datapath = NULL;
999         const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
1000         struct pubsub_connect_data *conn_data
1001                 = (struct pubsub_connect_data *)calloc(1, sizeof(struct pubsub_connect_data));
1002
1003         conn_data->dp = static_cast<void *>(this);
1004         conn_data->service_name = STRDUP(service_name);
1005
1006         int ret = vine_data_path_connect(mAddrFamily, ip, port, iface_name,
1007                         mSecurity, service_name, mId.c_str(),
1008                         _pubsub_connected_cb, (void *)conn_data,
1009                         &datapath, mEventQueue);
1010         return ret;
1011 }
1012
1013 int DPPubSub::close_server_dp()
1014 {
1015         int ret = vine_data_path_close(mServerDataPath);
1016         mServerDataPath = NULL;
1017         return ret;
1018 }
1019
1020 void DPPubSub::create_id(char id[])
1021 {
1022         const char *map = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
1023         const size_t map_len = strlen(map);
1024
1025         string rand_str;
1026         srand(time(NULL));
1027         generate_n(back_inserter(rand_str), 6,
1028                 [&](){
1029                         return map[rand() % map_len];
1030                 });
1031         snprintf(id, VINE_MAX_SERVICE_NAME_LEN, "%s-%s",
1032                         VINE_DP_PUBSUB_SERVICE_NAME_PREFIX, rand_str.c_str());
1033 }
1034
1035 int DPPubSub::publish_service()
1036 {
1037         vine_service_h service;
1038         int ret;
1039         char rank_str[VINE_DP_PUBSUB_RANK_LEN] = {0 , };
1040
1041         ret = vine_service_create(&service);
1042         if (ret != VINE_ERROR_NONE)
1043                 return ret;
1044
1045         vine_service_set_type(service, mTopic.c_str());
1046         vine_service_set_port(service, mListenPort);
1047
1048         mRank = create_rank();
1049         sprintf(rank_str, "%d", mRank);
1050
1051         vine_service_set_name(service, mId.c_str());
1052         vine_service_add_attribute(service, VINE_DP_PUBSUB_RANK_KEY, (const char *)rank_str);
1053
1054         if (mSdPub == NULL) {
1055                 ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &mSdPub);
1056                 if (ret != VINE_ERROR_NONE) {
1057                         VINE_LOGE("Fail to vine_disc_create");
1058                         vine_service_destroy(service);
1059                         return ret;
1060                 }
1061         }
1062         ret = vine_disc_publish(mSdPub,
1063                         service, NULL,
1064                         _service_published_cb, static_cast<void *>(this),
1065                         mEventQueue);
1066         if (ret != VINE_ERROR_NONE) {
1067                 vine_disc_destroy(mSdPub);
1068                 mSdPub = NULL;
1069         }
1070
1071         vine_service_destroy(service);
1072
1073         VINE_LOGD("Publish %s:%d with rank %d", mTopic.c_str(), mListenPort, mRank);
1074         return ret;
1075 }
1076
1077 int DPPubSub::subscribe_service()
1078 {
1079         int ret;
1080
1081         if (mSdSub == NULL) {
1082                 ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &mSdSub);
1083                 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to vine_disc_create");
1084         }
1085
1086         ret = vine_disc_subscribe(mSdSub,
1087                         mTopic.c_str(), NULL,
1088                         _service_discovered_cb, static_cast<void *>(this),
1089                         mEventQueue);
1090         if (ret != VINE_ERROR_NONE) {
1091                 vine_disc_destroy(mSdSub);
1092                 mSdSub = NULL;
1093         }
1094
1095         return ret;
1096 }
1097
1098 int DPPubSub::create_rank()
1099 {
1100         srand(time(NULL));
1101         return rand() % VINE_DP_PUBSUB_RANK_MAX;
1102 }
1103
1104 int DPPubSub::open(vine_dp_opened_cb callback, void *user_data)
1105 {
1106         RET_ERR_IF_DP_OPEN_STATE_ISNT_IDLE(mOpenState);
1107
1108         mOpenState = VINE_DP_OPEN_STATE_WAIT;
1109         mOpenedCb = callback;
1110         mOpenedCbData = user_data;
1111
1112         char service_name[VINE_MAX_SERVICE_NAME_LEN + 1] = {0 , };
1113         create_id(service_name);
1114         set_id(service_name);
1115
1116         int ret = vine_data_path_open(mAddrFamily, mListenPort,
1117                         mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL,
1118                         mMaxConnNum, mSecurity, mId.c_str(),
1119                         _pubsub_opened_cb, static_cast<void *>(this),
1120                         _pubsub_accepted_cb, static_cast<void *>(this),
1121                         &mServerDataPath, mEventQueue);
1122         if (ret != VINE_ERROR_NONE) {
1123                 mOpenedCb = NULL;
1124                 mOpenedCbData = NULL;
1125                 mOpenState = VINE_DP_OPEN_STATE_NONE;
1126                 return ret;
1127         }
1128
1129         return VINE_ERROR_NONE;
1130 }
1131
1132 void DPPubSub::close()
1133 {
1134         vine_disc_stop_publish(mSdPub);
1135         vine_disc_stop_subscribe(mSdSub);
1136
1137         vine_disc_destroy(mSdSub);
1138         mSdSub = NULL;
1139         vine_disc_destroy(mSdPub);
1140         mSdPub = NULL;
1141
1142         clear_joined_peer();
1143         vine_data_path_close(mServerDataPath);
1144         vine_data_path_destroy(mServerDataPath);
1145         mServerDataPath = NULL;
1146
1147         mOpenState = VINE_DP_OPEN_STATE_NONE;
1148 }
1149
1150 int DPPubSub::send(unsigned char *buf, size_t len)
1151 {
1152         int ret = VINE_ERROR_NONE;
1153
1154         for (auto &peer : mDataPathList) {
1155                 if (!peer.second)
1156                         continue;
1157                 ret = vine_data_path_write(peer.second, buf, len);
1158                 if (ret != VINE_ERROR_NONE) {
1159                         VINE_LOGE("fail to write to a peer[%p] ", peer.second);
1160                         break;
1161                 }
1162         }
1163
1164         return ret;
1165 }
1166
1167 int DPPubSub::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
1168 {
1169         auto &dp_info = mRecvDataPathList.front();
1170         size_t bytes = 0;
1171         int ret = vine_data_path_read(dp_info.first, buf, buf_len, &bytes);
1172         if (ret != VINE_ERROR_NONE)
1173                 return ret;
1174
1175         dp_info.second -= bytes;
1176         VINE_LOGD("%zd bytes remained", dp_info.second);
1177
1178         *read_len = bytes;
1179         if (dp_info.second == 0)
1180                 mRecvDataPathList.pop();
1181
1182         return VINE_ERROR_NONE;
1183 }
1184
1185 bool DPPubSub::is_joined_peer(const char *service_name, const char *ip)
1186 {
1187         if (mId.compare(service_name) == 0) {
1188                 VINE_LOGD("It's me!");
1189                 mIpList.insert(ip);
1190                 return true;
1191         }
1192
1193         return (mDataPathList.count(string(service_name)) > 0);
1194 }
1195
1196 int DPPubSub::get_joined_peer()
1197 {
1198         return mDataPathList.size();
1199 }
1200
1201 const char *DPPubSub::get_joined_peer_id(vine_data_path_h datapath)
1202 {
1203         if (!datapath)
1204                 return NULL;
1205
1206         DPMap::iterator found = std::find_if(mDataPathList.begin(),
1207                         mDataPathList.end(),
1208                         [datapath](std::pair<std::string, vine_data_path_h> const& item)
1209                         {
1210                                 VINE_LOGD("item.second[%p]", item.second);
1211                                 return (item.second == datapath);
1212                         });
1213
1214         if (found == mDataPathList.end()) {
1215                 VINE_LOGE("Cannot find the datapath[%p].", datapath);
1216                 return NULL;
1217         }
1218
1219         return found->first.c_str();
1220 }
1221
1222 void DPPubSub::add_joined_peer(const char *service_name, vine_data_path_h datapath)
1223 {
1224         if (!service_name || !datapath)
1225                 return;
1226
1227         VINE_LOGD("%s, %p is added.", service_name, datapath);
1228         mDataPathList.insert(std::make_pair(string(service_name), datapath));
1229 }
1230
1231 void DPPubSub::del_joined_peer(vine_data_path_h datapath)
1232 {
1233         if (!datapath)
1234                 return;
1235
1236         DPMap::iterator found = std::find_if(mDataPathList.begin(),
1237                         mDataPathList.end(),
1238                         [datapath](std::pair<std::string, vine_data_path_h> const& item)
1239                         {
1240                                 VINE_LOGD("item.second[%p]", item.second);
1241                                 return (item.second == datapath);
1242                         });
1243
1244         if (found == mDataPathList.end()) {
1245                 VINE_LOGE("Cannot find the datapath[%p].", datapath);
1246                 return;
1247         }
1248
1249         VINE_LOGD("datapath[%p] is deleted from list.", datapath);
1250         mDataPathList.erase(found);
1251 }
1252
1253 void DPPubSub::clear_joined_peer()
1254 {
1255         for (auto &peer : mDataPathList) {
1256                 if (!peer.second)
1257                         continue;
1258                 vine_data_path_close(peer.second);
1259                 vine_data_path_destroy(peer.second);
1260         }
1261
1262         mDataPathList.clear();
1263 }
1264
1265 void DPPubSub::noti_received_peer(vine_data_path_h datapath, size_t bytes)
1266 {
1267         mRecvDataPathList.push(std::make_pair(datapath, bytes));
1268 }
1269
#if 0
// Disabled helper (compiled out): parses the text after the last ':' of
// `ip` as a hexadecimal number. Returns -1 when that text is empty.
static int _get_last_two_octets(string ip)
{
        const string tail = ip.substr(ip.find_last_of(':') + 1);

        if (tail.empty()) {
                VINE_LOGE("invalid IP.");
                return -1;
        }

        return std::stoi(tail, NULL, 16);
}
#endif
1284
1285 // The smaller the last octet, the higher the priority.
1286 //  1: peers priority is higher than me
1287 //  0: same or failure
1288 // -1: peers priority is lower than me
1289 int DPPubSub::compare_ip_priority(const char *peer_ip)
1290 {
1291         std::string peer_ip_str(peer_ip);
1292         size_t pos = peer_ip_str.find_last_of('.');
1293         std::string subnet = peer_ip_str.substr(0, pos);
1294         std::string found_ip;
1295         int last = 0, peer_last = 0;
1296
1297         if (subnet.size() == 0)
1298                 return 0;
1299
1300         for (auto &ip : mIpList) {
1301                 if (ip.find(subnet) != 0)
1302                         continue;
1303
1304                 found_ip = ip;
1305                 break;
1306         }
1307
1308         VINE_LOGD("peer_ip[%s] found_ip[%s] subnet[%s]", peer_ip, found_ip.c_str(), subnet.c_str());
1309         try {
1310                 last = std::stoi(found_ip.substr(pos + 1));
1311                 peer_last = std::stoi(peer_ip_str.substr(pos + 1));
1312         } catch (...) {
1313                 VINE_LOGD("Failed to convert last octet. IP is invalid.");
1314         }
1315
1316         if (last == peer_last)
1317                 return 0;
1318         return (last < peer_last ? 1 : -1);
1319 }
1320
1321 // If true is return, it will connect to a peer.
1322 bool DPPubSub::check_if_connect(const char *peer_rank,
1323                 vine_address_family_e ip_type, const char *peer_ip, int peer_port)
1324 {
1325         // The smaller the rank value, the higher the priority.
1326         // Connect to a peer when both 1 and 2 are satisfied.
1327         // 1. peer has rank key and value
1328         // 2. the rank value is higher than peers value
1329         int prank = std::stoi(peer_rank);
1330         VINE_LOGD("ip_type[%d] rank[%d], Peer rank[%s/%d] peer_ip[%s] peer_port[%d]",
1331                         ip_type, mRank, peer_rank, prank, peer_ip, peer_port);
1332         if (mRank > prank)
1333                 return true;
1334         else if (mRank < prank)
1335                 return false;
1336
1337         // If rank is the same, Need to check the IP address
1338         int is_higher = 0;
1339         if (ip_type == VINE_ADDRESS_FAMILY_IPV4) {
1340                 is_higher = compare_ip_priority(peer_ip);
1341         } else {
1342                 // TODO : handle IPv6 address
1343         }
1344
1345         if (is_higher == 1)
1346                 return true;
1347
1348         // If last value of IP is the same, Need to check the port
1349         if (mListenPort > peer_port)
1350                 return true;
1351
1352         return false;
1353 }
1354
1355 int _vine_dp_create(vine_session_h session, vine_dp_type_e type, vine_dp_h *dp)
1356 {
1357         RET_VAL_IF(session == NULL, VINE_ERROR_INVALID_PARAMETER, "session is null.");
1358         RET_VAL_IF(type >= VINE_DP_TYPE_UNKNOWN, VINE_ERROR_INVALID_PARAMETER, "invalid type");
1359         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1360
1361         vine_event_queue_h eq = NULL;
1362         int ret = _vine_session_get_event_queue(session, &eq);
1363
1364         if (ret != VINE_ERROR_NONE || !eq)
1365                 return VINE_ERROR_INVALID_PARAMETER;
1366
1367         if (type == VINE_DP_TYPE_SERVER) {
1368                 *dp = new DPServer((void *)eq);
1369         } else if (type == VINE_DP_TYPE_CLIENT) {
1370                 *dp = new DPClient((void *)eq);
1371         } else if (type == VINE_DP_TYPE_PUBSUB) {
1372                 *dp = new DPPubSub((void *)eq);
1373         }
1374
1375         return VINE_ERROR_NONE;
1376 }
1377
1378 int _vine_dp_destroy(vine_dp_h dp)
1379 {
1380         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1381
1382         DataPath *_dp = static_cast<DataPath *>(dp);
1383         delete _dp;
1384
1385         return VINE_ERROR_NONE;
1386 }
1387
1388 int _vine_dp_get_id(vine_dp_h dp, char **id)
1389 {
1390         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1391         RET_VAL_IF(id == NULL, VINE_ERROR_INVALID_PARAMETER, "id is null.");
1392
1393         DataPath *_dp = static_cast<DataPath *>(dp);
1394         return _dp->get_id(id);
1395 }
1396
1397 int _vine_dp_set_iface_name(vine_dp_h dp, const char *iface_name)
1398 {
1399         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1400         RET_VAL_IF(iface_name == NULL, VINE_ERROR_INVALID_PARAMETER, "iface_name is null.");
1401
1402         DataPath *_dp = static_cast<DataPath *>(dp);
1403         return _dp->set_iface_name(iface_name);
1404 }
1405
1406 int _vine_dp_set_addr_family(vine_dp_h dp, vine_address_family_e addr_family)
1407 {
1408         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1409         RET_VAL_IF(addr_family < VINE_ADDRESS_FAMILY_DEFAULT
1410                         || addr_family > VINE_ADDRESS_FAMILY_IPV6,
1411                         VINE_ERROR_INVALID_PARAMETER, "addr_family is invalid.");
1412
1413         DataPath *_dp = static_cast<DataPath *>(dp);
1414         return _dp->set_addr_family(addr_family);
1415 }
1416
1417 int _vine_dp_set_remote_ip(vine_dp_h dp, vine_address_family_e addr_family, const char *ip)
1418 {
1419         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1420         RET_VAL_IF(ip == NULL, VINE_ERROR_INVALID_PARAMETER, "ip is null.");
1421
1422         DataPath *_dp = static_cast<DataPath *>(dp);
1423         return _dp->set_remote_ip(addr_family, ip);
1424 }
1425
1426 int _vine_dp_get_remote_ip(vine_dp_h dp, vine_address_family_e *addr_family, char **ip)
1427 {
1428         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1429         RET_VAL_IF(addr_family == NULL, VINE_ERROR_INVALID_PARAMETER, "addr_family is null.");
1430         RET_VAL_IF(ip == NULL, VINE_ERROR_INVALID_PARAMETER, "ip is null.");
1431
1432         DataPath *_dp = static_cast<DataPath *>(dp);
1433         return _dp->get_remote_ip(addr_family, ip);
1434 }
1435
1436 int _vine_dp_set_remote_port(vine_dp_h dp, int port)
1437 {
1438         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1439
1440         DataPath *_dp = static_cast<DataPath *>(dp);
1441         return _dp->set_remote_port(port);
1442 }
1443
1444 int _vine_dp_get_remote_port(vine_dp_h dp, int *port)
1445 {
1446         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1447         RET_VAL_IF(port == NULL, VINE_ERROR_INVALID_PARAMETER, "port is null.");
1448
1449         DataPath *_dp = static_cast<DataPath *>(dp);
1450         int remote_port = _dp->get_remote_port();
1451         if (remote_port < 0)
1452                 return VINE_ERROR_INVALID_OPERATION;
1453         *port = remote_port;
1454         return VINE_ERROR_NONE;
1455 }
1456
1457 int _vine_dp_get_ip(vine_dp_h dp, vine_address_family_e *addr_family, char **ip)
1458 {
1459         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1460         RET_VAL_IF(addr_family == NULL, VINE_ERROR_INVALID_PARAMETER, "addr_family is null.");
1461         RET_VAL_IF(ip == NULL, VINE_ERROR_INVALID_PARAMETER, "ip is null.");
1462
1463         DataPath *_dp = static_cast<DataPath *>(dp);
1464         return _dp->get_ip(addr_family, ip);
1465 }
1466
1467 int _vine_dp_set_port(vine_dp_h dp, int port)
1468 {
1469         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1470
1471         DataPath *_dp = static_cast<DataPath *>(dp);
1472         return _dp->set_port(port);
1473 }
1474
1475 int _vine_dp_get_port(vine_dp_h dp, int *port)
1476 {
1477         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1478         RET_VAL_IF(port == NULL, VINE_ERROR_INVALID_PARAMETER, "port is null.");
1479
1480         DataPath *_dp = static_cast<DataPath *>(dp);
1481         *port = _dp->get_port();
1482         return VINE_ERROR_NONE;
1483 }
1484
1485 int _vine_dp_set_topic(vine_dp_h dp, const char *topic)
1486 {
1487         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1488         RET_VAL_IF(topic == NULL, VINE_ERROR_INVALID_PARAMETER, "topic is null.");
1489         RET_VAL_IF(_check_topic_len(topic) == false,
1490                 VINE_ERROR_INVALID_PARAMETER, "invalid length of topic.");
1491
1492         DataPath *_dp = static_cast<DataPath *>(dp);
1493         return _dp->set_topic(topic);
1494 }
1495
1496 int _vine_dp_set_max_connections(vine_dp_h dp, int max_conn)
1497 {
1498         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1499
1500         DataPath *_dp = static_cast<DataPath *>(dp);
1501         return _dp->set_max_connections(max_conn);
1502 }
1503
1504 int _vine_dp_set_security(vine_dp_h dp, vine_security_h security)
1505 {
1506         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1507         RET_VAL_IF(security == NULL, VINE_ERROR_INVALID_PARAMETER, "security is null.");
1508
1509         DataPath *_dp = static_cast<DataPath *>(dp);
1510         return _dp->set_security(security);
1511 }
1512
1513 int _vine_dp_set_accepted_cb(vine_dp_h dp, vine_dp_accepted_cb callback, void *user_data)
1514 {
1515         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1516         RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1517
1518         DataPath *_dp = static_cast<DataPath *>(dp);
1519         return _dp->set_accepted_cb(callback, user_data);
1520 }
1521
1522 int _vine_dp_unset_accepted_cb(vine_dp_h dp)
1523 {
1524         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1525
1526         DataPath *_dp = static_cast<DataPath *>(dp);
1527         return _dp->unset_accepted_cb();
1528 }
1529
1530 int _vine_dp_set_terminated_cb(vine_dp_h dp, vine_dp_terminated_cb callback, void *user_data)
1531 {
1532         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1533         RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1534
1535         DataPath *_dp = static_cast<DataPath *>(dp);
1536         return _dp->set_terminated_cb(callback, user_data);
1537 }
1538
1539 int _vine_dp_unset_terminated_cb(vine_dp_h dp)
1540 {
1541         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1542
1543         DataPath *_dp = static_cast<DataPath *>(dp);
1544         return _dp->unset_terminated_cb();
1545 }
1546
1547 int _vine_dp_open(vine_dp_h dp, vine_dp_opened_cb callback, void *user_data)
1548 {
1549         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1550         RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1551
1552         DataPath *_dp = static_cast<DataPath *>(dp);
1553         return _dp->open(callback, user_data);
1554 }
1555
1556 int _vine_dp_close(vine_dp_h dp)
1557 {
1558         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1559
1560         DataPath *_dp = static_cast<DataPath *>(dp);
1561         _dp->close();
1562         return VINE_ERROR_NONE;
1563 }
1564
1565 int _vine_dp_send(vine_dp_h dp, unsigned char *buf, size_t len)
1566 {
1567         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1568         RET_VAL_IF(buf == NULL, VINE_ERROR_INVALID_PARAMETER, "buf is null.");
1569
1570         DataPath *_dp = static_cast<DataPath *>(dp);
1571         return _dp->send(buf, len);
1572 }
1573
1574 int _vine_dp_recv(vine_dp_h dp, unsigned char *buf, size_t buf_len, size_t *read_len)
1575 {
1576         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1577         RET_VAL_IF(buf == NULL, VINE_ERROR_INVALID_PARAMETER, "buf is null.");
1578         RET_VAL_IF(read_len == NULL, VINE_ERROR_INVALID_PARAMETER, "read_len is null.");
1579
1580         DataPath *_dp = static_cast<DataPath *>(dp);
1581         return _dp->recv(buf, buf_len, read_len);
1582 }
1583
1584 int _vine_dp_set_received_cb(vine_dp_h dp, vine_dp_received_cb callback, void *user_data)
1585 {
1586         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1587         RET_VAL_IF(callback== NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1588
1589         DataPath *_dp = static_cast<DataPath *>(dp);
1590         return _dp->set_received_cb(callback, user_data);
1591 }
1592
1593 int _vine_dp_unset_received_cb(vine_dp_h dp)
1594 {
1595         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1596
1597         DataPath *_dp = static_cast<DataPath *>(dp);
1598         return _dp->unset_received_cb();
1599 }
1600
1601 int _vine_dp_set_peer_joined_cb(vine_dp_h dp, vine_dp_peer_joined_cb callback, void *user_data)
1602 {
1603         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1604         RET_VAL_IF(callback== NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1605
1606         DataPath *_dp = static_cast<DataPath *>(dp);
1607         return _dp->set_peer_joined_cb(callback, user_data);
1608 }
1609
1610 int _vine_dp_unset_peer_joined_cb(vine_dp_h dp)
1611 {
1612         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1613
1614         DataPath *_dp = static_cast<DataPath *>(dp);
1615         return _dp->unset_peer_joined_cb();
1616 }
1617
1618 int _vine_dp_set_peer_left_cb(vine_dp_h dp, vine_dp_peer_left_cb callback, void *user_data)
1619 {
1620         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1621         RET_VAL_IF(callback== NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1622
1623         DataPath *_dp = static_cast<DataPath *>(dp);
1624         return _dp->set_peer_left_cb(callback, user_data);
1625 }
1626
1627 int _vine_dp_unset_peer_left_cb(vine_dp_h dp)
1628 {
1629         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1630
1631         DataPath *_dp = static_cast<DataPath *>(dp);
1632         return _dp->unset_peer_left_cb();
1633 }