2 * Copyright (c) 2016 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
// Integer tags for the JSON member names this file dispatches on. They are the
// values stored in Channel::json_keys (see init_json_key_map) and the case
// labels of the switch in Channel::foreach_json_object.
29 #define JSON_KEY_IS_HOST 0
30 #define JSON_KEY_CONNECT_TIME 1
31 #define JSON_KEY_STATUS 2
32 #define JSON_KEY_CODE 3
33 #define JSON_KEY_ATTRIBUTES 4
34 #define JSON_KEY_RESULT 5
35 #define JSON_KEY_MESSAGE_ID 6
36 #define JSON_KEY_ERROR 7
37 #define JSON_KEY_EVENT 8
38 #define JSON_KEY_FROM 9
39 #define JSON_KEY_MESSAGE 10
40 #define JSON_KEY_METHOD 11
41 #define JSON_KEY_DATA 12
42 #define JSON_KEY_CLIENTS 13
// Definitions of the static data members of Channel and ChannelConnectionHandler.
// msgId is pre-incremented and stamped on every buffer queued for sending.
46 int Channel::msgId = 0;
// Event name that publishMessage() uses to recognise client-originated pings.
49 string ChannelConnectionHandler::PING = "channel.ping";
50 string ChannelConnectionHandler::PONG = "pong";
51 Channel *ChannelConnectionHandler::channel_ptr = NULL;
52 string Channel::ROUTE = "channels";
// Event names compared against the parsed "event" member in handleMessage()
// and callback_lws_mirror().
53 string Channel::ERROR_EVENT = "ms.error";
54 string Channel::CONNECT_EVENT = "ms.channel.connect";
55 string Channel::CLIENT_CONNECT_EVENT = "ms.channel.clientConnect";
56 string Channel::CLIENT_DISCONNECT_EVENT = "ms.channel.clientDisconnect";
// NOTE(review): "ms.channel.read" looks like a truncated "ms.channel.ready".
// handleMessage() only calls handleReadyMessage() when eventType equals this
// value — confirm the exact string against the server protocol.
57 string Channel::READY_EVENT = "ms.channel.read";
// Liveness registry: 1 while a Channel exists, set to 0 by ~Channel();
// consulted by callback_lws_mirror() before it touches the object.
58 map<Channel *, int> Channel::channel_alive_map;
// JSON member name -> JSON_KEY_* tag, filled by init_json_key_map().
59 map<string, int> Channel::json_keys;
// Thread handle written by pthread_create() in connect().
60 pthread_t Channel::connect_thread;
// Root object of the message currently being parsed; set in json_parse().
// Static, so it is shared by all Channel instances.
61 JsonObject *Channel::root_json_object = NULL;
// Constructs the connection handler and sets its initial ping timeout.
63 ChannelConnectionHandler::ChannelConnectionHandler() {
// NOTE(review): unit is not stated here; presumably microseconds — TODO confirm.
64 pingTimeout = 5000000;
// Default-state initialisation of a Channel: no disconnect in progress, fresh
// client list and connection handler, registered as alive, empty send queue lock.
81 disconnecting = false;
83 clients = new Clients(this); // allocated here; intended to be destroyed in the destructor
// Deleted in ~Channel().
85 connectionHandler = new ChannelConnectionHandler();
// Mark this instance as alive so callback_lws_mirror() accepts callbacks for it.
87 channel_alive_map.insert({this, 1});
// Binary frames start with a header; nothing has been parsed yet.
93 is_header_parsed = false;
// Guards sendBufQueue, which is filled by publishers and drained by writeSocket().
96 pthread_mutex_init(&sendBufMutex, NULL);
// Constructs a Channel for the given service and channel URI and puts it in
// its initial, not-connected state.
101 Channel::Channel(Service *service1, string uri1) {
102 MSF_DBG("Channel()");
103 clientisHost = false;
107 disconnecting = false;
109 clients = new Clients(this); // allocated here; intended to be destroyed in the destructor
// Deleted in ~Channel().
111 connectionHandler = new ChannelConnectionHandler();
// Mark this instance as alive so callback_lws_mirror() accepts callbacks for it.
115 channel_alive_map.insert({this, 1});
// Binary frames start with a header; nothing has been parsed yet.
121 is_header_parsed = false;
// Guards sendBufQueue, which is filled by publishers and drained by writeSocket().
124 pthread_mutex_init(&sendBufMutex, NULL);
// When true, writeSocket() closes the socket and createWebsocket() leaves its service loop.
126 closeRequest = false;
// Factory for Channel objects: checks that a service was supplied and that the
// URI is non-empty, then heap-allocates a Channel for them.
129 Channel *Channel::create(Service *service, string uri) {
130 MSF_DBG("Channel::create()");
// length() is unsigned, so "<= 0" is effectively an emptiness test.
131 if ((service == NULL) || uri.length() <= 0) {
134 Channel *channel = new Channel(service, uri);
// Destructor: detaches all listeners, marks the channel as dead in
// channel_alive_map, releases owned helpers and receive buffers, and drains
// the pending send queue under sendBufMutex.
138 Channel::~Channel() {
139 MSF_DBG("~Channel()");
140 onConnectListener = NULL;
141 onDisconnectListener = NULL;
142 onClientConnectListener = NULL;
143 onClientDisconnectListener = NULL;
144 onReadyListener = NULL;
// The entry is kept (not erased) with value 0 so that late libwebsockets
// callbacks for this address are recognised as belonging to a deleted object.
145 channel_alive_map[this] = 0;
147 if (clients != NULL) {
152 if (connectionHandler != NULL) {
153 delete connectionHandler;
154 connectionHandler = NULL;
// Partially assembled binary payload, if a message was in flight.
157 if (cl_payload != NULL) {
// Partially assembled text message, if a message was in flight.
162 if (cl_data != NULL) {
// Pop every queued outbound buffer so its memory can be released.
172 pthread_mutex_lock(&sendBufMutex);
173 while (sendBufQueue.begin() != sendBufQueue.end()) {
174 SendBufList::iterator itr = sendBufQueue.begin();
// Copy the entry before erasing it; the iterator is invalid afterwards.
175 _SendBufInfo sendBufInfo = *itr;
176 sendBufQueue.erase(itr);
178 unsigned char* sendBuf = sendBufInfo.sendBuf;
180 // cleanup write buffer
181 if ((sendBuf != NULL)) {
186 pthread_mutex_unlock(&sendBufMutex);
// Populates the static json_keys table that maps JSON member names to the
// JSON_KEY_* tags used by foreach_json_object().
189 void Channel::init_json_key_map() {
// Function-local flag; presumably makes the population run only once — TODO confirm.
190 static bool done = false;
// Member names that appear as literals in server messages.
193 json_keys["isHost"] = JSON_KEY_IS_HOST;
194 json_keys["connectTime"] = JSON_KEY_CONNECT_TIME;
195 json_keys["status"] = JSON_KEY_STATUS;
196 json_keys["code"] = JSON_KEY_CODE;
197 json_keys["attributes"] = JSON_KEY_ATTRIBUTES;
// Member names shared with the Message class.
198 json_keys[Message::PROPERTY_RESULT] = JSON_KEY_RESULT;
199 json_keys[Message::PROPERTY_MESSAGE_ID] = JSON_KEY_MESSAGE_ID;
200 json_keys[Message::PROPERTY_ERROR] = JSON_KEY_ERROR;
201 json_keys[Message::PROPERTY_EVENT] = JSON_KEY_EVENT;
202 json_keys[Message::PROPERTY_FROM] = JSON_KEY_FROM;
203 json_keys[Message::PROPERTY_MESSAGE] = JSON_KEY_MESSAGE;
204 json_keys[Message::PROPERTY_METHOD] = JSON_KEY_METHOD;
205 json_keys[Message::PROPERTY_DATA] = JSON_KEY_DATA;
206 json_keys[Message::PROPERTY_CLIENTS] = JSON_KEY_CLIENTS;
// json-glib member visitor used while parsing an incoming message.
//   object    - the object whose member is being visited
//   name      - member name; looked up in json_keys to pick the handler below
//   node      - member value
//   user_data - the Channel being populated (passed as `this` by json_parse)
// Each recognised member is copied into the matching field of the Channel;
// nested objects and the clients array are visited recursively.
212 void Channel::foreach_json_object(JsonObject *object, const gchar *name,
213 JsonNode *node, gpointer user_data) {
// Members that are not in the key table are not dispatched.
214 if (json_keys.find(name) == json_keys.end()) {
218 int key = json_keys[name];
219 Channel *p = static_cast<Channel *>(user_data);
// NOTE(review): these are function-static, so the state is shared by every
// Channel and across calls rather than being per-parse.
220 static int iferror = 0;
221 static bool arrayofclients = false;
225 case JSON_KEY_IS_HOST: {
226 p->clientisHost = json_node_get_boolean(node);
// Refresh eventType from the root object, because "event" may not have been
// visited yet when this member is reached.
228 if (json_object_has_member(root_json_object, Message::PROPERTY_EVENT.c_str())) {
229 p->eventType = json_object_get_string_member(root_json_object,
230 Message::PROPERTY_EVENT.c_str());
// For connect / clientConnect events, build a Client from the fields gathered
// so far and append it to clientList.
233 if (!strncmp(p->eventType.c_str(), CLIENT_CONNECT_EVENT.c_str(), 25) ||
234 !strncmp(p->eventType.c_str(), CONNECT_EVENT.c_str(), 25)) {
235 p->client.create(p, p->clientid, p->clientconnectTime,
236 p->clientisHost, map<string, string>());
237 MSF_DBG("add clientList");
238 p->clientList.push_back(p->client);
243 case JSON_KEY_CONNECT_TIME: {
244 p->clientconnectTime = json_node_get_int(node);
245 MSF_DBG("clientconnectTime set as %lld",
246 p->clientconnectTime);
// Example of an error reply handled by the STATUS / MESSAGE / CODE cases:
250 //{"id":5768,"error":{"status":404,"message":"Not Found","code":404}}
252 case JSON_KEY_STATUS: {
254 p->errstatus = json_node_get_int(node);
255 MSF_DBG("p->errstatus set");
260 case JSON_KEY_CODE: {
262 p->errcode = json_node_get_int(node);
// Callbacks are keyed by the decimal string form of the message id.
263 string eid = std::to_string(p->msg_id);
264 MSF_DBG("p->errcode set");
266 if (p->callbacks.find(eid) != p->callbacks.end()) {
// getcallback() also removes the entry from the callbacks map.
267 pair<void *, int> temp = p->getcallback(eid);
268 if (temp.first != NULL) {
269 Result_Base *temp1 = (Result_Base *)(temp.first);
// Assemble "status:<n> message:<text> code:<n>" and report it to the caller.
271 err.append("status:");
272 err.append(to_string(p->errstatus));
273 err.append(" message:");
274 err.append(p->errMsg);
275 err.append(" code:");
276 err.append(to_string(p->errcode));
278 temp1->onError(Error::create(err));
280 if (p->onErrorListener) {
281 p->onErrorListener->onError();
289 case JSON_KEY_ATTRIBUTES: {
292 // key : MESSAGE::PROPERTY_RESULT
293 case JSON_KEY_RESULT: {
// A scalar result is read as a boolean; an object result is duplicated and kept.
294 if (json_node_get_node_type(node) == JSON_NODE_VALUE) {
295 p->resultresp = json_node_get_boolean(node);
297 MSF_DBG("set resultresp as true");
299 MSF_DBG("set resultresp as false");
301 p->msg_subject = Message::PROPERTY_RESULT;
302 } else if (json_node_get_node_type(node) == JSON_NODE_OBJECT) {
303 MSF_DBG("set resultobj");
// NOTE(review): resultobj is assigned from json_node_dup_object() below; if it
// is a JsonObject it should presumably be released with json_object_unref()
// rather than g_free() — confirm the member's type.
305 g_free(p->resultobj);
308 p->resultobj = json_node_dup_object(node);
312 // key : MESSAGE::PROPERTY_MESSAGE_ID
313 case JSON_KEY_MESSAGE_ID: {
314 if (json_node_get_node_type(node) == JSON_NODE_VALUE) {
// An integer id is the message id of a request/response pair.
315 if (json_node_get_value_type(node) == G_TYPE_INT ||
316 json_node_get_value_type(node) == G_TYPE_INT64) {
317 p->msg_id = json_node_get_int(node);
318 MSF_DBG("msg-id set as %d", p->msg_id);
// A string id is a client id: it belongs to the connecting client (or to an
// element of the clients array), otherwise it is stored as the sender.
319 } else if (json_node_get_value_type(node) == G_TYPE_STRING) {
320 if (json_object_has_member(root_json_object, Message::PROPERTY_EVENT.c_str())) {
321 p->eventType = json_object_get_string_member(root_json_object,
322 Message::PROPERTY_EVENT.c_str());
325 if ((p->eventType == CLIENT_CONNECT_EVENT) || arrayofclients) {
326 p->clientid = json_node_get_string(node);
327 MSF_DBG("clientid set as = %s",
328 p->clientid.c_str());
330 p->from = json_node_get_string(node);
331 MSF_DBG("from set as = %s",
338 // key : MESSAGE::PROPERTY_ERROR
339 case JSON_KEY_ERROR: {
340 if (json_node_get_node_type(node) == JSON_NODE_VALUE) {
342 MSF_DBG("iferror set as 1");
343 } else if (json_node_get_node_type(node) == JSON_NODE_OBJECT) {
345 MSF_DBG("iferror set as 1");
346 // g_free(p->errobj);
// Visit the members of the error object (status / message / code) with this same visitor.
348 json_object_foreach_member(json_node_get_object(node),
349 foreach_json_object, user_data);
353 // key : MESSAGE::PROPERTY_EVENT
354 case JSON_KEY_EVENT: {
355 p->eventType = json_node_get_string(node);
356 MSF_DBG("eventType set as = %s",
357 p->eventType.c_str());
360 // key : MESSAGE::PROPERTY_FROM
361 case JSON_KEY_FROM: {
362 p->from = json_node_get_string(node);
363 MSF_DBG("from set as = %s", p->from.c_str());
366 // key : MESSAGE::PROPERTY_MESSAGE
367 case JSON_KEY_MESSAGE: {
// The "message" member is stored as the error text used by the CODE case and handleErrorMessage().
368 p->errMsg = json_node_get_string(node);
369 MSF_DBG("errMsg set as = %s", p->errMsg.c_str());
372 // key : MESSAGE::PROPERTY_METHOD
373 case JSON_KEY_METHOD: {
374 p->method = json_node_get_string(node);
375 MSF_DBG("method set as = %s", p->method.c_str());
377 // key : MESSAGE:PROPERTY_DATA
378 case JSON_KEY_DATA: {
// String data is stored verbatim; object data is visited member by member.
379 if (json_node_get_node_type(node) == JSON_NODE_VALUE) {
380 if (json_node_get_value_type(node) == G_TYPE_STRING) {
381 p->data = json_node_get_string(node);
382 MSF_DBG("data set as = %s",
385 } else if (json_node_get_node_type(node) == JSON_NODE_OBJECT) {
386 json_object_foreach_member(json_node_get_object(node),
387 foreach_json_object, user_data);
391 // key : MESSAGE:PROPERTY_CLIENTS
392 case JSON_KEY_CLIENTS: {
// Flag the array walk so that string ids seen inside it are treated as client ids.
393 arrayofclients = true;
394 MSF_DBG("array start");
395 json_array_foreach_element(json_node_get_array(node),
396 foreach_json_array, user_data);
397 MSF_DBG("array end");
398 arrayofclients = false;
// json-glib array visitor: treats each array element as an object and visits
// its members with foreach_json_object().
// NOTE(review): the element type is not checked before json_node_get_object().
406 void Channel::foreach_json_array(JsonArray *array, guint index, JsonNode *node,
407 gpointer user_data) {
408 json_object_foreach_member(json_node_get_object(node), foreach_json_object,
// Parses a NUL-terminated JSON message and copies its recognised members into
// this Channel by visiting the root object with foreach_json_object().
//   in - JSON text (length -1 is passed, so it must be NUL-terminated)
412 void Channel::json_parse(const char *in) {
413 MSF_DBG("Channel::json_parse : %s", in);
415 JsonParser *parser = json_parser_new();
417 if (json_parser_load_from_data(parser, in, -1, NULL)) {
418 JsonNode *node = json_parser_get_root(parser);
// NOTE(review): the object is fetched before the node type is checked below;
// for a non-object root this call happens on a node that holds no object.
420 root_json_object = json_node_get_object(node);
422 if (json_node_get_node_type(node) == JSON_NODE_OBJECT) {
423 json_object_foreach_member(root_json_object,
424 foreach_json_object, this);
427 MSF_DBG("json_parsing error");
// Accessor for the channel's Clients collection; the result depends on whether
// the collection currently holds any clients.
431 Clients *Channel::getclients() {
432 if (clients->size()) {
// Configures the ping-based connection health check.
//   timeout - 0 stops pinging; otherwise the value is handed to the connection
//             handler as the new ping timeout and, if the socket is already
//             open, pinging is (re)started.
439 void Channel::setConnectionTimeout(long timeout) {
442 } else if (timeout == 0) {
443 connectionHandler->stopPing();
445 connectionHandler->setPingTimeout(timeout);
446 if (isWebSocketOpen()) {
447 connectionHandler->startPing(this);
// Handles the channel-connect event: publishes the parsed client list to the
// Clients collection and records the sender id as this client's own id.
452 void Channel::handleConnectMessage(string UID) {
454 clients->add(clientList);
456 clients->setMyClientId(from);
461 void Channel::handleMessage(string UID) { handleMessage(UID, NULL); }
// Routes the most recently parsed message according to its eventType.
//   UID     - id used to look up the pending callback
//   payload - binary payload of the message, or NULL for text messages
463 void Channel::handleMessage(string UID, unsigned char payload[]) {
// No event name: the message is a reply to a request made by this application.
464 if (eventType.length() == 0) {
465 // if (msg_subject == Message::PROPERTY_RESULT) {
466 // handleClientMessage(NULL,NULL);
470 handleApplicationMessage(UID);
471 } else if (eventType == CLIENT_CONNECT_EVENT) {
472 handleClientConnectMessage();
473 } else if (eventType == CLIENT_DISCONNECT_EVENT) {
474 handleClientDisconnectMessage();
475 } else if (eventType == ERROR_EVENT) {
476 handleErrorMessage(UID);
477 } else if (eventType == READY_EVENT) {
478 handleReadyMessage();
479 } else if (eventType == ChannelConnectionHandler::PING) {
// Any other event name is an application-level message for the listeners.
482 handleClientMessage(data.c_str(), payload);
// Completes the pending request identified by uid using the parsed result.
486 void Channel::handleApplicationMessage(string uid) {
487 string messageId = uid;
// True when an error object was recorded for this message.
488 bool errorMap = errobj;
// getcallback() removes the entry it returns from the callbacks map.
489 pair<void *, int> temp = getcallback(messageId);
492 if (temp.first != NULL) {
// While waiting for the ready event, successful callbacks are parked in
// onReadyCallbacks and completed later by handleReadyMessage().
493 if ((waitForOnReady) && (errorMap == false)) {
494 onReadyCallbacks.insert(temp);
502 if (temp.second == Result_bool) {
503 ((Result_Base*)temp.first)->onSuccess(resultresp);
506 //doApplicationCallback(NULL);
// Reports the outcome of the last parsed reply to a result callback.
//   result1 - callback to notify; may be NULL, in which case nothing is reported
// An error object leads to onError(); a result object or a "result" subject
// leads to onSuccess(true).
510 void Channel::doApplicationCallback(Result_Base *result1) {
511 bool errorMap = errobj;
513 if (errorMap != false) {
517 string str = ss.str();
518 if (result1 != NULL) {
519 result1->onError(Error::create(str));
522 if (resultobj != NULL) {
523 // string json=json_object_to_json_string(resultobj);
524 // InfoObj->create(json);
525 if (result1 != NULL) {
526 result1->onSuccess(true);
530 if (msg_subject == Message::PROPERTY_RESULT) {
531 if (result1 != NULL) {
532 result1->onSuccess(true);
// Handles an error event: wraps the parsed error text in an Error and forwards
// it to handleError() for the request identified by UID.
538 void Channel::handleErrorMessage(string UID) {
539 Error err = Error::create(errMsg);
540 handleError(UID, err);
// Delivers an error to the callback registered under UID (if any) and to the
// channel-wide error listener.
//   UID - id of the pending request; an empty string skips the callback lookup
//   err - error to report
543 void Channel::handleError(string UID, Error err) {
544 if (UID.length() != 0) {
// getcallback() removes the entry it returns from the callbacks map.
545 pair<void *, int> temp = Channel::getcallback(UID);
546 if (temp.first != NULL) {
// The stored int tags which kind of result callback was registered.
547 if (temp.second == Result_Client) {
548 Result_Base *temp1 = (Result_Base *)(temp.first);
552 } else if (temp.second == Result_bool) {
553 Result_Base *temp1 = (Result_Base *)(temp.first);
559 if (onErrorListener) {
560 onErrorListener->onError();
// Forwards an application-level message to the registered listeners.
//   msg     - message data text
//   payload - unused here; the channel's own cl_payload / cl_payload_size are emitted instead
566 void Channel::handleClientMessage(const char *msg, unsigned char payload[]) {
567 emit(eventType, msg, from, cl_payload, cl_payload_size);
// Delivers one received message to every interested receiver, in this order:
// the message_receive_cb function, the catch-all everyMessageListener, then
// each listener registered for this specific event name.
//   event      - event name of the message
//   msg        - message data text
//   from       - id of the sending client
//   payld      - binary payload, may be NULL
//   payld_size - payload length in bytes
570 void Channel::emit(string event, const char *msg, string from,
571 unsigned char *payld, int payld_size) {
573 MSF_DBG("emit msg = NULL");
577 if (message_receive_cb) {
578 Message mesg(this, event, msg, from, payld, payld_size);
579 message_receive_cb(mesg);
582 if (everyMessageListener) {
583 Message mesg(this, event, msg, from, payld, payld_size);
584 everyMessageListener->onMessage(mesg);
587 if (messageListeners.size() != 0) {
588 if (messageListeners.find(event) == messageListeners.end()) {
592 list<OnMessageListener *> &onMessageListeners =
593 (messageListeners.find(event))->second;
595 if (onMessageListeners.size() != 0) {
596 std::list<OnMessageListener *>::const_iterator iterator;
597 for (iterator = onMessageListeners.begin();
598 iterator != onMessageListeners.end(); ++iterator) {
// A fresh Message is built for every listener.
599 Message mesg(this, event, msg, from, payld, payld_size);
600 (*iterator)->onMessage(mesg);
// Records whether secure connections are supported; setSecurityMode() checks
// this flag before enabling secure mode.
607 void Channel::setSecureSupport(bool support)
609 IsSecureSupport = support;
// Requests secure (mode == true) or plain connections. Asking for secure mode
// when secure support has not been declared is logged as an error.
612 bool Channel::setSecurityMode(bool mode)
614 if (!IsSecureSupport && mode) {
615 MSF_ERR("not support secure mode");
620 MSF_DBG("secure mode is setted to '%d'", mode);
624 bool Channel::connect() { return connect(NULL); }
// Connects with an empty attribute map.
//   result1 - callback notified about the outcome; may be NULL
626 bool Channel::connect(Result_Base *result1) {
627 bool ret = connect(map<string, string>(), result1);
// Starts connecting the channel on a separate thread.
//   attributes - connection attributes; copied to the heap for the thread
//   result1    - callback registered under a fresh UID; may be NULL
// If the socket is already open the callback receives "Already Connected".
631 bool Channel::connect(map<string, string> attributes, Result_Base *result1) {
632 string uid = getUID();
633 registerCallback(uid, (void *)result1, Result_Client);
635 if (isWebSocketOpen()) {
636 handleError(uid, Error::create("Already Connected"));
// Heap copies handed to the connect thread; pt_startConnect() deletes the
// outer map when the thread is done.
640 map<string, string> *at_data = new map<string, string>(attributes);
641 map<Channel *, map<string, string> *> *pt_user_data =
642 new map<Channel *, map<string, string> *>();
644 pt_user_data->insert({this, at_data});
// connect_thread is a static member, so it is shared by all channels.
647 int status = pthread_create(&connect_thread, NULL, pt_startConnect,
648 (void *)pt_user_data);
// Disconnects the channel without a result callback.
657 void Channel::disconnect() {
658 MSF_DBG("channel disconnect()");
// Disconnects the channel.
//   result1 - callback registered under a fresh random id; may be NULL
// Reports "Already Disconnected" when the socket is not open and
// "Already Disconnecting" when a disconnect is in progress.
662 void Channel::disconnect(Result_Base *result1) {
663 string randID = getUID();
665 registerCallback(randID, (void *)result1, Result_Client);
668 if (!isWebSocketOpen()) {
669 message = "Already Disconnected";
673 message = "Already Disconnecting";
// NOTE(review): the callback was registered under randID but the error is
// routed with the member UID — confirm these are meant to be the same id.
677 handleError(UID, Error::create(message));
679 disconnecting = true;
680 // consume the callback
684 if (mirror_lifetime > 0) {
688 if (!mirror_lifetime) {
693 connectionHandler->stopPing();
694 disconnecting = false;
697 MSF_DBG("writeRequest called");
// Hook for pong replies; only a disabled debug log is visible in its body.
702 void Channel::handlePongMessage() {
703 // MSF_DBG("pong received");
706 void Channel::handleReadyMessage() {
707 waitForOnReady = false;
708 std::map<void *, int>::const_iterator iterator;
709 for (iterator = onReadyCallbacks.begin();
710 iterator != onReadyCallbacks.end(); ++iterator) {
711 Result_Base *res = (Result_Base *)iterator->first;
712 onReadyCallbacks.erase(res);
713 doApplicationCallback(res);
715 if (onConnectListener) {
716 onConnectListener->onConnect(clients->me());
719 if (onReadyListener != NULL) {
720 onReadyListener->onReady();
// Handles a client-connect event: adds the client parsed from the message to
// the Clients collection and notifies the client-connect listener.
724 void Channel::handleClientConnectMessage() {
725 MSF_DBG("handleClientConnectMessage");
727 if (client.isHost()) {
731 clients->add(client);
733 if (onClientConnectListener != NULL) {
734 onClientConnectListener->onClientConnect(client);
// Handles a client-disconnect event: removes the client identified by the
// parsed clientid from the Clients collection and notifies the listener.
738 void Channel::handleClientDisconnectMessage() {
739 MSF_DBG("handleClientDisConnectMessage");
// When the last result was true and the application was launched by this
// channel, the local socket state is torn down as well.
740 if (resultresp && isLaunched) {
741 handleSocketClosed();
// Look the client up before removing it so the listener still receives it.
745 Client client = clients->get(clientid);
746 if (client.isHost()) {
749 clients->remove(client);
751 if (onClientDisconnectListener != NULL) {
752 onClientDisconnectListener->onClientDisconnect(client);
// Completes a connect request: reports success to the callback registered
// under UID, notifies the connect listener and starts the ping health check.
756 void Channel::handleConnect(string UID) {
757 MSF_DBG("handleConnect");
758 Client clienttemp = clients->me();
760 pair<void *, int> temp;
// getcallback() removes the entry it returns from the callbacks map.
762 temp = Channel::getcallback(UID);
// Client-typed callbacks receive this client; bool-typed ones receive true.
763 if (temp.second == Result_Client) {
764 Result_Base *temp1 = (Result_Base *)(temp.first);
766 temp1->onSuccess(clienttemp);
768 } else if (temp.second == Result_bool) {
769 Result_Base *temp1 = (Result_Base *)(temp.first);
771 temp1->onSuccess(true);
777 if (onConnectListener) {
778 onConnectListener->onConnect(clienttemp);
780 // To start channel health check
781 if (isWebSocketOpen()) {
782 connectionHandler->startPing(this);
// Tears down socket state and then tells the disconnect listener which client
// was disconnected.
786 void Channel::handleSocketClosedAndNotify() {
787 MSF_DBG("handleSocketClosedAndNotify");
// Capture our own client before handleSocketClosed() runs, so the listener
// receives the pre-teardown value.
788 Client client = clients->me();
789 handleSocketClosed();
791 if (Channel::onDisconnectListener != NULL) {
792 Channel::onDisconnectListener->onDisconnect(client);
// Resets connection state after the socket has closed: stops pinging, wakes
// the libwebsockets service loop and clears the disconnecting flag.
796 void Channel::handleSocketClosed() {
797 MSF_DBG("handleSocketClosed");
798 connectionHandler->stopPing();
// Makes a blocked lws_service() call on this context return.
802 lws_cancel_service(Context);
806 disconnecting = false;
// Called when the socket is writable: sends at most one queued buffer and
// requests another writable callback if more data is pending.
//   ch_p - channel whose send queue is serviced
// Returns -1 when a close was requested (which makes libwebsockets close the wsi).
812 int Channel::writeSocket(Channel* ch_p)
814 MSF_DBG("write_socket");
816 ch_p->setCommunicated(true);
818 if (ch_p->closeRequest) {
819 MSF_DBG("socket closing");
820 lws_close_reason(ch_p->wsi_mirror, LWS_CLOSE_STATUS_NO_STATUS, (unsigned char *)"notack", strlen("notack"));
821 // lws_callback_on_writable(ch_p->wsi_mirror);
822 return -1; // to close this wsi
// The queue is shared with publisher threads, so it is only touched under the mutex.
825 pthread_mutex_lock(&ch_p->sendBufMutex);
826 bool ReqNextWritableCB = false;
828 if (ch_p->sendBufQueue.begin() != ch_p->sendBufQueue.end()) {
829 SendBufList::iterator itr = ch_p->sendBufQueue.begin();
// Copy the entry out before erasing it; the iterator is invalid afterwards.
830 _SendBufInfo sendBufInfo = *itr;
832 bool binaryFlag = sendBufInfo.binaryFlag;
833 bool pingByClient = sendBufInfo.pingByClient;
834 unsigned char* sendBuf = sendBufInfo.sendBuf;
835 int sendLength = sendBufInfo.sendLength;
836 int msgId = sendBufInfo.msgId;
837 void* userData = sendBufInfo.userData;
839 if (sendBuf == NULL) {
840 ch_p->sendBufQueue.erase(itr);
841 MSF_DBG("warn : Send Buf is NULL");
843 ch_p->sendBufQueue.erase(itr);
// The message bytes start after the pre-padding that libwebsockets requires.
846 n = lws_write(ch_p->wsi_mirror, &(sendBuf[LWS_SEND_BUFFER_PRE_PADDING]), sendLength, LWS_WRITE_BINARY);
848 n = lws_write(ch_p->wsi_mirror, &(sendBuf[LWS_SEND_BUFFER_PRE_PADDING]), sendLength, LWS_WRITE_TEXT);
852 MSF_DBG("Writing failed\n");
853 if (ch_p->onPublishListener) {
854 ch_p->onPublishListener->onPublished(false, userData);
// Client-originated pings are not reported to the publish listener.
857 if (pingByClient == false) {
858 MSF_DBG("Writing succeed. id:%d, length:%d", msgId, n);
859 if (ch_p->onPublishListener) {
860 ch_p->onPublishListener->onPublished(true, userData);
864 // cleanup write buffer
865 if ((sendBuf != NULL)) {
// More buffers queued (or a retry requested): ask for another writable callback.
871 if ((ch_p->sendBufQueue.begin() != ch_p->sendBufQueue.end()) || ReqNextWritableCB) {
873 lws_callback_on_writable(ch_p->wsi_mirror);
875 pthread_mutex_unlock(&ch_p->sendBufMutex);
// libwebsockets protocol callback for the channel socket.
//   wsi    - connection the event belongs to
//   reason - event kind, dispatched by the switch below
//   user   - per-session storage from libwebsockets (not read in the visible code)
//   in/len - event data; for receive events, the bytes of the current fragment
// The owning Channel is recovered from the context's user pointer and checked
// against channel_alive_map before it is used.
880 int Channel::callback_lws_mirror(struct lws *wsi,
881 enum lws_callback_reasons reason, void *user,
882 void *in, size_t len) {
883 void *user_data = NULL;
884 Channel *this_ptr = NULL;
886 MSF_DBG("wsi is NULL");
889 struct lws_context *context = lws_get_context(wsi);
891 if (context != NULL) {
892 user_data = lws_context_user(context);
895 this_ptr = static_cast<Channel *>(user_data);
897 if (this_ptr == NULL) {
898 // it means Channel object was deleted
// A value of 0 means ~Channel() already ran for this address.
902 if (channel_alive_map.find(this_ptr) != channel_alive_map.end()) {
903 if (channel_alive_map[this_ptr] == 0) {
911 case LWS_CALLBACK_ESTABLISHED:
914 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
917 case LWS_CALLBACK_CLOSED:
918 this_ptr->handleSocketClosedAndNotify();
921 case LWS_CALLBACK_CLIENT_ESTABLISHED:
924 case LWS_CALLBACK_CLIENT_RECEIVE:
// Binary messages: [2-byte header length][JSON header][payload], possibly
// split over several fragments.
925 if (lws_frame_is_binary(wsi)) {
926 MSF_DBG("BINARY MESSAGE ARRIVED. len:%d", len);
927 // header needs to be parsed on first chunk
928 if (this_ptr->is_header_parsed == false) {
929 MSF_DBG("first chunk. parsing header");
932 header_size = (int)((unsigned char *)in)[0];
933 header_size = header_size << 8;
934 header_size = header_size & 0xff00;
935 header_size = header_size | (int)((unsigned char *)in)[1];
936 MSF_DBG("header_size = %d", header_size);
938 char header_json[header_size + 1] = {0};
940 memcpy(&header_json[0], &(((unsigned char *)in)[2]), header_size);
941 header_json[header_size + 1] = 0;
943 MSF_DBG("in = %s", &header_json[0]);
945 this_ptr->json_parse(header_json);
947 this_ptr->cl_payload = (unsigned char*)calloc(len - 2 - header_size, sizeof(unsigned char));
948 memcpy(this_ptr->cl_payload, &(((unsigned char*)in)[2 + header_size]), len - 2 - header_size);
950 this_ptr->cl_payload_size = len - 2 - header_size;
951 this_ptr->connectionHandler->resetLastPingReceived();
952 this_ptr->is_header_parsed = true;
954 MSF_DBG("type:%s", this_ptr->eventType.c_str());
// Single-fragment binary message: dispatch immediately.
956 if (lws_is_final_fragment(wsi)) {
957 MSF_DBG("there's no remaining packet");
958 if (this_ptr->eventType == CONNECT_EVENT) {
959 MSF_DBG("handle connect message");
960 this_ptr->handleConnectMessage(this_ptr->UID);
962 MSF_DBG("handle user message");
963 this_ptr->handleMessage(this_ptr->UID, this_ptr->cl_payload);
// Release the payload and reset the per-message parse state.
// NOTE(review): is_header_parsed is only reset when cl_payload is non-NULL.
966 if (this_ptr->cl_payload) {
967 free(this_ptr->cl_payload);
968 this_ptr->cl_payload = NULL;
969 this_ptr->cl_payload_size = 0;
970 this_ptr->is_header_parsed = false;
971 this_ptr->eventType.clear();
972 this_ptr->data.clear();
// Continuation fragment of a binary message: append it to the payload buffer.
976 MSF_DBG("reallocating to copy payload");
// NOTE(review): the realloc result is not checked; on failure the old buffer
// is lost and the memcpy below writes through NULL.
978 this_ptr->cl_payload = (unsigned char*)realloc(this_ptr->cl_payload, this_ptr->cl_payload_size + len);
979 memcpy(&(this_ptr->cl_payload[this_ptr->cl_payload_size]), (char*)in, len);
981 this_ptr->cl_payload_size += len;
983 this_ptr->connectionHandler->resetLastPingReceived();
985 if (lws_is_final_fragment(wsi)) {
986 MSF_DBG("there's no remaining packet");
987 if (this_ptr->eventType == CONNECT_EVENT) {
988 MSF_DBG("handle connect message");
989 this_ptr->handleConnectMessage(this_ptr->UID);
991 MSF_DBG("handle user message");
992 this_ptr->handleMessage(this_ptr->UID, this_ptr->cl_payload);
995 if (this_ptr->cl_payload) {
996 free(this_ptr->cl_payload);
997 this_ptr->cl_payload = NULL;
998 this_ptr->cl_payload_size = 0;
999 this_ptr->is_header_parsed = false;
1000 this_ptr->eventType.clear();
1001 this_ptr->data.clear();
// Text messages: accumulate fragments in cl_data, kept NUL-terminated, and
// parse the JSON once the final fragment has arrived.
1006 MSF_DBG("TEXT MESSAGE ARRIVED. len:%d", len);
1007 if (this_ptr->cl_data == NULL) {
1008 this_ptr->cl_data = (unsigned char*)calloc(1, sizeof(unsigned char));
1009 this_ptr->cl_data_size = 0;
1012 this_ptr->connectionHandler->resetLastPingReceived();
// +1 leaves room for the terminating NUL written below.
1013 this_ptr->cl_data = (unsigned char*)realloc(this_ptr->cl_data, this_ptr->cl_data_size + len + 1);
1015 if (this_ptr->cl_data) {
1016 memcpy(&(this_ptr->cl_data[this_ptr->cl_data_size]), (char*)in, len);
1017 this_ptr->cl_data_size += len;
1018 this_ptr->cl_data[this_ptr->cl_data_size] = '\0';
1021 if (lws_is_final_fragment(wsi)) {
1022 if (this_ptr->cl_data) {
1023 this_ptr->json_parse((char*)this_ptr->cl_data);
1026 if (this_ptr->eventType == CONNECT_EVENT) {
1027 this_ptr->handleConnectMessage(this_ptr->UID);
1029 // this_ptr->handleMessage(this_ptr->UID);
1030 this_ptr->handleMessage(this_ptr->UID, NULL);
// Release the text buffer and reset the per-message parse state.
1033 if (this_ptr->cl_data) {
1034 free(this_ptr->cl_data);
1035 this_ptr->cl_data = NULL;
1036 this_ptr->cl_data_size = 0;
1037 this_ptr->eventType.clear();
1038 this_ptr->data.clear();
// The socket can accept data: send the next queued buffer.
1044 case LWS_CALLBACK_CLIENT_WRITEABLE:
1045 return writeSocket(this_ptr);
1048 case LWS_CALLBACK_RECEIVE:
1051 case LWS_CALLBACK_PROTOCOL_DESTROY:
1054 case LWS_CALLBACK_WSI_DESTROY:
1055 this_ptr->handleWsiDestroy();
// The remaining reasons share one case group; no per-reason handling is visible here.
1058 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
1059 case LWS_CALLBACK_ADD_POLL_FD:
1060 case LWS_CALLBACK_DEL_POLL_FD:
1061 case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY:
1062 case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
1063 case LWS_CALLBACK_HTTP:
1064 case LWS_CALLBACK_HTTP_WRITEABLE:
1065 case LWS_CALLBACK_HTTP_FILE_COMPLETION:
1066 case LWS_CALLBACK_SERVER_WRITEABLE:
1067 case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
1068 case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
1069 case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS:
1070 case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
1071 case LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION:
1072 case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
1073 case LWS_CALLBACK_GET_THREAD_ID:
1083 Client Channel::getclient(string id) { return clients->get(id); }
1085 bool Channel::isConnected() { return isWebSocketOpen(); }
1087 bool Channel::isWebSocketOpen() { return wsi_mirror == NULL ? false : true; }
// Returns the URI of the service this channel belongs to.
//   attributes - not used by the visible code
1089 string Channel::getChannelUri(map<string, string> *attributes) {
1090 return service->getUri();
// Publishes a text-only event to all targets (Message::TARGET_ALL).
//   user_data - opaque pointer handed back through the publish listener
1093 void Channel::publish(string event, const char *data, void *user_data)
1095 publishMessage(event, data, Message::TARGET_ALL.c_str(), NULL, 0, user_data);
// Publishes an event with a binary payload to all targets (Message::TARGET_ALL).
1098 void Channel::publish(string event, const char *data, const unsigned char payload[],
1099 int payload_size, void *user_data)
1101 publishMessage(event, data, Message::TARGET_ALL.c_str(), payload, payload_size, user_data);
// Publishes a text-only event to the target named by `target`.
1104 void Channel::publish(string event, const char *data, const char *target,
1107 publishMessage(event, data, target, NULL, 0, user_data);
// Publishes an event with a binary payload to the target named by `target`.
1110 void Channel::publish(string event, const char *data, const char *target,
1111 const unsigned char payload[], int payload_size, void *user_data)
1113 publishMessage(event, data, target, payload, payload_size, user_data);
// Publishes a text-only event to a single client, addressed by its id.
1116 void Channel::publish(string event, const char *data, Client client,
1119 publishMessage(event, data, client.getId(), NULL, 0, user_data);
// Publishes an event with a binary payload to a single client, addressed by its id.
1122 void Channel::publish(string event, const char *data, Client client,
1123 const unsigned char payload[], int payload_size, void *user_data)
1125 publishMessage(event, data, client.getId(), payload, payload_size, user_data);
// Publishes a text-only event to a list of clients by delegating to the
// payload-carrying overload with no payload.
1128 void Channel::publish(string event, const char *data, list<Client> clients,
1131 publish(event, data, clients, NULL, 0, user_data);
// Publishes an event with a binary payload to a list of clients; the target
// is the list of their ids.
1134 void Channel::publish(string event, const char *data, list<Client> clients,
1135 const unsigned char payload[], int payload_size, void *user_data)
// NOTE(review): this loop and the one below both declare `iterator`;
// presumably one of them is compiled out by a directive — confirm.
// This variant joins the ids into the string `to`, with something added
// between elements (all but the last).
1139 std::list<Client>::iterator iterator;
1140 for (iterator = clients.begin(); iterator != clients.end(); ++iterator) {
1142 to.append(iterator->getId());
1145 if (std::next(iterator, 1) != clients.end()) {
// This variant collects the ids into a JSON array.
1152 Json::Value to_list;
1153 std::list<Client>::iterator iterator;
1154 for (iterator = clients.begin(); iterator != clients.end(); ++iterator) {
1155 to_list.append(iterator->getId());
1158 publishMessage(event, data, to_list, payload, payload_size, user_data);
// Publishes an event using the emit method (Message::METHOD_EMIT).
//   to - target: a single id/keyword or an array of ids
1161 void Channel::publishMessage(string event, const char *data, Json::Value to,
1162 const unsigned char payload[], int payload_size, void *user_data) {
1163 publishMessage(Message::METHOD_EMIT, event, data, to, payload,
1164 payload_size, user_data);
// Serialises a message and queues it for sending on the socket thread.
//   method       - protocol method name
//   event        - event name; ChannelConnectionHandler::PING marks a client ping
//   data         - message text
//   to           - target(s)
//   payload/size - optional binary payload
//   user_data    - opaque pointer handed back through the publish listener
// Reports "Not Connected" through handleError() when the socket is not open.
1167 void Channel::publishMessage(string method, string event, const char *data,
1168 Json::Value to, const unsigned char payload[],
1169 int payload_size, void *user_data) {
// NOTE(review): strlen(data) requires data to be non-NULL here — confirm callers.
1171 MSF_DBG("data len %d, payload len %d", strlen(data), payload_size);
1174 if (!isWebSocketOpen()) {
1175 handleError(string(), Error::create("Not Connected"));
1178 long prepare_buf_len = 0;
1179 unsigned char *prepareBuf = prepareMessageMap(
1180 method, event, data, to, &prepare_buf_len, payload, payload_size);
1182 _SendBufInfo sendBufInfo;
1183 sendBufInfo.pingByClient = false;
1184 sendBufInfo.sendBuf = prepareBuf;
// Terminate the serialised bytes; they start after the libwebsockets pre-padding.
1185 sendBufInfo.sendBuf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
1186 sendBufInfo.sendLength = prepare_buf_len;
1187 sendBufInfo.msgId = ++msgId;
1188 sendBufInfo.userData = user_data;
// Pings are flagged so writeSocket() does not report them to the publish listener.
1190 if (event == ChannelConnectionHandler::PING) {
1191 sendBufInfo.pingByClient = true;
1195 sendBufInfo.binaryFlag = true;
1197 sendBufInfo.binaryFlag = false;
// The queue is drained by writeSocket(), so it is only modified under the mutex.
1200 pthread_mutex_lock(&sendBufMutex);
1201 sendBufQueue.push_back(sendBufInfo);
1202 pthread_mutex_unlock(&sendBufMutex);
1204 MSF_DBG("writeRequest called");
// Builds the wire representation of a message in a newly allocated buffer
// that reserves the libwebsockets pre- and post-padding.
//   prepare_buf_len - out: number of message bytes written after the pre-padding
// With a payload the layout is [2-byte big-endian header length][JSON][payload];
// without one it is just the JSON text.
// The caller owns the returned buffer (allocated with new[]).
1209 unsigned char *Channel::prepareMessageMap(string method, string event,
1210 const char *data, Json::Value to,
1211 long *prepare_buf_len,
1212 const unsigned char payload[],
1215 int header_size = 0;
1217 Json::Value message;
1220 params["event"] = event;
1222 params["data"] = data;
1225 message["method"] = method;
1226 message["params"] = params;
1228 Json::FastWriter writer;
1229 int data_len = (int)(writer.write(message).size());
// Padding + JSON + payload, plus 8 spare bytes (covers the 2-byte length
// prefix and the terminating NUL).
1231 int prepare_buf_size = LWS_SEND_BUFFER_PRE_PADDING + data_len + payload_size + 8 + LWS_SEND_BUFFER_POST_PADDING;
1232 unsigned char *prepare_buf = new unsigned char[prepare_buf_size];
// Binary form: the JSON goes after the 2-byte length prefix.
1235 l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING + 2],
1236 prepare_buf_size - (LWS_SEND_BUFFER_PRE_PADDING + 2),
1237 "%s", writer.write(message).c_str());
// Header length, low byte then high byte, stored big-endian in the prefix.
1241 prepare_buf[LWS_SEND_BUFFER_PRE_PADDING + 1] =
1242 (unsigned char)header_size;
1243 prepare_buf[LWS_SEND_BUFFER_PRE_PADDING] =
1244 (unsigned char)((header_size) >> 8);
1248 memcpy(&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING + l], payload,
// Text form: the JSON starts directly after the pre-padding.
1253 l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
1254 prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING,
1255 "%s", writer.write(message).c_str());
1258 *prepare_buf_len = l;
1262 void Channel::handleBinaryMessage(unsigned char payload[]) {}
// Queues a pre-serialised text message (`data`) for sending.
//   data      - NUL-terminated message text
//   buflength - not used by the visible code
//   msgID     - not used by the visible code
1264 void Channel::start_app(char *data, int buflength, string msgID) {
// Fixed 4096-byte message area between the libwebsockets paddings.
1268 unsigned char *prepare_buf = new unsigned char[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
// NOTE(review): snprintf returns the length the full text would need; if data
// is longer than the limit, `l` exceeds what was written and the index used
// for the terminator below can fall outside the buffer.
1270 l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
1271 4096 + LWS_SEND_BUFFER_POST_PADDING, "%s", data);
1273 _SendBufInfo sendBufInfo;
1274 sendBufInfo.pingByClient = false;
1275 sendBufInfo.sendBuf = prepare_buf;
1276 sendBufInfo.sendBuf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
1277 sendBufInfo.sendLength = l;
1278 sendBufInfo.msgId = ++msgId;
1279 sendBufInfo.userData = NULL;
1280 sendBufInfo.binaryFlag = false;
// The queue is drained by writeSocket(), so it is only modified under the mutex.
1282 pthread_mutex_lock(&sendBufMutex);
1283 sendBufQueue.push_back(sendBufInfo);
1284 pthread_mutex_unlock(&sendBufMutex);
1286 MSF_DBG("writeRequest called");
// Stores a result callback under `uid`, replacing any existing entry.
//   callback   - callback object, stored untyped
//   value_type - tag describing the callback kind (e.g. Result_Client, Result_bool)
1290 void Channel::registerCallback(string uid, void *callback, int value_type) {
1291 callbacks[uid] = make_pair(callback, value_type);
1294 void Channel::release_callback(string uid) { callbacks.erase(uid); }
// Fetches the callback registered under `uid` and removes it from the map, so
// each callback can be retrieved once.
// Returns a value-initialised pair (NULL callback) when nothing is registered
// under `uid` or when `uid` is empty.
1296 pair<void *, int> Channel::getcallback(string uid) {
1297 map<string, pair<void *, int>>::iterator check_it = callbacks.find(uid);
1299 if (check_it == callbacks.end()) {
1300 MSF_DBG("callbacks map not found. critical error");
1301 return pair<void *, int>();
1304 if (uid.length() != 0) {
// Copy the entry before erasing it.
1305 pair<void *, int> r1 = callbacks[uid];
1307 callbacks.erase(uid);
1309 MSF_DBG("getcallback success uid = %s",
// NOTE(review): the format string has no conversion for the uid argument.
1314 MSF_DBG("getcallback failed", uid.c_str());
1315 return pair<void *, int>();
// Produces a four-digit decimal id (1000..9999) as a string, used as the key
// for registered callbacks.
1318 string Channel::getUID() {
1319 std::stringstream ss;
// NOTE(review): the seed is re-derived from the current second on every call,
// so all calls within the same second yield the same id and can overwrite
// each other's entries in the callbacks map.
1320 unsigned int seed = time(NULL);
1321 ss << (rand_r(&seed) % 9000 + 1000);
1322 string randID = ss.str();
// Thread entry point started by connect(): unpacks the {channel -> attributes}
// map built there, runs createWebsocket() on that channel, then frees the map.
//   att - heap-allocated map<Channel*, map<string,string>*> with one entry
1326 void *Channel::pt_startConnect(void *att) {
1327 map<Channel *, map<string, string> *> *pt_user_data =
1328 static_cast<map<Channel *, map<string, string> *> *>(att);
1329 map<string, string> *attributes = pt_user_data->begin()->second;
// Runs on this thread until the connection's service loop ends.
1331 pt_user_data->begin()->first->createWebsocket(attributes);
1334 delete pt_user_data;
// Creates the libwebsockets context, connects to the channel endpoint derived
// from the service URI and services the socket on the calling thread until it
// fails or a close is requested; the context is destroyed afterwards.
//   att - pointer to a map<string,string> of connection attributes
1339 void Channel::createWebsocket(void *att) {
// Single protocol handled by callback_lws_mirror, followed by a terminator entry.
1340 struct lws_protocols protocols[] = {
1341 {NULL, Channel::callback_lws_mirror, sizeof(int), 512000, 0,
1344 // libwebsockets 1.7 has a bug.
1345 {NULL, NULL, 0, 0, 0, NULL} // this is anti-bug code
1350 map<string, string> *attributes = (map<string, string> *)(att);
1351 string uri = getChannelUri(attributes);
1353 closeRequest = false;
1354 int ietf_version = -1; /* latest */
1356 struct lws_context_creation_info info;
1358 memset(&info, 0, sizeof info);
// Client-only context: no listening port.
1360 info.port = CONTEXT_PORT_NO_LISTEN;
1361 info.protocols = protocols;
1362 info.extensions = NULL;
1363 info.ssl_cert_filepath = NULL;
1364 info.ssl_private_key_filepath = NULL;
1366 MSF_DBG("'SecureConnection' is true");
1367 info.ssl_ca_filepath = CA_PATH;
// NOTE(review): raw option bit; presumably a named LWS_SERVER_OPTION_* flag
// (SSL initialisation) — TODO confirm against the libwebsockets version in use.
1371 info.options = (1 << 12);
// Remember the servicing thread; writeRequest() compares against it.
1374 socketThreadId = pthread_self();
1376 #ifndef LWS_NO_EXTENSIONS
1377 // info.extensions = lws_get_internal_extensions();
1379 if (isWebSocketOpen()) {
1380 MSF_DBG("createWebsocket already Connected");
1383 Context = lws_create_context(&info);
1384 //Context = context;
1385 if (Context == NULL) {
1386 MSF_DBG("Creating libwebsocket context failed\n");
1391 MSF_DBG("uri is empty");
// Split the service URI into address/port and build the "<api>channels/<id>" path.
1395 get_ip_port_from_uri(uri, &server_ip_address, &server_port);
1396 string api = getapifromUri(uri);
1397 api.append("channels/").append(ChannelID);
1404 struct lws_client_connect_info connect_info;
1405 memset(&connect_info, 0, sizeof(lws_client_connect_info));
1407 connect_info.context = Context;
1408 connect_info.address = server_ip_address.c_str();
1409 connect_info.port = server_port;
1410 connect_info.ssl_connection = use_ssl;
1411 connect_info.path = api.c_str();
1412 connect_info.host = server_ip_address.c_str();
1413 connect_info.origin = server_ip_address.c_str();
1414 connect_info.protocol = protocols[0].name;
1415 connect_info.ietf_version_or_minus_one = ietf_version;
1416 connect_info.userdata = NULL;
1417 connect_info.client_exts = NULL;
1419 // loop until socket closed
1420 while (n >= 0 && !closeRequest) {
// (Re)connect whenever no wsi is held.
1421 if (wsi_mirror == NULL) {
1422 wsi_mirror = lws_client_connect_via_info(&connect_info);
1423 if (wsi_mirror == NULL) {
1424 MSF_DBG("Fail to create was_mirror");
1425 handleError(UID, Error::create("ConnectFailed"));
// Long timeout; lws_cancel_service() is used elsewhere to wake this call early.
1429 n = lws_service(Context, 0x0FFFFFFF);
1432 MSF_DBG("createWebsocket destroy context");
1435 lws_context_destroy(Context);
1442 void Channel::writeRequest()
1444 MSF_DBG("writeRequest start");
1445 if (wsi_mirror != NULL) {
1446 lws_callback_on_writable(wsi_mirror);
1447 if (pthread_self() != socketThreadId) {
1448 MSF_DBG("current thread is different from websocket thread => lws_cancel_service()");
1449 lws_cancel_service(lws_get_context(wsi_mirror)); // to exit from poll() inside of lws_service()
1454 void Channel::get_ip_port_from_uri(string uri, string* dest_ip, int* dest_port) {
1455 unsigned int http_index = uri.find("http");
1456 unsigned int ip_index = 0;
1457 bool is_https = false;
1459 if (http_index == std::string::npos) {
1460 //there is no http string
1464 if (uri.at(http_index + 4) == 's') {
1466 //https://000.000.000.000
1467 //http://000.000.000.000
1469 ip_index = http_index + 8;
1472 ip_index = http_index + 7;
1482 char now_c = uri.at(itr);
1484 while ((now_c > 47 && now_c < 58) || (now_c == '.')) {
1485 if (dest_ip != NULL) {
1486 (*dest_ip).push_back(now_c);
1488 now_c = uri.at(++itr);
1492 now_c = uri.at(++itr);
1494 while (now_c > 47 && now_c < 58) {
1495 port.push_back(now_c);
1496 now_c = uri.at(++itr);
1499 if (dest_port != NULL) {
1500 *dest_port = atoi(port.c_str());
1504 // It means that server use default port.
1506 //default https server port is 443.
1509 //default http server port is 80"
1515 string Channel::getapifromUri(string uri) {
1516 string startpt = "/api";
1517 const char *api = strstr(uri.c_str(), startpt.c_str());
1520 void Channel::setonConnectListener(OnConnectListener *obj) {
1521 onConnectListener = obj;
1524 void Channel::unsetonConnectListener() { onConnectListener = NULL; }
1526 void Channel::setonDisconnectListener(OnDisconnectListener *obj) {
1527 onDisconnectListener = obj;
1530 void Channel::unsetonDisconnectListener() { onDisconnectListener = NULL; }
1531 void Channel::setonClientConnectListener(OnClientConnectListener *obj) {
1532 onClientConnectListener = obj;
1535 void Channel::unsetonClientConnectListener() { onClientConnectListener = NULL; }
1537 void Channel::addOnMessageListener(string event,
1538 OnMessageListener *onMessageListener) {
1539 if (event.length() == 0 || onMessageListener == NULL) {
1543 if (messageListeners.find(event) == messageListeners.end()) {
1544 messageListeners.insert({event, list<OnMessageListener *>()});
1547 list<OnMessageListener *> &onMessageListeners =
1548 (messageListeners.find(event))->second;
1550 onMessageListeners.push_back(onMessageListener);
1553 void Channel::addOnAllMessageListener(OnMessageListener *onMessageListener) {
1554 everyMessageListener = onMessageListener;
1557 void Channel::removeAllMessageListener() { everyMessageListener = NULL; }
1559 void Channel::removeOnMessageListeners(string event) {
1560 if (event.length() == 0) {
1564 if (messageListeners.find(event) == messageListeners.end()) {
1568 list<OnMessageListener *> &onMessageListeners =
1569 (messageListeners.find(event))->second;
1570 onMessageListeners.clear();
1573 void Channel::removeOnMessageListener(string event,
1574 OnMessageListener *onMessageListener) {
1575 if ((event.length() == 0) || (onMessageListener != NULL)) {
1579 if (messageListeners.find(event) == messageListeners.end()) {
1583 list<OnMessageListener *> &onMessageListeners =
1584 (messageListeners.find(event))->second;
1586 if (onMessageListeners.size() != 0) {
1587 onMessageListeners.remove(onMessageListener);
1591 void Channel::removeOnMessageListeners() { messageListeners.clear(); }
1593 void Channel::removeAllListeners() {
1594 setonConnectListener(NULL);
1595 setonDisconnectListener(NULL);
1596 setonClientConnectListener(NULL);
1597 setonClientDisconnectListener(NULL);
1598 setonReadyListener(NULL);
1599 setonErrorListener(NULL);
1600 removeOnMessageListeners();
1603 void Channel::register_message_receive_cb(_message_receive_cb cb) {
1604 message_receive_cb = cb;
1607 void Channel::setonClientDisconnectListener(OnClientDisconnectListener *obj) {
1608 onClientDisconnectListener = obj;
1611 void Channel::unsetonClientDisconnectListener() {
1612 onClientDisconnectListener = NULL;
1615 void Channel::setonErrorListener(OnErrorListener *obj) {
1616 onErrorListener = obj;
1619 void Channel::unsetonErrorListener()
1621 onErrorListener = NULL;
1624 void Channel::setonReadyListener(OnReadyListener *obj)
1626 onReadyListener = obj;
1629 void Channel::unsetonReadyListener()
1631 onReadyListener = NULL;
1634 void Channel::setonPublishListener(OnPublishListener *obj)
1636 onPublishListener = obj;
1639 void Channel::unsetonPublishListener()
1641 onPublishListener = NULL;
1643 void ChannelConnectionHandler::resetLastPingReceived() {
1644 this->lastPingReceived = time(0);
// Derives the latest ping round-trip time as lastPingReceived - pingSent
// and compares it with longestRT. Despite the name, no average is computed:
// the averaging statement below is commented out.
// NOTE(review): the statement inside the `if` is not visible in this
// excerpt; presumably it records lastRT as the new longestRT — confirm.
1647 void ChannelConnectionHandler::calculateAverageRT() {
1648 long lastRT = lastPingReceived - pingSent;
1649 if (lastRT > longestRT) {
1652 // average=((numPings++ * average)+lastRT)/numPings; //will check ankit
// Stops the ping/pong timer when one is armed (pingpongTimer > 0).
// NOTE(review): lines not visible in this excerpt may also reset
// pingpongTimer and other ping state — confirm.
1655 void ChannelConnectionHandler::stopPing() {
1657 MSF_DBG("stopping ping");
1658 if (pingpongTimer > 0) {
1659 stopTimer(pingpongTimer);
// Arms the ping/pong timer for the channel `ptr`.
//
// ptr - the channel handed to __send_ping_pong as timer user data.
//
// The log messages below indicate two early exits (pinging already running,
// non-positive pingTimeout); the conditions and return statements for them
// are partly outside this excerpt.
1666 void ChannelConnectionHandler::startPing(Channel *ptr) {
1667 MSF_DBG("## startPing ###");
1669 MSF_DBG("startPing already running. return ###");
1673 if (pingTimeout <= 0) {
1674 MSF_DBG("## startPing ping timeout. return ###");
// Both timestamps start from the same instant.
1681 startTime = time(0);
1682 pingSent = startTime;
// Only arm a timer when none is pending. pingTimeout is divided by 1000
// before being passed on as the timer interval.
1685 if (pingpongTimer <= 0) {
1686 pingpongTimer = startTimer(__send_ping_pong, __timeout_worker, pingTimeout/1000, ptr);
// Timer callback that sends one ping on the channel and re-arms itself.
//
// data - the Channel * that was passed to startTimer as user data.
1690 void ChannelConnectionHandler::__send_ping_pong(void* data)
1692 Channel *ptr = static_cast<Channel *>(data);
// NOTE(review): `now` is declared on a line not visible here. If it holds
// time(0) seconds, like pingSent and lastPingReceived do, then adding
// pingTimeout — which is divided by 1000 to obtain the timer interval —
// mixes units and this timeout would effectively never trigger. Confirm.
1696 if (now > ptr->connectionHandler->lastPingReceived +
1697 ptr->connectionHandler->pingTimeout) {
1698 MSF_DBG("## Pinging timeout. disconnect ###");
// Publish the PING event with the PONG payload to this client itself and
// remember when it was sent.
1701 ptr->publish(PING, PONG.c_str(), ptr->clients->me(), NULL);
1702 ptr->connectionHandler->pingSent = time(0);
// Replace any pending timer with a fresh one for the next ping.
1705 if (ptr->connectionHandler->pingpongTimer > 0) {
1706 stopTimer(ptr->connectionHandler->pingpongTimer);
1707 ptr->connectionHandler->pingpongTimer = 0;
1709 ptr->connectionHandler->pingpongTimer = startTimer(__send_ping_pong, __timeout_worker, ptr->connectionHandler->pingTimeout/1000, ptr);
// Creates a GLib timeout source and attaches it to the default main context
// (g_source_attach with a NULL context).
//
// function - stored in slot 0 of a two-slot gpointer array that becomes the
//            source's user data.
// woker    - the GSourceFunc installed on the source; it receives that
//            array.
// interval - passed to g_timeout_source_new(), which takes milliseconds.
// data     - presumably stored in slot 1 of the array on a line not visible
//            here (__timeout_worker reads it from there) — confirm.
//
// NOTE(review): the return statement is not visible; presumably the source
// id obtained from g_source_attach() is returned — confirm.
1712 int ChannelConnectionHandler::startTimer(timer_function function, TimerWorker woker, unsigned int interval, void *data)
1715 GSource *src = NULL;
1716 gpointer *tdata = NULL;
1718 src = g_timeout_source_new(interval);
1720 tdata = g_new0(gpointer, 2);
1722 tdata[0] = (void*)function;
// g_free is the destroy notifier, so the array is released together with
// the source.
1725 g_source_set_callback(src, woker, tdata, g_free);
1726 id = g_source_attach(src, NULL);
// Drop the local reference; the main context keeps its own.
1727 g_source_unref(src);
// GLib source callback used for the timers created by startTimer.
//
// ud - the two-slot gpointer array built in startTimer: slot 0 holds the
//      timer_function to run and slot 1 the argument it is called with.
//
// NOTE(review): the return value is not visible in this excerpt; returning
// FALSE would make each timer fire only once — confirm.
1732 gboolean ChannelConnectionHandler::__timeout_worker(gpointer ud)
1734 gpointer *tdata = (gpointer*)ud;
1736 ((timer_function)tdata[0])(tdata[1]);
1741 void ChannelConnectionHandler::stopTimer(int timer)
1743 guint id = (guint) timer;
1745 if (!g_source_remove(id)) {
1746 MSF_DBG("g_source_remove is fail (interval_timer)");
// Setter taking the new ping timeout `t`.
// NOTE(review): the body is not visible in this excerpt; presumably it
// stores `t` in pingTimeout, the value startPing divides by 1000 to obtain
// the timer interval — confirm.
1751 void ChannelConnectionHandler::setPingTimeout(long t) {
// Reacts to the websocket instance going away (per the name; the caller is
// not visible in this excerpt). When isCommunicated is false, closeRequest
// is raised — the flag the createWebsocket() service loop checks — and a
// "Closed" error is reported to onErrorListener if one is set.
1755 void Channel::handleWsiDestroy()
1757 if (!isCommunicated) {
1758 closeRequest = true;
1760 MSF_ERR("wsi destroyed with no communication");
1761 if (onErrorListener) {
1762 onErrorListener->onError(Error::create("Closed"));
// Stores `value` in isCommunicated, the flag handleWsiDestroy() consults to
// decide whether a destroyed websocket instance counts as an error.
1767 void Channel::setCommunicated(bool value)
1769 isCommunicated = value;