Modified send buffer and applied ping pong in msf client 46/106246/3 submit/tizen/20161222.005908
authorkmook <kmook.choi@samsung.com>
Wed, 21 Dec 2016 06:12:32 +0000 (15:12 +0900)
committerkmook <kmook.choi@samsung.com>
Wed, 21 Dec 2016 09:20:36 +0000 (18:20 +0900)
Change-Id: I0790a02ad0437c62415f46b1ebddd15ccb32586c
Signed-off-by: kmook <kmook.choi@samsung.com>
msf_tizen_client/include/Channel.h
msf_tizen_client/src/Channel.cpp

index e01070a..b080f44 100755 (executable)
@@ -35,6 +35,20 @@ class Channel;
 
 #define CA_PATH                                        "/usr/share/d2d-conv-manager/ca_crt.pem"
 
+typedef struct SendBufInfo_s {
+       bool binaryFlag = false;
+       bool pingByClient = false;
+       unsigned char* sendBuf = NULL;
+       int sendLength = 0;
+       int msgId = 0;
+       void* userData = NULL;
+} _SendBufInfo;
+
+typedef std::vector<_SendBufInfo> SendBufList;
+
+typedef gboolean (*TimerWorker)(gpointer ud);
+typedef void (*timer_function)(void *data);
+
 class ChannelConnectionHandler {
 public:
        ChannelConnectionHandler();
@@ -47,11 +61,14 @@ public:
        void calculateAverageRT();
        void stopPing();
        void startPing(Channel *);
-       static void *Pinging(void *);
-       static void ping_again(void *);
-       static pthread_t ping_thread;
 
 private:
+       static void __send_ping_pong(void* data);
+
+       static void* startTimer(timer_function function, TimerWorker woker, unsigned int interval, void *data);
+       static void stopTimer(void *timer);
+       static gboolean __timeout_worker(gpointer ud);
+
        long pingTimeout;
        long lastPingReceived;
        int numPings;
@@ -60,6 +77,7 @@ private:
        double average;
        long longestRT;
        bool running;
+       void* pingpongTimer;
 };
 
 enum Result_base_Type {
@@ -181,7 +199,7 @@ public:
        static void foreach_json_array(JsonArray *object, guint index,
                                JsonNode *node, gpointer user_data);
 
-       static void write_socket(Channel*);
+       static int writeSocket(Channel*);
        static int callback_lws_mirror(struct lws *wsi,
                                enum lws_callback_reasons reason, void *user,
                                void *in, size_t len);
@@ -239,12 +257,14 @@ private:
        void handleClientDisconnectMessage();
        void handleErrorMessage(string);
        void handleReadyMessage();
+       void handlePongMessage();
        void handleDisconnectMessage();
        void handleClientMessage(const char *msg, unsigned char payload[]);
-       void create_websocket(void *att);
+       void createWebsocket(void *att);
        unsigned char *prepareMessageMap(string, string, const char *data,
                                Json::Value to, long *,
                                const unsigned char payload[], int payload_size);
+       void writeRequest();
 
        static string ROUTE;
        static string ERROR_EVENT;
@@ -252,27 +272,23 @@ private:
        static string CLIENT_CONNECT_EVENT;
        static string CLIENT_DISCONNECT_EVENT;
        static string READY_EVENT;
-       unsigned char *write_buf[1000];
-       int write_buf_count;
-       int write_buf_index;
-       int write_buf_last_sent_index;
-       void* publish_user_data[1000];
-       int write_buf_len[1000];
-       bool write_buf_binary_flag[1000];
-       //LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
+
+       pthread_mutex_t sendBufMutex;
+       SendBufList sendBufQueue;
+
+       bool closeRequest;
+       pthread_t serverThreadId;
+       static int msgId;
+
        static pthread_t connect_thread;
        int mirror_lifetime;
        int force_exit;
        unsigned char *cl_data;
        int cl_data_size;
-       //bool isWrite;
-       //bool binary_message;
        bool disconnecting;
-       //long buflen;
        unsigned char *cl_payload;
        int cl_payload_size;
        bool is_header_parsed;
-       int was_closed;
        Client client;
        long long clientconnectTime;
        bool clientisHost;
index 644568b..f4aa9e7 100755 (executable)
@@ -43,6 +43,7 @@
 
 class Application;
 
+int Channel::msgId = 0;
 using namespace std;
 
 string ChannelConnectionHandler::PING = "channel.ping";
@@ -57,7 +58,6 @@ string Channel::READY_EVENT = "ms.channel.read";
 map<Channel *, int> Channel::channel_alive_map;
 map<string, int> Channel::json_keys;
 pthread_t Channel::connect_thread;
-pthread_t ChannelConnectionHandler::ping_thread = 0;
 JsonObject *Channel::root_json_object = NULL;
 
 ChannelConnectionHandler::ChannelConnectionHandler() {
@@ -69,14 +69,12 @@ ChannelConnectionHandler::ChannelConnectionHandler() {
        startTime = 0;
        pingSent = 0;
        average = 0;
+       pingpongTimer = NULL;
 }
 
 Channel::Channel() {
        MSF_DBG("Channel()");
        clientisHost = false;
-       //isWrite = false;
-       //buflen = 0;
-       was_closed = true;
        wsi_mirror = NULL;
        Context = NULL;
        mirror_lifetime = 0;
@@ -89,23 +87,20 @@ Channel::Channel() {
        channel_alive_map.insert({this, 1});
        init_json_key_map();
        cl_payload_size = 0;
-       write_buf_count = 0;
-       write_buf_index = 0;
-       write_buf_last_sent_index = 0;
        cl_payload = NULL;
        cl_data = NULL;
        cl_data_size = 0;
        is_header_parsed = false;
        eventType = "";
        resultobj = NULL;
+       pthread_mutex_init(&sendBufMutex, NULL);
+       serverThreadId = pthread_self();
+       closeRequest = false;
 }
 
 Channel::Channel(Service *service1, string uri1) {
        MSF_DBG("Channel()");
        clientisHost = false;
-       //isWrite = false;
-       //buflen = 0;
-       was_closed = true;
        wsi_mirror = NULL;
        Context = NULL;
        mirror_lifetime = 0;
@@ -120,15 +115,15 @@ Channel::Channel(Service *service1, string uri1) {
        channel_alive_map.insert({this, 1});
        init_json_key_map();
        cl_payload_size = 0;
-       write_buf_count = 0;
-       write_buf_index = 0;
-       write_buf_last_sent_index = 0;
        cl_payload = NULL;
        cl_data = NULL;
        cl_data_size = 0;
        is_header_parsed = false;
        eventType = "";
        resultobj = NULL;
+       pthread_mutex_init(&sendBufMutex, NULL);
+       serverThreadId = pthread_self();
+       closeRequest = false;
 }
 
 Channel *Channel::create(Service *service, string uri) {
@@ -173,6 +168,22 @@ Channel::~Channel() {
                g_free(resultobj);
                resultobj = NULL;
        }
+
+       pthread_mutex_lock(&sendBufMutex);
+       while (sendBufQueue.begin() != sendBufQueue.end()) {
+               SendBufList::iterator itr = sendBufQueue.begin();
+               _SendBufInfo sendBufInfo = *itr;
+               sendBufQueue.erase(itr);
+
+               unsigned char* sendBuf = sendBufInfo.sendBuf;
+
+               // cleanup write buffer
+               if ((sendBuf != NULL)) {
+                       free(sendBuf);
+                       sendBuf = NULL;
+               }
+       }
+       pthread_mutex_unlock(&sendBufMutex);
 }
 
 void Channel::init_json_key_map() {
@@ -465,6 +476,8 @@ void Channel::handleMessage(string UID, unsigned char payload[]) {
                handleErrorMessage(UID);
        } else if (eventType == READY_EVENT) {
                handleReadyMessage();
+       } else if (eventType == ChannelConnectionHandler::PING) {
+               handlePongMessage();
        } else {
                handleClientMessage(data.c_str(), payload);
        }
@@ -678,13 +691,18 @@ void Channel::disconnect(Result_Base *result1) {
                }
 
                connectionHandler->stopPing();
-               was_closed = true;
-               lws_callback_on_writable(wsi_mirror);
-
                disconnecting = false;
+
+               closeRequest = true;
+               MSF_DBG("writeRequest called");
+               writeRequest();
        }
 }
 
+void Channel::handlePongMessage() {
+//     MSF_DBG("pong received");
+}
+
 void Channel::handleReadyMessage() {
        waitForOnReady = false;
        std::map<void *, int>::const_iterator iterator;
@@ -759,9 +777,9 @@ void Channel::handleConnect(string UID) {
        if (onConnectListener) {
                onConnectListener->onConnect(clienttemp);
        }
-       // To start channnel heath check
+       // To start channnel health check
        if (isWebSocketOpen()) {
-               // connectionHandler->startPing(this);
+               connectionHandler->startPing(this);
        }
 }
 
@@ -780,7 +798,6 @@ void Channel::handleSocketClosed() {
        connectionHandler->stopPing();
        wsi_mirror = NULL;
        connected = false;
-       was_closed = true;
        if (Context) {
                lws_cancel_service(Context);
        }
@@ -792,64 +809,72 @@ void Channel::handleSocketClosed() {
        clients->reset();
 }
 
-void Channel::write_socket(Channel* ch_p)
+int Channel::writeSocket(Channel* ch_p)
 {
-       int n;
        MSF_DBG("write_socket");
 
        ch_p->setCommunicated(true);
 
-       if (lws_partial_buffered(ch_p->wsi_mirror) == 1) {
-               MSF_DBG("lws is wrting now. it will retry write.");
-               lws_callback_on_writable(ch_p->wsi_mirror);
+       if (ch_p->closeRequest) {
+               MSF_DBG("socket closing");
+               lws_close_reason(ch_p->wsi_mirror, LWS_CLOSE_STATUS_NO_STATUS, (unsigned char *)"notack", strlen("notack"));
+//             lws_callback_on_writable(ch_p->wsi_mirror);
+               return -1; // to close this wsi
        }
 
-       MSF_DBG("write_buf_count:%d", ch_p->write_buf_count);
-       if (ch_p->write_buf_count) {
-               if (ch_p->write_buf_last_sent_index == 999) {
-                       ch_p->write_buf_last_sent_index = 1;
-               } else {
-                       ch_p->write_buf_last_sent_index++;
-               }
+       pthread_mutex_lock(&ch_p->sendBufMutex);
+       bool ReqNextWritableCB = false;
 
-               if (&(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]) == NULL ||
-                               ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING] == 0 ||
-                               ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING] == '\0') {
-               } else {
-               }
+       if (ch_p->sendBufQueue.begin() != ch_p->sendBufQueue.end()) {
+               SendBufList::iterator itr = ch_p->sendBufQueue.begin();
+               _SendBufInfo sendBufInfo = *itr;
 
-               if (ch_p->write_buf_len[ch_p->write_buf_last_sent_index] <= 0) {
-               }
+               bool binaryFlag = sendBufInfo.binaryFlag;
+               bool pingByClient = sendBufInfo.pingByClient;
+               unsigned char* sendBuf = sendBufInfo.sendBuf;
+               int sendLength = sendBufInfo.sendLength;
+               int msgId = sendBufInfo.msgId;
+               void* userData = sendBufInfo.userData;
 
-               if (ch_p->write_buf_binary_flag[ch_p->write_buf_last_sent_index]) {
-                       n = lws_write(ch_p->wsi_mirror,
-                                       &(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]),
-                                       ch_p->write_buf_len[ch_p->write_buf_last_sent_index], LWS_WRITE_BINARY);
+               if (sendBuf == NULL) {
+                       ch_p->sendBufQueue.erase(itr);
+                       MSF_DBG("warn : Send Buf is NULL");
                } else {
-                       n = lws_write(ch_p->wsi_mirror,
-                                       &(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]),
-                                       ch_p->write_buf_len[ch_p->write_buf_last_sent_index], LWS_WRITE_TEXT);
-               }
+                       ch_p->sendBufQueue.erase(itr);
+                       int n = 0;
+                       if (binaryFlag) {
+                               n = lws_write(ch_p->wsi_mirror, &(sendBuf[LWS_SEND_BUFFER_PRE_PADDING]), sendLength, LWS_WRITE_BINARY);
+                       } else {
+                               n = lws_write(ch_p->wsi_mirror, &(sendBuf[LWS_SEND_BUFFER_PRE_PADDING]), sendLength, LWS_WRITE_TEXT);
+                       }
 
-               if (n < 0) {
-                       MSF_DBG("Writing failed\n");
-                       if (ch_p->onPublishListener) {
-                               ch_p->onPublishListener->onPublished(false, ch_p->publish_user_data[ch_p->write_buf_last_sent_index]);
+                       if (n < 0) {
+                               MSF_DBG("Writing failed\n");
+                               if (ch_p->onPublishListener) {
+                                       ch_p->onPublishListener->onPublished(false, userData);
+                               }
+                       } else {
+                               if (pingByClient == false) {
+                                       MSF_DBG("Writing succeed. id:%d, length:%d", msgId, n);
+                                       if (ch_p->onPublishListener) {
+                                               ch_p->onPublishListener->onPublished(true, userData);
+                                       }
+                               }
                        }
-               } else {
-                       MSF_DBG("Writing succeed : %d", n);
-                       if (ch_p->onPublishListener) {
-                               ch_p->onPublishListener->onPublished(true, ch_p->publish_user_data[ch_p->write_buf_last_sent_index]);
+                       // cleanup write buffer
+                       if ((sendBuf != NULL)) {
+                               free(sendBuf);
+                               sendBuf = NULL;
                        }
                }
-
-               delete[] ch_p->write_buf[ch_p->write_buf_last_sent_index];
-               ch_p->write_buf_count--;
-
-               if (ch_p->write_buf_count) {
-                       lws_callback_on_writable(ch_p->wsi_mirror);
-               }
        }
+       if ((ch_p->sendBufQueue.begin() != ch_p->sendBufQueue.end()) || ReqNextWritableCB) {
+               MSF_DBG("call");
+               lws_callback_on_writable(ch_p->wsi_mirror);
+       }
+       pthread_mutex_unlock(&ch_p->sendBufMutex);
+
+       return 0;
 }
 
 int Channel::callback_lws_mirror(struct lws *wsi,
@@ -887,8 +912,8 @@ int Channel::callback_lws_mirror(struct lws *wsi,
                if (this_ptr == NULL) {
                        // it means Channel object was deleted
                        return -1;
-               } else {
                }
+
                if (lws_frame_is_binary(wsi)) {
                        MSF_DBG("BINARY MESSAGE ARRIVED. len:%d", len);
                        // header needs to be parsed on first chunk
@@ -974,13 +999,13 @@ int Channel::callback_lws_mirror(struct lws *wsi,
                                this_ptr->cl_data_size = 0;
                        }
 
+                       this_ptr->connectionHandler->resetLastPingReceived();
                        this_ptr->cl_data = (unsigned char*)realloc(this_ptr->cl_data, this_ptr->cl_data_size + len + 1);
 
                        if (this_ptr->cl_data) {
                                memcpy(&(this_ptr->cl_data[this_ptr->cl_data_size]), (char*)in, len);
                                this_ptr->cl_data_size += len;
                                this_ptr->cl_data[this_ptr->cl_data_size] = '\0';
-                               this_ptr->connectionHandler->resetLastPingReceived();
                        }
 
                        if (lws_is_final_fragment(wsi)) {
@@ -1006,7 +1031,7 @@ int Channel::callback_lws_mirror(struct lws *wsi,
                break;
 
        case LWS_CALLBACK_CLIENT_WRITEABLE:
-               write_socket(this_ptr);
+               return writeSocket(this_ptr);
                break;
 
        case LWS_CALLBACK_RECEIVE:
@@ -1140,44 +1165,34 @@ void Channel::publishMessage(string method, string event, const char *data,
                handleError(string(), Error::create("Not Connected"));
                return;
        } else {
-               if (write_buf_index == 999) {
-                       write_buf_index = 0;
-               }
-
-               write_buf_index++;
-               write_buf_count++;
-
                long prepare_buf_len = 0;
-               unsigned char *prepare_buf = prepareMessageMap(
+               unsigned char *prepareBuf = prepareMessageMap(
                        method, event, data, to, &prepare_buf_len, payload, payload_size);
 
-               write_buf[write_buf_index] = prepare_buf;
-               write_buf_len[write_buf_index] = prepare_buf_len;
-               publish_user_data[write_buf_index] = user_data;
+               _SendBufInfo sendBufInfo;
+               sendBufInfo.pingByClient = false;
+               sendBufInfo.sendBuf = prepareBuf;
+               sendBufInfo.sendBuf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
+               sendBufInfo.sendLength = prepare_buf_len;
+               sendBufInfo.msgId = ++msgId;
+               sendBufInfo.userData = user_data;
+
+               if (event == ChannelConnectionHandler::PING) {
+                       sendBufInfo.pingByClient = true;
+               }
+
                if (payload) {
-                       write_buf_binary_flag[write_buf_index] = true;
+                       sendBufInfo.binaryFlag = true;
                } else {
-                       write_buf_binary_flag[write_buf_index] = false;
+                       sendBufInfo.binaryFlag = false;
                }
 
-               write_buf[write_buf_index][LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
-               //memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING], &prepare_buf[LWS_SEND_BUFFER_PRE_PADDING], prepare_buf_len);
-
-               //buf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
-
-               //buflen = prepare_buf_len;
-
-               //delete[](prepare_buf);
+               pthread_mutex_lock(&sendBufMutex);
+               sendBufQueue.push_back(sendBufInfo);
+               pthread_mutex_unlock(&sendBufMutex);
 
-               //if (binary_message) {
-                       //MSF_DBG("publish buffer = %s",&buf[LWS_SEND_BUFFER_PRE_PADDING + 2]);
-               //} else {
-                       //MSF_DBG("publish buffer = %s",&buf[LWS_SEND_BUFFER_PRE_PADDING]);
-               //}
-
-               //isWrite = true;
-               //lws_callback_on_writable(wsi_mirror);
-               write_socket(this);
+               MSF_DBG("writeRequest called");
+               writeRequest();
        }
 }
 
@@ -1188,31 +1203,25 @@ unsigned char *Channel::prepareMessageMap(string method, string event,
                                                                                int payload_size) {
        int l = 0;
        int header_size = 0;
-       int data_len = 0;
 
+       Json::Value message;
+       Json::Value params;
+
+       params["event"] = event;
        if (data) {
-               data_len = strlen(data);
-       } else {
-               data_len = 0;
+               params["data"] = data;
        }
+       params["to"] = to;
+       message["method"] = method;
+       message["params"] = params;
+
+       Json::FastWriter writer;
+       int data_len = (int)(writer.write(message).size());
 
-       int prepare_buf_size = LWS_SEND_BUFFER_PRE_PADDING + data_len + payload_size + 512 + LWS_SEND_BUFFER_POST_PADDING;
+       int prepare_buf_size = LWS_SEND_BUFFER_PRE_PADDING + data_len + payload_size + 8 + LWS_SEND_BUFFER_POST_PADDING;
        unsigned char *prepare_buf = new unsigned char[prepare_buf_size];
 
        if (payload) {
-               Json::Value message;
-               Json::Value params;
-
-               params["event"] = event;
-               if (data) {
-                       params["data"] = data;
-               }
-               params["to"] = to;
-               message["method"] = method;
-               message["params"] = params;
-
-               Json::FastWriter writer;
-
                l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING + 2],
                                prepare_buf_size - (LWS_SEND_BUFFER_PRE_PADDING + 2),
                                        "%s", writer.write(message).c_str());
@@ -1230,25 +1239,10 @@ unsigned char *Channel::prepareMessageMap(string method, string event,
                                payload_size);
 
                l += payload_size;
-               //binary_message = true;
        } else {
-               Json::Value message;
-               Json::Value params;
-
-               params["event"] = event;
-               if (data) {
-                       params["data"] = data;
-               }
-               params["to"] = to;
-               message["method"] = method;
-               message["params"] = params;
-
-               Json::FastWriter writer;
-
                l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
                                        prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING,
                                        "%s", writer.write(message).c_str());
-               //binary_message = false;
        }
 
        *prepare_buf_len = l;
@@ -1267,28 +1261,21 @@ void Channel::start_app(char *data, int buflength, string msgID) {
        l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
                        prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING, "%s", data);
 
-       if (write_buf_index == 999) {
-               write_buf_index = 0;
-       }
-
-       write_buf_index++;
-       write_buf_count++;
-
-       write_buf[write_buf_index] = prepare_buf;
-       write_buf_len[write_buf_index] = l;
-       publish_user_data[write_buf_index] = NULL;
+       _SendBufInfo sendBufInfo;
+       sendBufInfo.pingByClient = false;
+       sendBufInfo.sendBuf = prepare_buf;
+       sendBufInfo.sendBuf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
+       sendBufInfo.sendLength = l;
+       sendBufInfo.msgId = ++msgId;
+       sendBufInfo.userData = NULL;
+       sendBufInfo.binaryFlag = false;
 
-       write_buf[write_buf_index][LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
-       write_buf_binary_flag[write_buf_index] = false;
+       pthread_mutex_lock(&sendBufMutex);
+       sendBufQueue.push_back(sendBufInfo);
+       pthread_mutex_unlock(&sendBufMutex);
 
-       //buflen = l;
-       //buf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
-
-       //binary_message = false;
-
-       //isWrite = true;
-       //lws_callback_on_writable(wsi_mirror);
-       write_socket(this);
+       MSF_DBG("writeRequest called");
+       writeRequest();
 }
 
 void Channel::registerCallback(string uid, void *callback, int value_type) {
@@ -1332,7 +1319,7 @@ void *Channel::pt_startConnect(void *att) {
                static_cast<map<Channel *, map<string, string> *> *>(att);
        map<string, string> *attributes = pt_user_data->begin()->second;
 
-       pt_user_data->begin()->first->create_websocket(attributes);
+       pt_user_data->begin()->first->createWebsocket(attributes);
 
        delete attributes;
        delete pt_user_data;
@@ -1340,7 +1327,7 @@ void *Channel::pt_startConnect(void *att) {
        return NULL;
 }
 
-void Channel::create_websocket(void *att) {
+void Channel::createWebsocket(void *att) {
        struct lws_protocols protocols[] = {
                {NULL, Channel::callback_lws_mirror, sizeof(int), 512000, 0,
                 NULL},
@@ -1354,9 +1341,7 @@ void Channel::create_websocket(void *att) {
        map<string, string> *attributes = (map<string, string> *)(att);
        string uri = getChannelUri(attributes);
 
-       was_closed = false;
-       //struct lws_context *context;
-
+       closeRequest = false;
        int ietf_version = -1; /* latest */
 
        struct lws_context_creation_info info;
@@ -1381,7 +1366,7 @@ void Channel::create_websocket(void *att) {
 // info.extensions = lws_get_internal_extensions();
 #endif
        if (isWebSocketOpen()) {
-               MSF_DBG("create_websocket already Connected");
+               MSF_DBG("createWebsocket already Connected");
                return;
        }
        Context = lws_create_context(&info);
@@ -1421,7 +1406,7 @@ void Channel::create_websocket(void *att) {
        connect_info.client_exts = NULL;
 
        // loop until socket closed
-       while (n >= 0 && !was_closed) {
+       while (n >= 0 && !closeRequest) {
                if (wsi_mirror == NULL) {
                        wsi_mirror = lws_client_connect_via_info(&connect_info);
                        if (wsi_mirror == NULL) {
@@ -1430,15 +1415,10 @@ void Channel::create_websocket(void *att) {
                                break;
                        }
                }
-
                n = lws_service(Context, 0x0FFFFFFF);
-
-               if (n < 0) {
-                       break;
-               }
        }
 
-       MSF_DBG("create_websocket destroy context");
+       MSF_DBG("createWebsocket destroy context");
 
        if (Context) {
                lws_context_destroy(Context);
@@ -1448,6 +1428,15 @@ void Channel::create_websocket(void *att) {
        }
 }
 
+void Channel::writeRequest()
+{
+       lws_callback_on_writable(wsi_mirror);
+       if (pthread_self() != serverThreadId) {
+               MSF_DBG("current thread is different from websocket server thread => lws_cancel_service()");
+               lws_cancel_service(lws_get_context(wsi_mirror)); // to exit from poll() inside of lws_service()
+       }
+}
+
 void Channel::get_ip_port_from_uri(string uri, string* dest_ip, int* dest_port) {
        unsigned int http_index = uri.find("http");
        unsigned int ip_index = 0;
@@ -1650,11 +1639,13 @@ void ChannelConnectionHandler::calculateAverageRT() {
 }
 
 void ChannelConnectionHandler::stopPing() {
-       if (ping_thread != 0) {
-               pthread_cancel(ping_thread);
-               pthread_join(ping_thread, NULL);
+       if (running) {
+               MSF_DBG("stopping ping");
+               if (pingpongTimer) {
+                       stopTimer(pingpongTimer);
+                       pingpongTimer = NULL;
+               }
                running = false;
-               ping_thread = 0;
        }
 }
 
@@ -1677,17 +1668,15 @@ void ChannelConnectionHandler::startPing(Channel *ptr) {
        pingSent = startTime;
        channel_ptr = ptr;
 
-       int err = pthread_create(&ping_thread, NULL, Pinging, ptr);
-       if (err) {
-               MSF_DBG("pthread_create failed err = %d", err);
+       if (pingpongTimer == NULL) {
+               pingpongTimer = startTimer(__send_ping_pong, __timeout_worker, pingTimeout/1000, ptr);
        }
-       // Need to check this
-       // Pinging(ptr);
 }
 
-void ChannelConnectionHandler::ping_again(void *arg) {
-       MSF_DBG("## ping again ###");
-       Channel *ptr = static_cast<Channel *>(arg);
+void ChannelConnectionHandler::__send_ping_pong(void* data)
+{
+       Channel *ptr = static_cast<Channel *>(data);
+
        long now = time(0);
 
        if (now > ptr->connectionHandler->lastPingReceived +
@@ -1695,23 +1684,54 @@ void ChannelConnectionHandler::ping_again(void *arg) {
                MSF_DBG("## Pinging timeout. disconnect ###");
                ptr->disconnect();
        } else {
-               ptr->publish("msfVersion2", "msfVersion2", ptr->clients->me(), NULL);
-               sleep(1);
                ptr->publish(PING, PONG.c_str(), ptr->clients->me(), NULL);
                ptr->connectionHandler->pingSent = time(0);
        }
+
+       if (ptr->connectionHandler->pingpongTimer != NULL) {
+               stopTimer(ptr->connectionHandler->pingpongTimer);
+               ptr->connectionHandler->pingpongTimer = NULL;
+       }
+       ptr->connectionHandler->pingpongTimer = startTimer(__send_ping_pong, __timeout_worker, ptr->connectionHandler->pingTimeout/1000, ptr);
 }
 
-void *ChannelConnectionHandler::Pinging(void *arg) {
-       MSF_DBG("## Pinging ###");
+void* ChannelConnectionHandler::startTimer(timer_function function, TimerWorker woker, unsigned int interval, void *data)
+{
+       guint id = 0;
+       GSource *src = NULL;
+       gpointer *tdata = NULL;
+
+       src = g_timeout_source_new(interval);
+
+       tdata = g_new0(gpointer, 2);
+
+       tdata[0] = (void*)function;
+       tdata[1] = data;
+
+       g_source_set_callback(src, woker, tdata, g_free);
+       id = g_source_attach(src, NULL);
+       g_source_unref(src);
 
-       Channel *ptr = static_cast<Channel *>(arg);
+       return (void*)id;
+}
 
-       while (1) {
-               ping_again(ptr);
-               // usleep(ptr->connectionHandler->pingTimeout);
-               sleep(5);
+gboolean ChannelConnectionHandler::__timeout_worker(gpointer ud)
+{
+       gpointer *tdata = (gpointer*)ud;
+       if (tdata[0]) {
+               ((timer_function)tdata[0])(tdata[1]);
        }
+       return false;
+}
+
+void ChannelConnectionHandler::stopTimer(void *timer)
+{
+    guint id = (guint) timer;
+    if (id) {
+        if (!g_source_remove(id)) {
+            MSF_DBG("g_source_remove is fail (interval_timer)");
+        }
+    }
 }
 
 void ChannelConnectionHandler::setPingTimeout(long t) {
@@ -1721,7 +1741,7 @@ void ChannelConnectionHandler::setPingTimeout(long t) {
 void Channel::handleWsiDestroy()
 {
        if (!isCommunicated) {
-               was_closed = true;
+               closeRequest = true;
                if (Context) {
                        lws_cancel_service(Context);
                }