Unset ip_resolved_cb on vine_service_destroy
[platform/core/api/vine.git] / src / vine-dp.cpp
/*
 * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
19 #include "vine-dp.h"
20
21 #include "vine-constants.h"
22 #include "vine-data-path.h"
23 #include "vine-event-loop.h"
24 #include "vine-log.h"
25 #include "vine-session.h"
26 #include "vine-security.h"
27 #include "vine-utils.h"
28
29 #include <algorithm>
30 #include <string>
31 #include <utility>
32 #include <arpa/inet.h>
33
34 using namespace vine;
35
36 extern vine_dp_plugin_fn g_dp_plugin_fn;
37
38 static bool _check_if_valid_ip(vine_address_family_e addr_family, const char *ip)
39 {
40         int ret = 0;
41         unsigned char buf[sizeof(struct in6_addr)];
42
43         if (addr_family == VINE_ADDRESS_FAMILY_IPV4) {
44                 ret = inet_pton(AF_INET, ip, buf);
45         } else if (addr_family == VINE_ADDRESS_FAMILY_IPV6) {
46                 ret = inet_pton(AF_INET6, ip, buf);
47         } else {
48                 ret |= inet_pton(AF_INET, ip, buf);
49                 ret |= inet_pton(AF_INET6, ip, buf);
50         }
51
52         return (ret == 1);
53 }
54
55 static bool _check_topic_len(const char * topic)
56 {
57         RET_VAL_IF(topic == NULL, false, "topic is NULL");
58         int len = strlen(topic);
59         return len > 0 && len <= VINE_MAX_TOPIC_LEN;
60 }
61
62 static void _received_cb(vine_data_path_h datapath, size_t received_len, void *user_data)
63 {
64         if (!datapath || !user_data)
65                 return;
66
67         static_cast<DataPath *>(user_data)->invoke_received_cb(received_len);
68 }
69
70 static void _terminated_cb(vine_data_path_h datapath, void *user_data)
71 {
72         if (!datapath || !user_data)
73                 return;
74
75         static_cast<DataPath *>(user_data)->invoke_terminated_cb();
76 }
77
78 static void _opened_cb(vine_data_path_h datapath, int result, int port, void *userdata)
79 {
80         DataPath *dp = static_cast<DataPath *>(userdata);
81         dp->set_port(port);
82
83         VINE_LOGI("port[%d] result[%d]", port, result);
84         dp->invoke_opened_cb((vine_error_e)result);
85 }
86
87 static void _accepted_cb(vine_data_path_h datapath, void *user_data)
88 {
89         if (!datapath || !user_data)
90                 return;
91
92         void *event_fd = static_cast<DataPath *>(user_data)->get_eventfd();
93
94         // datapath is created newly. DP class should be needed for it.
95         // event_fd is the same as corresponding DPServer.
96         DPClient *connected_client_dp = new DPClient(event_fd, datapath);
97         _vine_data_path_set_received_cb(datapath,
98                         _received_cb, static_cast<void *>(connected_client_dp));
99         vine_data_path_set_terminated_cb(datapath,
100                         _terminated_cb, static_cast<void *>(connected_client_dp));
101         static_cast<DPServer *>(user_data)->invoke_accepted_cb(connected_client_dp);
102 }
103
104 static void _connected_cb(vine_data_path_h datapath, int result, void *user_data)
105 {
106         _vine_data_path_set_received_cb(datapath, _received_cb, user_data);
107         vine_data_path_set_terminated_cb(datapath, _terminated_cb, user_data);
108         static_cast<DataPath *>(user_data)->invoke_opened_cb(result);
109 }
110
111 static void _pubsub_received_cb(vine_data_path_h datapath, size_t received_len, void *user_data)
112 {
113         if (!datapath || !user_data || received_len == 0)
114                 return;
115
116         VINE_LOGD("receive %zd bytes from datapath[%p]", received_len, datapath);
117
118         static_cast<DPPubSub *>(user_data)->noti_received_peer(datapath, received_len);
119         static_cast<DataPath *>(user_data)->invoke_received_cb(received_len);
120 }
121
122 static void _pubsub_terminated_cb(vine_data_path_h datapath, void *user_data)
123 {
124         if (!datapath || !user_data)
125                 return;
126
127         VINE_LOGD("datapath[%p] is terminated by peer.", datapath);
128         static_cast<DPPubSub *>(user_data)->del_joined_peer(datapath);
129         _vine_data_path_destroy(datapath);
130 }
131
132 static void _pubsub_opened_cb(vine_data_path_h datapath, int result, int port, void *userdata)
133 {
134         if (!userdata)
135                 return;
136
137         DPPubSub *dp = static_cast<DPPubSub *>(userdata);
138         dp->set_port(port);
139
140         VINE_LOGI("port[%d] result[%d]", port, result);
141
142         // Notify user that a listen socket cannot be used anymore.
143         if (result != VINE_ERROR_NONE) {
144                 static_cast<DataPath *>(userdata)->invoke_opened_cb(result);
145                 dp->close();
146                 return;
147         }
148
149         int ret = dp->publish_service();
150         if (ret != VINE_ERROR_NONE) {
151                 dp->close();
152                 static_cast<DataPath *>(userdata)->invoke_opened_cb(ret);
153                 return;
154         }
155
156         ret = dp->subscribe_service();
157         if (ret != VINE_ERROR_NONE) {
158                 dp->close();
159                 static_cast<DataPath *>(userdata)->invoke_opened_cb(ret);
160                 return;
161         }
162 }
163
164 static void _pubsub_accepted_cb(vine_data_path_h datapath, void *user_data)
165 {
166         if (!datapath || !user_data)
167                 return;
168
169         DPPubSub *dp = static_cast<DPPubSub *>(user_data);
170         if (dp->get_max_conns() <= dp->get_joined_peer()) {
171                 VINE_LOGE("The max connection limit is reached. Ignore [%p].", datapath);
172                 dp->decrease_init_disc_num();
173                 _vine_data_path_close(datapath);
174                 return;
175         }
176
177         const char *ip = _vine_data_path_get_ip(datapath);
178         int port = _vine_data_path_get_port(datapath);
179         dp->add_joined_peer(ip, port, datapath);
180
181         _vine_data_path_set_received_cb(datapath, _pubsub_received_cb, user_data);
182         vine_data_path_set_terminated_cb(datapath, _pubsub_terminated_cb, user_data);
183
184         if (dp->get_open_state() == VINE_DP_PUBSUB_OPEN_STATE_WAIT
185                         && dp->decrease_init_disc_num() <= 0) {
186                 dp->set_open_state(VINE_DP_PUBSUB_OPEN_STATE_DONE);
187                 static_cast<DataPath *>(user_data)->invoke_opened_cb(VINE_ERROR_NONE);
188         }
189 }
190
191 static void _pubsub_connected_cb(vine_data_path_h datapath, int result, void *user_data)
192 {
193         if (!datapath || !user_data)
194                 return;
195
196         DPPubSub *dp = static_cast<DPPubSub *>(user_data);
197
198         if (result != 0) {
199                 VINE_LOGE("connect failure.");
200                 dp->decrease_init_disc_num();
201                 return;
202         }
203
204         const char *ip = _vine_data_path_get_ip(datapath);
205         int port = _vine_data_path_get_port(datapath);
206
207         dp->add_joined_peer(ip, port, datapath);
208
209         _vine_data_path_set_received_cb(datapath, _pubsub_received_cb, user_data);
210         vine_data_path_set_terminated_cb(datapath, _pubsub_terminated_cb, user_data);
211
212         if (dp->get_open_state() == VINE_DP_PUBSUB_OPEN_STATE_WAIT
213                         && dp->decrease_init_disc_num() <= 0) {
214                 dp->set_open_state(VINE_DP_PUBSUB_OPEN_STATE_DONE);
215                 static_cast<DataPath *>(user_data)->invoke_opened_cb(VINE_ERROR_NONE);
216         }
217 }
218
219 // for pubsub
220 static int __vine_set_discovered_service(vine_service_h service,
221                 const char *service_type, const char *service_name,
222                 const char *host_name, int port,
223                 const map<string, string> &attr, const char *iface_name)
224 {
225         int ret = VINE_ERROR_NONE;
226         ret = _vine_service_set_type(service, service_type);
227         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set service type");
228         ret = _vine_service_set_name(service, service_name);
229         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set service name");
230         ret = _vine_service_set_host_name(service, host_name);
231         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set host name");
232         ret = _vine_service_set_port(service, port);
233         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set port");
234
235         for (const auto &kv : attr)
236                 _vine_service_add_attribute(service, kv.first.c_str(), kv.second.c_str());
237
238         ret = _vine_service_set_iface_name(service, iface_name);
239         RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to set iface_name");
240
241         return VINE_ERROR_NONE;
242 }
243
244 static void _ip_resolved_cb(vine_disc_h disc, vine_service_h service, bool add,
245                                 const char *ip, vine_address_family_e address_family, void *user_data)
246 {
247         if (!user_data || !add) {
248                 VINE_LOGD("state: %s", add ? "add" : "remove");
249                 return;
250         }
251
252         DPPubSub *dp = static_cast<DPPubSub *>(user_data);
253         int port = _vine_service_get_port(service);
254         if (dp->is_joined_peer(_vine_service_get_name(service), ip, port)) {
255                 VINE_LOGD("%s:%d was already joined.", ip, port);
256                 return;
257         }
258
259         VINE_LOGD("IP Resolved %s:%d", ip, port);
260
261         vine_address_family_e supported_addr_family = dp->get_addr_family();
262         if (supported_addr_family != VINE_ADDRESS_FAMILY_DEFAULT
263                         && supported_addr_family != address_family) {
264                 VINE_LOGD("address family is dismatched. peer type[%d]", address_family);
265                 return;
266         }
267
268         auto attr = _vine_service_get_attributes(service);
269         auto it = attr.find(VINE_DP_PUBSUB_RANK_KEY);
270         if (it == attr.end()) {
271                 VINE_LOGE("peer doens't have a rank.");
272                 return;
273         }
274
275         if (dp->get_max_conns() > dp->get_joined_peer()
276                         && dp->check_if_connect(it->second.c_str(), address_family, ip, port)) {
277                 VINE_LOGD("Try to connect a peer(%s:%d)", ip, port);
278                 dp->connect(ip, port);
279         }
280 }
281
282 static void _service_discovered_cb(vine_disc_h disc, bool available,
283                 const char *service_type, const char *service_name,
284                 const char *host_name, int port, const map<string, string> &attr,
285                 const char *iface_name, int more_coming, void *user_data)
286 {
287         VINE_LOGD("%s is discovered. %s",
288                         service_name, available ? "available" : "not available");
289
290         if (!user_data || !available)
291                 return;
292
293         vine_disc_h disc_handle;
294         int ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &disc_handle);
295         RET_IF(ret != VINE_ERROR_NONE, "Fail to create a disc");
296
297         vine_service_h service;
298         ret = _vine_service_create(&service, false);
299         if (ret != VINE_ERROR_NONE) {
300                 vine_disc_destroy(disc_handle);
301                 return;
302         }
303
304         auto it = attr.find(VINE_DP_PUBSUB_RANK_KEY);
305         if (it == attr.end()) {
306                 VINE_LOGE("peer doens't have a rank.");
307                 _vine_service_destroy(service);
308                 vine_disc_destroy(disc_handle);
309                 return;
310         }
311
312         ret = _vine_service_set_disc_handle(service, disc_handle);
313         if (ret != VINE_ERROR_NONE) {
314                 VINE_LOGE("Fail to set disc_handle. error(%d)", ret);
315                 _vine_service_destroy(service);
316                 vine_disc_destroy(disc_handle);
317                 return;
318         }
319
320         ret = __vine_set_discovered_service(service,
321                         service_type, service_name, host_name, port, attr, iface_name);
322         if (ret != VINE_ERROR_NONE) {
323                 VINE_LOGE("Fail to set a service. error(%d)", ret);
324                 _vine_service_destroy(service);
325                 vine_disc_destroy(disc_handle);
326                 return;
327         }
328
329         DPPubSub *dp = static_cast<DPPubSub *>(user_data);
330         if (more_coming && dp->get_open_state() == VINE_DP_PUBSUB_OPEN_STATE_WAIT) {
331                 VINE_LOGD("At least one more result is exist.");
332                 dp->increase_init_disc_num();
333         }
334
335         ret = vine_disc_resolve_ip(disc_handle, service,
336                         _ip_resolved_cb, user_data,
337                         (vine_event_queue_h)static_cast<DataPath *>(user_data)->get_eventfd());
338         if (ret != VINE_ERROR_NONE) {
339                 VINE_LOGE("Fail to resolve IP. error(%d)", ret);
340                 _vine_service_destroy(service);
341                 vine_disc_destroy(disc_handle);
342                 return;
343         }
344 }
345
346 static void _service_published_cb(vine_disc_h disc,
347                                 const char *service_name, vine_error_e error, void *user_data)
348 {
349         VINE_LOGD("%s publish request %s.",
350                         service_name, error == VINE_ERROR_NONE ? "succeed" : "failed");
351
352         if (!user_data)
353                 return;
354
355         DPPubSub *dp = static_cast<DPPubSub *>(user_data);
356         dp->set_service_name(service_name);
357 }
358
359 int DataPath::set_security(void *security)
360 {
361         return _vine_security_clone(&mSecurity, security);
362 }
363
364 int DataPath::set_iface_name(const std::string &iface_name)
365 {
366         mIfaceName = iface_name;
367         return VINE_ERROR_NONE;
368 }
369
370 void DataPath::invoke_opened_cb(int result)
371 {
372         if (mOpenedCb)
373                 mOpenedCb(static_cast<void *>(this), (vine_error_e)result, mOpenedCbData);
374
375         // called only once.
376         mOpenedCb = NULL;
377         mOpenedCbData = NULL;
378 }
379
380 int DataPath::set_received_cb(vine_dp_received_cb callback, void *user_data)
381 {
382         mReceivedCb = callback;
383         mReceivedCbData = user_data;
384         return VINE_ERROR_NONE;
385 }
386
387 int DataPath::unset_received_cb()
388 {
389         mReceivedCb = NULL;
390         mReceivedCbData = NULL;
391         return VINE_ERROR_NONE;
392 }
393
394 void DataPath::invoke_received_cb(size_t received_len)
395 {
396         if (mReceivedCb)
397                 mReceivedCb(static_cast<void *>(this), received_len, mReceivedCbData);
398 }
399
400 int DataPath::set_terminated_cb(vine_dp_terminated_cb callback, void *user_data)
401 {
402         mTerminatedCb = callback;
403         mTerminatedCbData = user_data;
404         return VINE_ERROR_NONE;
405 }
406
407 int DataPath::unset_terminated_cb()
408 {
409         mTerminatedCb = NULL;
410         mTerminatedCbData = NULL;
411         return VINE_ERROR_NONE;
412 }
413
414 void DataPath::invoke_terminated_cb()
415 {
416         if (mTerminatedCb)
417                 mTerminatedCb(static_cast<void *>(this), mTerminatedCbData);
418 }
419
420 DPServer::DPServer(void *event_fd)
421 {
422         VINE_LOGD("DPServer[%p] is created.", this);
423         mEventFd = event_fd;
424         mSecurity = NULL;
425         mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
426         mIfaceName = "";
427         mDataPath = NULL;
428         mListenPort = 0;
429         mMaxConnNum = VINE_DP_DEFAULT_CONNECTIONS_NUM;
430         mAcceptedCb = NULL;
431         mAcceptedCbData = NULL;
432         mReceivedCb = NULL;
433         mReceivedCbData = NULL;
434         mOpenedCb = NULL;
435         mOpenedCbData = NULL;
436         mTerminatedCb = NULL;
437         mTerminatedCbData = NULL;
438 }
439
440 DPServer::~DPServer()
441 {
442         VINE_LOGD("DPServer[%p] is deleted.", this);
443         _vine_security_destroy(mSecurity);
444         _vine_data_path_destroy(mDataPath);
445 }
446
447 int DPServer::set_addr_family(vine_address_family_e addr_family)
448 {
449         mAddrFamily = addr_family;
450         return VINE_ERROR_NONE;
451 }
452
453 int DPServer::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
454 {
455         return VINE_ERROR_INVALID_OPERATION;
456 }
457
458 int DPServer::set_port(int port)
459 {
460         if (port < 0 || port > 65535)
461                 return VINE_ERROR_INVALID_PARAMETER;
462
463         mListenPort = port;
464
465         return VINE_ERROR_NONE;
466 }
467
468 int DPServer::set_topic(std::string topic)
469 {
470         return VINE_ERROR_INVALID_OPERATION;
471 }
472
473 int DPServer::set_max_connections(int max_conn)
474 {
475         if (max_conn < 1 || max_conn > VINE_DP_MAX_CONNECTIONS_NUM)
476                 return VINE_ERROR_INVALID_PARAMETER;
477
478         mMaxConnNum = max_conn;
479         return VINE_ERROR_NONE;
480 }
481
482 int DPServer::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
483 {
484         mAcceptedCb = callback;
485         mAcceptedCbData = user_data;
486         return VINE_ERROR_NONE;
487 }
488
489 int DPServer::unset_accepted_cb()
490 {
491         mAcceptedCb = NULL;
492         mAcceptedCbData = NULL;
493         return VINE_ERROR_NONE;
494 }
495
496 void DPServer::invoke_accepted_cb(vine_dp_h dp)
497 {
498         if (mAcceptedCb)
499                 mAcceptedCb(static_cast<void *>(this), dp, mAcceptedCbData);
500 }
501
502 int DPServer::open(vine_dp_opened_cb callback, void *user_data)
503 {
504         const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
505
506         mOpenedCb = callback;
507         mOpenedCbData = user_data;
508
509         return vine_data_path_open(mAddrFamily, mListenPort, iface_name, mMaxConnNum, mSecurity,
510                         _opened_cb, static_cast<void *>(this),
511                         _accepted_cb, static_cast<void *>(this),
512                         &mDataPath, mEventFd);
513 }
514
515 void DPServer::close()
516 {
517         _vine_data_path_close(mDataPath);
518 }
519
520 int DPServer::send(unsigned char *buf, size_t len)
521 {
522         return _vine_data_path_write(mDataPath, buf, len);
523 }
524
525 int DPServer::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
526 {
527         return _vine_data_path_read(mDataPath, buf, buf_len, read_len);
528 }
529
530 DPClient::DPClient(void *event_fd)
531 {
532         VINE_LOGD("DPClient[%p] is created.", this);
533         mEventFd = event_fd;
534         mSecurity = NULL;
535         mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
536         mIfaceName = "";
537         mDataPath = NULL;
538         mServerIp = "";
539         mServerPort = 0;
540         mReceivedCb = NULL;
541         mReceivedCbData = NULL;
542         mOpenedCb = NULL;
543         mOpenedCbData = NULL;
544         mTerminatedCb = NULL;
545         mTerminatedCbData = NULL;
546 }
547
548 DPClient::DPClient(void *event_fd, void *datapath)
549 {
550         VINE_LOGD("DPClient[%p] is created with datapath[%p]", this, datapath);
551         mEventFd = event_fd;
552         mSecurity = NULL;
553         mDataPath = datapath;
554         mAddrFamily = VINE_ADDRESS_FAMILY_IPV4;
555         mServerIp = "";
556         mServerPort = 0;
557         mReceivedCb = NULL;
558         mReceivedCbData = NULL;
559         mOpenedCb = NULL;
560         mOpenedCbData = NULL;
561         mTerminatedCb = NULL;
562         mTerminatedCbData = NULL;
563 }
564
565 DPClient::~DPClient()
566 {
567         VINE_LOGD("DPClient[%p] is deleted.", this);
568         _vine_security_destroy(mSecurity);
569         _vine_data_path_destroy(mDataPath);
570 }
571
572 int DPClient::set_addr_family(vine_address_family_e addr_family)
573 {
574         return VINE_ERROR_INVALID_OPERATION;
575 }
576
577 int DPClient::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
578 {
579         if (!_check_if_valid_ip(addr_family, ip.c_str()))
580                 return VINE_ERROR_INVALID_PARAMETER;
581
582         mServerIp = ip;
583         mAddrFamily = addr_family;
584
585         return VINE_ERROR_NONE;
586 }
587
588 int DPClient::set_port(int port)
589 {
590         if (port <= 0 || port > 65535) // Do not allow 0
591                 return VINE_ERROR_INVALID_PARAMETER;
592
593         mServerPort = port;
594
595         return VINE_ERROR_NONE;
596 }
597
598 int DPClient::set_topic(std::string topic)
599 {
600         return VINE_ERROR_INVALID_OPERATION;
601 }
602
603 int DPClient::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
604 {
605         return VINE_ERROR_INVALID_OPERATION;
606 }
607
608 int DPClient::unset_accepted_cb()
609 {
610         return VINE_ERROR_INVALID_OPERATION;
611 }
612
613 int DPClient::open(vine_dp_opened_cb callback, void *user_data)
614 {
615         const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
616
617         mOpenedCb = callback;
618         mOpenedCbData = user_data;
619
620         return vine_data_path_connect(mAddrFamily, mServerIp.c_str(), mServerPort,
621                         iface_name, mSecurity, _connected_cb, static_cast<void *>(this), &mDataPath, mEventFd);
622 }
623
624 void DPClient::close()
625 {
626         _vine_data_path_close(mDataPath);
627 }
628
629 int DPClient::send(unsigned char *buf, size_t len)
630 {
631         return _vine_data_path_write(mDataPath, buf, len);
632 }
633
634 int DPClient::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
635 {
636         return _vine_data_path_read(mDataPath, buf, buf_len, read_len);
637 }
638
639 DPPubSub::DPPubSub(void *event_fd)
640 {
641         VINE_LOGD("DPPubSub[%p] is created.", this);
642         mEventFd = event_fd;
643         mSecurity = NULL;
644         mIfaceName = "";
645         mReceivedCb = NULL;
646         mReceivedCbData = NULL;
647         mOpenedCb = NULL;
648         mOpenedCbData = NULL;
649         mTerminatedCb = NULL;
650         mTerminatedCbData = NULL;
651
652         // Discovery Info
653         mTopic = "";
654         mSdSub = NULL;
655         mSdPub = NULL;
656         mServiceName = "";
657         mInitDiscNum = 1;
658         mOpenState = VINE_DP_PUBSUB_OPEN_STATE_NONE;
659         mSdPubSubState = VINE_DP_PUBSUB_SD_STATE_NONE;
660
661         // Network Info
662         mAddrFamily = VINE_ADDRESS_FAMILY_DEFAULT;
663         mListenPort = 0;
664         mMaxConnNum = VINE_DP_DEFAULT_CONNECTIONS_NUM;
665         mServerDataPath = NULL;
666         mRank = 0;
667 }
668
669 DPPubSub::~DPPubSub()
670 {
671         VINE_LOGD("DPPubSub[%p] is deleted.", this);
672         _vine_security_destroy(mSecurity);
673         close();
674 }
675
676 int DPPubSub::set_addr_family(vine_address_family_e addr_family)
677 {
678         mAddrFamily = addr_family;
679         return VINE_ERROR_NONE;
680 }
681
682 int DPPubSub::set_remote_ip(vine_address_family_e addr_family, const std::string &ip)
683 {
684         return VINE_ERROR_INVALID_OPERATION;
685 }
686
687 int DPPubSub::set_port(int port)
688 {
689         if (port < 0 || port > 65535)
690                 return VINE_ERROR_INVALID_PARAMETER;
691
692         mListenPort = port;
693
694         return VINE_ERROR_NONE;
695 }
696
697 int DPPubSub::set_topic(std::string topic)
698 {
699         mTopic = topic;
700         return VINE_ERROR_NONE;
701 }
702
703 int DPPubSub::set_max_connections(int max_conn)
704 {
705         if (max_conn < 1 || max_conn > VINE_DP_MAX_CONNECTIONS_NUM)
706                 return VINE_ERROR_INVALID_PARAMETER;
707
708         mMaxConnNum = max_conn;
709         return VINE_ERROR_NONE;
710 }
711
712 int DPPubSub::set_accepted_cb(vine_dp_accepted_cb callback, void *user_data)
713 {
714         return VINE_ERROR_INVALID_OPERATION;
715 }
716
717 int DPPubSub::unset_accepted_cb()
718 {
719         return VINE_ERROR_INVALID_OPERATION;
720 }
721
722 int DPPubSub::connect(const char *ip, int port)
723 {
724         vine_data_path_h datapath;
725         const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
726         int ret = vine_data_path_connect(mAddrFamily, ip, port, iface_name, mSecurity,
727                         _pubsub_connected_cb, static_cast<void *>(this), &datapath, mEventFd);
728         return ret;
729 }
730
731 int DPPubSub::close_server_dp()
732 {
733         int ret = _vine_data_path_close(mServerDataPath);
734         mServerDataPath = NULL;
735         return ret;
736 }
737
738 int DPPubSub::publish_service()
739 {
740         vine_service_h service;
741         int ret;
742         char rank_str[VINE_DP_PUBSUB_RANK_LEN] = {0 , };
743         char service_name[VINE_MAX_SERVICE_NAME_LEN + 1] = {0 , };
744
745         ret = vine_service_create(&service);
746         if (ret != VINE_ERROR_NONE)
747                 return ret;
748
749         vine_service_set_type(service, mTopic.c_str());
750         vine_service_set_port(service, mListenPort);
751
752         mRank = create_rank();
753         sprintf(rank_str, "%d", mRank);
754         snprintf(service_name, VINE_MAX_SERVICE_NAME_LEN,
755                         "%s-%d", VINE_DP_PUBSUB_SERVICE_NAME_PREFIX, mRank);
756
757         vine_service_set_name(service, service_name);
758         vine_service_add_attribute(service, VINE_DP_PUBSUB_RANK_KEY, (const char *)rank_str);
759
760         if (mSdPub == NULL) {
761                 ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &mSdPub);
762                 if (ret != VINE_ERROR_NONE) {
763                         VINE_LOGE("Fail to vine_disc_create");
764                         vine_service_destroy(service);
765                         return ret;
766                 }
767         }
768         ret = vine_disc_publish(mSdPub,
769                         service, NULL,
770                         _service_published_cb, static_cast<void *>(this),
771                         mEventFd);
772         if (ret != VINE_ERROR_NONE) {
773                 vine_disc_destroy(mSdPub);
774                 mSdPub = NULL;
775         } else {
776                 mSdPubSubState |= VINE_DP_PUBSUB_SD_STATE_PUBLISH;
777         }
778         vine_service_destroy(service);
779
780         VINE_LOGD("Publish %s:%d with rank %d", mTopic.c_str(), mListenPort, mRank);
781         return ret;
782 }
783
784 int DPPubSub::subscribe_service()
785 {
786         int ret;
787
788         if (mSdSub == NULL) {
789                 ret = vine_disc_create(VINE_DISCOVERY_METHOD_DNS_SD, &mSdSub);
790                 RET_VAL_IF(ret != VINE_ERROR_NONE, ret, "Fail to vine_disc_create");
791         }
792
793         ret = vine_disc_subscribe(mSdSub,
794                         mTopic.c_str(), NULL,
795                         _service_discovered_cb, static_cast<void *>(this),
796                         mEventFd);
797         if (ret != VINE_ERROR_NONE) {
798                 vine_disc_destroy(mSdSub);
799                 mSdSub = NULL;
800         } else {
801                 mSdPubSubState |= VINE_DP_PUBSUB_SD_STATE_SUBSCRIBE;
802         }
803
804         return ret;
805 }
806
807 int DPPubSub::create_rank()
808 {
809         srand(time(NULL));
810         return rand() % VINE_DP_PUBSUB_RANK_MAX;
811 }
812
813 int DPPubSub::open(vine_dp_opened_cb callback, void *user_data)
814 {
815         if (mOpenState == VINE_DP_PUBSUB_OPEN_STATE_WAIT) {
816                 VINE_LOGE("Ignore duplicate request.");
817                 return VINE_ERROR_NOW_IN_PROGRESS;
818         } else if (mOpenState == VINE_DP_PUBSUB_OPEN_STATE_DONE) {
819                 VINE_LOGE("Already opened.");
820                 return VINE_ERROR_INVALID_OPERATION;
821         }
822
823         mOpenState = VINE_DP_PUBSUB_OPEN_STATE_WAIT;
824         mOpenTimer.start(VINE_DP_PUBSUB_OPEN_TIMEOUT_MS, _open_timer, static_cast<void *>(this));
825
826         mOpenedCb = callback;
827         mOpenedCbData = user_data;
828
829         const char *iface_name = mIfaceName.size() > 0 ? mIfaceName.c_str() : NULL;
830         int ret = vine_data_path_open(mAddrFamily, mListenPort, iface_name, mMaxConnNum, mSecurity,
831                         _pubsub_opened_cb, static_cast<void *>(this),
832                         _pubsub_accepted_cb, static_cast<void *>(this),
833                         &mServerDataPath, mEventFd);
834         if (ret != VINE_ERROR_NONE) {
835                 mOpenedCb = NULL;
836                 mOpenedCbData = NULL;
837                 mOpenTimer.stop();
838                 return ret;
839         }
840
841         return VINE_ERROR_NONE;
842 }
843
844 void DPPubSub::close()
845 {
846         mOpenTimer.stop();
847         vine_disc_stop_publish(mSdPub);
848         vine_disc_stop_subscribe(mSdSub);
849
850         vine_disc_destroy(mSdSub);
851         mSdSub = NULL;
852         vine_disc_destroy(mSdPub);
853         mSdPub = NULL;
854
855         clear_joined_peer();
856         _vine_data_path_close(mServerDataPath);
857         _vine_data_path_destroy(mServerDataPath);
858         mServerDataPath = NULL;
859
860         mOpenState = VINE_DP_PUBSUB_OPEN_STATE_NONE;
861         mSdPubSubState = VINE_DP_PUBSUB_SD_STATE_NONE;
862 }
863
864 int DPPubSub::send(unsigned char *buf, size_t len)
865 {
866         int ret = VINE_ERROR_NONE;
867
868         for (auto &peer : mDataPathList) {
869                 if (!peer.second)
870                         continue;
871                 ret = _vine_data_path_write(peer.second, buf, len);
872                 if (ret != VINE_ERROR_NONE) {
873                         VINE_LOGE("fail to write to a peer[%p] ", peer.second);
874                         break;
875                 }
876         }
877
878         return ret;
879 }
880
881 int DPPubSub::recv(unsigned char *buf, size_t buf_len, size_t *read_len)
882 {
883         auto &dp_info = mRecvDataPathList.front();
884         size_t bytes = 0;
885         int ret = _vine_data_path_read(dp_info.first, buf, buf_len, &bytes);
886         if (ret != VINE_ERROR_NONE)
887                 return ret;
888
889         dp_info.second -= bytes;
890         VINE_LOGD("%zd bytes remained", dp_info.second);
891
892         *read_len = bytes;
893         if (dp_info.second == 0)
894                 mRecvDataPathList.erase();
895
896         return VINE_ERROR_NONE;
897 }
898
899 bool DPPubSub::is_joined_peer(const char *service_name, const char *ip, int port)
900 {
901         if (mServiceName.compare(service_name) == 0) {
902                 VINE_LOGD("It's me!");
903                 mIpList.insert(ip);
904                 return true;
905         }
906
907         // TODO: compare the service name.
908         // service name might be unique.
909
910         DPKey key = std::make_pair(ip, port);
911         return (mDataPathList.count(key) > 0);
912 }
913
914 int DPPubSub::get_joined_peer()
915 {
916         return mDataPathList.size();
917 }
918
919 void DPPubSub::add_joined_peer(const char *ip, int port, vine_data_path_h datapath)
920 {
921         if (!ip || !datapath)
922                 return;
923
924         VINE_LOGD("%s:%d, %p is added.", ip, port, datapath);
925         DPKey key = std::make_pair(ip, port);
926         mDataPathList.insert(std::make_pair(key, datapath));
927 }
928
929 void DPPubSub::del_joined_peer(vine_data_path_h datapath)
930 {
931         if (!datapath)
932                 return;
933
934         DPMap::iterator found = std::find_if(mDataPathList.begin(),
935                         mDataPathList.end(),
936                         [datapath](std::pair<DPKey, vine_data_path_h> const& item)
937                         {
938                                 VINE_LOGD("item.second[%p]", item.second);
939                                 return (item.second == datapath);
940                         });
941
942         if (found == mDataPathList.end()) {
943                 VINE_LOGE("Cannot find the datapath[%p].", datapath);
944                 return;
945         }
946
947         VINE_LOGD("datapath[%p] is deleted from list.", datapath);
948         mDataPathList.erase(found);
949 }
950
951 void DPPubSub::clear_joined_peer()
952 {
953         for (auto &peer : mDataPathList) {
954                 if (!peer.second)
955                         continue;
956                 _vine_data_path_close(peer.second);
957                 _vine_data_path_destroy(peer.second);
958         }
959
960         mDataPathList.clear();
961 }
962
963 void DPPubSub::noti_received_peer(vine_data_path_h datapath, size_t bytes)
964 {
965         mRecvDataPathList.push(std::make_pair(datapath, bytes));
966 }
967
#if 0
// Disabled helper (compiled out).
// Parses the text following the last ':' of ip as a hexadecimal number.
// Returns -1 when nothing follows the last ':'.
static int _get_last_two_octets(std::string ip)
{
        const size_t colon = ip.find_last_of(':');
        const std::string tail = ip.substr(colon + 1);

        if (tail.empty()) {
                VINE_LOGE("invalid IP.");
                return -1;
        }

        return std::stoi(tail, nullptr, 16);
}
#endif
982
983 // The smaller the last octet, the higher the priority.
984 //  1: peers priority is higher than me
985 //  0: same or failure
986 // -1: peers priority is lower than me
987 int DPPubSub::compare_ip_priority(const char *peer_ip)
988 {
989         std::string peer_ip_str(peer_ip);
990         size_t pos = peer_ip_str.find_last_of('.');
991         std::string subnet = peer_ip_str.substr(0, pos);
992         std::string found_ip;
993         int last = 0, peer_last = 0;
994
995         if (subnet.size() == 0)
996                 return 0;
997
998         for (auto &ip : mIpList) {
999                 if (ip.find(subnet) != 0)
1000                         continue;
1001
1002                 found_ip = ip;
1003                 break;
1004         }
1005
1006         VINE_LOGD("peer_ip[%s] found_ip[%s] subnet[%s]", peer_ip, found_ip.c_str(), subnet.c_str());
1007         try {
1008                 last = std::stoi(found_ip.substr(pos + 1));
1009                 peer_last = std::stoi(peer_ip_str.substr(pos + 1));
1010         } catch (...) {
1011                 VINE_LOGD("Failed to convert last octet. IP is invalid.");
1012         }
1013
1014         if (last == peer_last)
1015                 return 0;
1016         return (last < peer_last ? 1 : -1);
1017 }
1018
1019 // If true is return, it will connect to a peer.
1020 bool DPPubSub::check_if_connect(const char *peer_rank,
1021                 vine_address_family_e ip_type, const char *peer_ip, int peer_port)
1022 {
1023         // The smaller the rank value, the higher the priority.
1024         // Connect to a peer when both 1 and 2 are satisfied.
1025         // 1. peer has rank key and value
1026         // 2. the rank value is higher than peers value
1027         int prank = std::stoi(peer_rank);
1028
1029         VINE_LOGD("ip_type[%d] rank[%d], Peer rank[%d] peer_ip[%s] peer_port[%d]",
1030                         ip_type, mRank, prank, peer_ip, peer_port);
1031         if (mRank > prank)
1032                 return true;
1033         else if (mRank < prank)
1034                 return false;
1035
1036         // If rank is the same, Need to check the IP address
1037         int is_higher = 0;
1038         if (ip_type == VINE_ADDRESS_FAMILY_IPV4) {
1039                 is_higher = compare_ip_priority(peer_ip);
1040         } else {
1041                 // TODO : handle IPv6 address
1042         }
1043
1044         if (is_higher == 1)
1045                 return true;
1046
1047         // If last value of IP is the same, Need to check the port
1048         if (mListenPort > peer_port)
1049                 return true;
1050
1051         return false;
1052 }
1053
1054 void DPPubSub::_open_timer(void *user_data)
1055 {
1056         VINE_LOGD("open timeout reached.");
1057         if (!user_data)
1058                 return;
1059
1060         DPPubSub *dp =  static_cast<DPPubSub *>(user_data);
1061         int sd_state = dp->get_sd_pubsub_state();
1062         if ((sd_state & VINE_DP_PUBSUB_SD_STATE_PUBLISH)
1063                         && (sd_state & VINE_DP_PUBSUB_SD_STATE_SUBSCRIBE)) {
1064                 dp->set_open_state(VINE_DP_PUBSUB_OPEN_STATE_DONE);
1065                 static_cast<DataPath *>(user_data)->invoke_opened_cb(VINE_ERROR_NONE);
1066                 return;
1067         }
1068         VINE_LOGD("invoke opened_cb with an error. sd_state[%d]", sd_state);
1069         static_cast<DataPath *>(user_data)->invoke_opened_cb(VINE_ERROR_OPERATION_FAILED);
1070 }
1071
1072 int _vine_dp_create(vine_session_h session, vine_dp_type_e type, vine_dp_h *dp)
1073 {
1074         RET_VAL_IF(session == NULL, VINE_ERROR_INVALID_PARAMETER, "session is null.");
1075         RET_VAL_IF(type >= VINE_DP_TYPE_UNKNOWN, VINE_ERROR_INVALID_PARAMETER, "invalid type");
1076         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1077
1078         vine_event_queue_h eq = NULL;
1079         int ret = _vine_session_get_event_queue(session, &eq);
1080
1081         if (ret != VINE_ERROR_NONE || !eq)
1082                 return VINE_ERROR_INVALID_PARAMETER;
1083
1084         if (type == VINE_DP_TYPE_SERVER) {
1085                 *dp = new DPServer((void *)eq);
1086         } else if (type == VINE_DP_TYPE_CLIENT) {
1087                 *dp = new DPClient((void *)eq);
1088         } else if (type == VINE_DP_TYPE_PUBSUB) {
1089                 *dp = new DPPubSub((void *)eq);
1090         }
1091
1092         return VINE_ERROR_NONE;
1093 }
1094
1095 int _vine_dp_destroy(vine_dp_h dp)
1096 {
1097         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1098
1099         DataPath *_dp = static_cast<DataPath *>(dp);
1100         delete _dp;
1101
1102         return VINE_ERROR_NONE;
1103 }
1104
1105 int _vine_dp_set_iface_name(vine_dp_h dp, const char *iface_name)
1106 {
1107         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1108         RET_VAL_IF(iface_name == NULL, VINE_ERROR_INVALID_PARAMETER, "iface_name is null.");
1109
1110         DataPath *_dp = static_cast<DataPath *>(dp);
1111         return _dp->set_iface_name(iface_name);
1112 }
1113
1114 int _vine_dp_set_addr_family(vine_dp_h dp, vine_address_family_e addr_family)
1115 {
1116         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1117         RET_VAL_IF(addr_family < VINE_ADDRESS_FAMILY_DEFAULT
1118                         || addr_family > VINE_ADDRESS_FAMILY_IPV6,
1119                         VINE_ERROR_INVALID_PARAMETER, "addr_family is invalid.");
1120
1121         DataPath *_dp = static_cast<DataPath *>(dp);
1122         return _dp->set_addr_family(addr_family);
1123 }
1124
1125 int _vine_dp_set_remote_ip(vine_dp_h dp, vine_address_family_e addr_family, const char *ip)
1126 {
1127         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1128         RET_VAL_IF(ip == NULL, VINE_ERROR_INVALID_PARAMETER, "ip is null.");
1129
1130         DataPath *_dp = static_cast<DataPath *>(dp);
1131         return _dp->set_remote_ip(addr_family, ip);
1132 }
1133
1134 int _vine_dp_set_port(vine_dp_h dp, int port)
1135 {
1136         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1137
1138         DataPath *_dp = static_cast<DataPath *>(dp);
1139         return _dp->set_port(port);
1140 }
1141
1142 int _vine_dp_get_port(vine_dp_h dp, int *port)
1143 {
1144         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1145         RET_VAL_IF(port == NULL, VINE_ERROR_INVALID_PARAMETER, "port is null.");
1146
1147         DataPath *_dp = static_cast<DataPath *>(dp);
1148         *port = _dp->get_port();
1149         return VINE_ERROR_NONE;
1150 }
1151
1152 int _vine_dp_set_topic(vine_dp_h dp, const char *topic)
1153 {
1154         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1155         RET_VAL_IF(topic == NULL, VINE_ERROR_INVALID_PARAMETER, "topic is null.");
1156         RET_VAL_IF(_check_topic_len(topic) == false,
1157                 VINE_ERROR_INVALID_PARAMETER, "invalid length of topic.");
1158
1159         DataPath *_dp = static_cast<DataPath *>(dp);
1160         return _dp->set_topic(topic);
1161 }
1162
1163 int _vine_dp_set_max_connections(vine_dp_h dp, int max_conn)
1164 {
1165         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1166
1167         DataPath *_dp = static_cast<DataPath *>(dp);
1168         return _dp->set_max_connections(max_conn);
1169 }
1170
1171 int _vine_dp_set_security(vine_dp_h dp, vine_security_h security)
1172 {
1173         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1174         RET_VAL_IF(security == NULL, VINE_ERROR_INVALID_PARAMETER, "security is null.");
1175
1176         DataPath *_dp = static_cast<DataPath *>(dp);
1177         return _dp->set_security(security);
1178 }
1179
1180 int _vine_dp_set_accepted_cb(vine_dp_h dp, vine_dp_accepted_cb callback, void *user_data)
1181 {
1182         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1183         RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1184
1185         DataPath *_dp = static_cast<DataPath *>(dp);
1186         return _dp->set_accepted_cb(callback, user_data);
1187 }
1188
1189 int _vine_dp_unset_accepted_cb(vine_dp_h dp)
1190 {
1191         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1192
1193         DataPath *_dp = static_cast<DataPath *>(dp);
1194         return _dp->unset_accepted_cb();
1195 }
1196
1197 int _vine_dp_set_terminated_cb(vine_dp_h dp, vine_dp_terminated_cb callback, void *user_data)
1198 {
1199         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1200         RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1201
1202         DataPath *_dp = static_cast<DataPath *>(dp);
1203         return _dp->set_terminated_cb(callback, user_data);
1204 }
1205
1206 int _vine_dp_unset_terminated_cb(vine_dp_h dp)
1207 {
1208         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1209
1210         DataPath *_dp = static_cast<DataPath *>(dp);
1211         return _dp->unset_terminated_cb();
1212 }
1213
1214 int _vine_dp_open(vine_dp_h dp, vine_dp_opened_cb callback, void *user_data)
1215 {
1216         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1217         RET_VAL_IF(callback == NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1218
1219         DataPath *_dp = static_cast<DataPath *>(dp);
1220         return _dp->open(callback, user_data);
1221 }
1222
1223 int _vine_dp_close(vine_dp_h dp)
1224 {
1225         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1226
1227         DataPath *_dp = static_cast<DataPath *>(dp);
1228         _dp->close();
1229         return VINE_ERROR_NONE;
1230 }
1231
1232 int _vine_dp_send(vine_dp_h dp, unsigned char *buf, size_t len)
1233 {
1234         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1235         RET_VAL_IF(buf == NULL, VINE_ERROR_INVALID_PARAMETER, "buf is null.");
1236
1237         DataPath *_dp = static_cast<DataPath *>(dp);
1238         return _dp->send(buf, len);
1239 }
1240
1241 int _vine_dp_recv(vine_dp_h dp, unsigned char *buf, size_t buf_len, size_t *read_len)
1242 {
1243         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1244         RET_VAL_IF(buf == NULL, VINE_ERROR_INVALID_PARAMETER, "buf is null.");
1245         RET_VAL_IF(read_len == NULL, VINE_ERROR_INVALID_PARAMETER, "read_len is null.");
1246
1247         DataPath *_dp = static_cast<DataPath *>(dp);
1248         return _dp->recv(buf, buf_len, read_len);
1249 }
1250
1251 int _vine_dp_set_received_cb(vine_dp_h dp, vine_dp_received_cb callback, void *user_data)
1252 {
1253         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1254         RET_VAL_IF(callback== NULL, VINE_ERROR_INVALID_PARAMETER, "callback is null.");
1255
1256         DataPath *_dp = static_cast<DataPath *>(dp);
1257         return _dp->set_received_cb(callback, user_data);
1258 }
1259
1260 int _vine_dp_unset_received_cb(vine_dp_h dp)
1261 {
1262         RET_VAL_IF(dp == NULL, VINE_ERROR_INVALID_PARAMETER, "dp is null.");
1263
1264         DataPath *_dp = static_cast<DataPath *>(dp);
1265         return _dp->unset_received_cb();
1266 }