class Application;
+int Channel::msgId = 0;
using namespace std;
string ChannelConnectionHandler::PING = "channel.ping";
map<Channel *, int> Channel::channel_alive_map;
map<string, int> Channel::json_keys;
pthread_t Channel::connect_thread;
-pthread_t ChannelConnectionHandler::ping_thread = 0;
JsonObject *Channel::root_json_object = NULL;
ChannelConnectionHandler::ChannelConnectionHandler() {
startTime = 0;
pingSent = 0;
average = 0;
+ pingpongTimer = NULL;
}
Channel::Channel() {
MSF_DBG("Channel()");
clientisHost = false;
- //isWrite = false;
- //buflen = 0;
- was_closed = true;
wsi_mirror = NULL;
Context = NULL;
mirror_lifetime = 0;
channel_alive_map.insert({this, 1});
init_json_key_map();
cl_payload_size = 0;
- write_buf_count = 0;
- write_buf_index = 0;
- write_buf_last_sent_index = 0;
cl_payload = NULL;
cl_data = NULL;
cl_data_size = 0;
is_header_parsed = false;
eventType = "";
resultobj = NULL;
+ pthread_mutex_init(&sendBufMutex, NULL);
+ serverThreadId = pthread_self();
+ closeRequest = false;
}
Channel::Channel(Service *service1, string uri1) {
MSF_DBG("Channel()");
clientisHost = false;
- //isWrite = false;
- //buflen = 0;
- was_closed = true;
wsi_mirror = NULL;
Context = NULL;
mirror_lifetime = 0;
channel_alive_map.insert({this, 1});
init_json_key_map();
cl_payload_size = 0;
- write_buf_count = 0;
- write_buf_index = 0;
- write_buf_last_sent_index = 0;
cl_payload = NULL;
cl_data = NULL;
cl_data_size = 0;
is_header_parsed = false;
eventType = "";
resultobj = NULL;
+ pthread_mutex_init(&sendBufMutex, NULL);
+ serverThreadId = pthread_self();
+ closeRequest = false;
}
Channel *Channel::create(Service *service, string uri) {
g_free(resultobj);
resultobj = NULL;
}
+
+ pthread_mutex_lock(&sendBufMutex);
+ while (sendBufQueue.begin() != sendBufQueue.end()) {
+ SendBufList::iterator itr = sendBufQueue.begin();
+ _SendBufInfo sendBufInfo = *itr;
+ sendBufQueue.erase(itr);
+
+ unsigned char* sendBuf = sendBufInfo.sendBuf;
+
+ // cleanup write buffer
+ if ((sendBuf != NULL)) {
+ free(sendBuf);
+ sendBuf = NULL;
+ }
+ }
+ pthread_mutex_unlock(&sendBufMutex);
}
void Channel::init_json_key_map() {
handleErrorMessage(UID);
} else if (eventType == READY_EVENT) {
handleReadyMessage();
+ } else if (eventType == ChannelConnectionHandler::PING) {
+ handlePongMessage();
} else {
handleClientMessage(data.c_str(), payload);
}
}
connectionHandler->stopPing();
- was_closed = true;
- lws_callback_on_writable(wsi_mirror);
-
disconnecting = false;
+
+ closeRequest = true;
+ MSF_DBG("writeRequest called");
+ writeRequest();
}
}
+void Channel::handlePongMessage() {
+// MSF_DBG("pong received");
+}
+
void Channel::handleReadyMessage() {
waitForOnReady = false;
std::map<void *, int>::const_iterator iterator;
if (onConnectListener) {
onConnectListener->onConnect(clienttemp);
}
- // To start channnel heath check
+ // To start channnel health check
if (isWebSocketOpen()) {
- // connectionHandler->startPing(this);
+ connectionHandler->startPing(this);
}
}
connectionHandler->stopPing();
wsi_mirror = NULL;
connected = false;
- was_closed = true;
if (Context) {
lws_cancel_service(Context);
}
clients->reset();
}
-void Channel::write_socket(Channel* ch_p)
+int Channel::writeSocket(Channel* ch_p)
{
- int n;
MSF_DBG("write_socket");
ch_p->setCommunicated(true);
- if (lws_partial_buffered(ch_p->wsi_mirror) == 1) {
- MSF_DBG("lws is wrting now. it will retry write.");
- lws_callback_on_writable(ch_p->wsi_mirror);
+ if (ch_p->closeRequest) {
+ MSF_DBG("socket closing");
+ lws_close_reason(ch_p->wsi_mirror, LWS_CLOSE_STATUS_NO_STATUS, (unsigned char *)"notack", strlen("notack"));
+// lws_callback_on_writable(ch_p->wsi_mirror);
+ return -1; // to close this wsi
}
- MSF_DBG("write_buf_count:%d", ch_p->write_buf_count);
- if (ch_p->write_buf_count) {
- if (ch_p->write_buf_last_sent_index == 999) {
- ch_p->write_buf_last_sent_index = 1;
- } else {
- ch_p->write_buf_last_sent_index++;
- }
+ pthread_mutex_lock(&ch_p->sendBufMutex);
+ bool ReqNextWritableCB = false;
- if (&(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]) == NULL ||
- ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING] == 0 ||
- ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING] == '\0') {
- } else {
- }
+ if (ch_p->sendBufQueue.begin() != ch_p->sendBufQueue.end()) {
+ SendBufList::iterator itr = ch_p->sendBufQueue.begin();
+ _SendBufInfo sendBufInfo = *itr;
- if (ch_p->write_buf_len[ch_p->write_buf_last_sent_index] <= 0) {
- }
+ bool binaryFlag = sendBufInfo.binaryFlag;
+ bool pingByClient = sendBufInfo.pingByClient;
+ unsigned char* sendBuf = sendBufInfo.sendBuf;
+ int sendLength = sendBufInfo.sendLength;
+ int msgId = sendBufInfo.msgId;
+ void* userData = sendBufInfo.userData;
- if (ch_p->write_buf_binary_flag[ch_p->write_buf_last_sent_index]) {
- n = lws_write(ch_p->wsi_mirror,
- &(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]),
- ch_p->write_buf_len[ch_p->write_buf_last_sent_index], LWS_WRITE_BINARY);
+ if (sendBuf == NULL) {
+ ch_p->sendBufQueue.erase(itr);
+ MSF_DBG("warn : Send Buf is NULL");
} else {
- n = lws_write(ch_p->wsi_mirror,
- &(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]),
- ch_p->write_buf_len[ch_p->write_buf_last_sent_index], LWS_WRITE_TEXT);
- }
+ ch_p->sendBufQueue.erase(itr);
+ int n = 0;
+ if (binaryFlag) {
+ n = lws_write(ch_p->wsi_mirror, &(sendBuf[LWS_SEND_BUFFER_PRE_PADDING]), sendLength, LWS_WRITE_BINARY);
+ } else {
+ n = lws_write(ch_p->wsi_mirror, &(sendBuf[LWS_SEND_BUFFER_PRE_PADDING]), sendLength, LWS_WRITE_TEXT);
+ }
- if (n < 0) {
- MSF_DBG("Writing failed\n");
- if (ch_p->onPublishListener) {
- ch_p->onPublishListener->onPublished(false, ch_p->publish_user_data[ch_p->write_buf_last_sent_index]);
+ if (n < 0) {
+ MSF_DBG("Writing failed\n");
+ if (ch_p->onPublishListener) {
+ ch_p->onPublishListener->onPublished(false, userData);
+ }
+ } else {
+ if (pingByClient == false) {
+ MSF_DBG("Writing succeed. id:%d, length:%d", msgId, n);
+ if (ch_p->onPublishListener) {
+ ch_p->onPublishListener->onPublished(true, userData);
+ }
+ }
}
- } else {
- MSF_DBG("Writing succeed : %d", n);
- if (ch_p->onPublishListener) {
- ch_p->onPublishListener->onPublished(true, ch_p->publish_user_data[ch_p->write_buf_last_sent_index]);
+ // cleanup write buffer
+ if ((sendBuf != NULL)) {
+ free(sendBuf);
+ sendBuf = NULL;
}
}
-
- delete[] ch_p->write_buf[ch_p->write_buf_last_sent_index];
- ch_p->write_buf_count--;
-
- if (ch_p->write_buf_count) {
- lws_callback_on_writable(ch_p->wsi_mirror);
- }
}
+ if ((ch_p->sendBufQueue.begin() != ch_p->sendBufQueue.end()) || ReqNextWritableCB) {
+ MSF_DBG("call");
+ lws_callback_on_writable(ch_p->wsi_mirror);
+ }
+ pthread_mutex_unlock(&ch_p->sendBufMutex);
+
+ return 0;
}
int Channel::callback_lws_mirror(struct lws *wsi,
if (this_ptr == NULL) {
// it means Channel object was deleted
return -1;
- } else {
}
+
if (lws_frame_is_binary(wsi)) {
MSF_DBG("BINARY MESSAGE ARRIVED. len:%d", len);
// header needs to be parsed on first chunk
this_ptr->cl_data_size = 0;
}
+ this_ptr->connectionHandler->resetLastPingReceived();
this_ptr->cl_data = (unsigned char*)realloc(this_ptr->cl_data, this_ptr->cl_data_size + len + 1);
if (this_ptr->cl_data) {
memcpy(&(this_ptr->cl_data[this_ptr->cl_data_size]), (char*)in, len);
this_ptr->cl_data_size += len;
this_ptr->cl_data[this_ptr->cl_data_size] = '\0';
- this_ptr->connectionHandler->resetLastPingReceived();
}
if (lws_is_final_fragment(wsi)) {
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
- write_socket(this_ptr);
+ return writeSocket(this_ptr);
break;
case LWS_CALLBACK_RECEIVE:
handleError(string(), Error::create("Not Connected"));
return;
} else {
- if (write_buf_index == 999) {
- write_buf_index = 0;
- }
-
- write_buf_index++;
- write_buf_count++;
-
long prepare_buf_len = 0;
- unsigned char *prepare_buf = prepareMessageMap(
+ unsigned char *prepareBuf = prepareMessageMap(
method, event, data, to, &prepare_buf_len, payload, payload_size);
- write_buf[write_buf_index] = prepare_buf;
- write_buf_len[write_buf_index] = prepare_buf_len;
- publish_user_data[write_buf_index] = user_data;
+ _SendBufInfo sendBufInfo;
+ sendBufInfo.pingByClient = false;
+ sendBufInfo.sendBuf = prepareBuf;
+ sendBufInfo.sendBuf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
+ sendBufInfo.sendLength = prepare_buf_len;
+ sendBufInfo.msgId = ++msgId;
+ sendBufInfo.userData = user_data;
+
+ if (event == ChannelConnectionHandler::PING) {
+ sendBufInfo.pingByClient = true;
+ }
+
if (payload) {
- write_buf_binary_flag[write_buf_index] = true;
+ sendBufInfo.binaryFlag = true;
} else {
- write_buf_binary_flag[write_buf_index] = false;
+ sendBufInfo.binaryFlag = false;
}
- write_buf[write_buf_index][LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
- //memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING], &prepare_buf[LWS_SEND_BUFFER_PRE_PADDING], prepare_buf_len);
-
- //buf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
-
- //buflen = prepare_buf_len;
-
- //delete[](prepare_buf);
+ pthread_mutex_lock(&sendBufMutex);
+ sendBufQueue.push_back(sendBufInfo);
+ pthread_mutex_unlock(&sendBufMutex);
- //if (binary_message) {
- //MSF_DBG("publish buffer = %s",&buf[LWS_SEND_BUFFER_PRE_PADDING + 2]);
- //} else {
- //MSF_DBG("publish buffer = %s",&buf[LWS_SEND_BUFFER_PRE_PADDING]);
- //}
-
- //isWrite = true;
- //lws_callback_on_writable(wsi_mirror);
- write_socket(this);
+ MSF_DBG("writeRequest called");
+ writeRequest();
}
}
int payload_size) {
int l = 0;
int header_size = 0;
- int data_len = 0;
+ Json::Value message;
+ Json::Value params;
+
+ params["event"] = event;
if (data) {
- data_len = strlen(data);
- } else {
- data_len = 0;
+ params["data"] = data;
}
+ params["to"] = to;
+ message["method"] = method;
+ message["params"] = params;
+
+ Json::FastWriter writer;
+ int data_len = (int)(writer.write(message).size());
- int prepare_buf_size = LWS_SEND_BUFFER_PRE_PADDING + data_len + payload_size + 512 + LWS_SEND_BUFFER_POST_PADDING;
+ int prepare_buf_size = LWS_SEND_BUFFER_PRE_PADDING + data_len + payload_size + 8 + LWS_SEND_BUFFER_POST_PADDING;
unsigned char *prepare_buf = new unsigned char[prepare_buf_size];
if (payload) {
- Json::Value message;
- Json::Value params;
-
- params["event"] = event;
- if (data) {
- params["data"] = data;
- }
- params["to"] = to;
- message["method"] = method;
- message["params"] = params;
-
- Json::FastWriter writer;
-
l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING + 2],
prepare_buf_size - (LWS_SEND_BUFFER_PRE_PADDING + 2),
"%s", writer.write(message).c_str());
payload_size);
l += payload_size;
- //binary_message = true;
} else {
- Json::Value message;
- Json::Value params;
-
- params["event"] = event;
- if (data) {
- params["data"] = data;
- }
- params["to"] = to;
- message["method"] = method;
- message["params"] = params;
-
- Json::FastWriter writer;
-
l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING,
"%s", writer.write(message).c_str());
- //binary_message = false;
}
*prepare_buf_len = l;
l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING, "%s", data);
- if (write_buf_index == 999) {
- write_buf_index = 0;
- }
-
- write_buf_index++;
- write_buf_count++;
-
- write_buf[write_buf_index] = prepare_buf;
- write_buf_len[write_buf_index] = l;
- publish_user_data[write_buf_index] = NULL;
+ _SendBufInfo sendBufInfo;
+ sendBufInfo.pingByClient = false;
+ sendBufInfo.sendBuf = prepare_buf;
+ sendBufInfo.sendBuf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
+ sendBufInfo.sendLength = l;
+ sendBufInfo.msgId = ++msgId;
+ sendBufInfo.userData = NULL;
+ sendBufInfo.binaryFlag = false;
- write_buf[write_buf_index][LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
- write_buf_binary_flag[write_buf_index] = false;
+ pthread_mutex_lock(&sendBufMutex);
+ sendBufQueue.push_back(sendBufInfo);
+ pthread_mutex_unlock(&sendBufMutex);
- //buflen = l;
- //buf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
-
- //binary_message = false;
-
- //isWrite = true;
- //lws_callback_on_writable(wsi_mirror);
- write_socket(this);
+ MSF_DBG("writeRequest called");
+ writeRequest();
}
void Channel::registerCallback(string uid, void *callback, int value_type) {
static_cast<map<Channel *, map<string, string> *> *>(att);
map<string, string> *attributes = pt_user_data->begin()->second;
- pt_user_data->begin()->first->create_websocket(attributes);
+ pt_user_data->begin()->first->createWebsocket(attributes);
delete attributes;
delete pt_user_data;
return NULL;
}
-void Channel::create_websocket(void *att) {
+void Channel::createWebsocket(void *att) {
struct lws_protocols protocols[] = {
{NULL, Channel::callback_lws_mirror, sizeof(int), 512000, 0,
NULL},
map<string, string> *attributes = (map<string, string> *)(att);
string uri = getChannelUri(attributes);
- was_closed = false;
- //struct lws_context *context;
-
+ closeRequest = false;
int ietf_version = -1; /* latest */
struct lws_context_creation_info info;
// info.extensions = lws_get_internal_extensions();
#endif
if (isWebSocketOpen()) {
- MSF_DBG("create_websocket already Connected");
+ MSF_DBG("createWebsocket already Connected");
return;
}
Context = lws_create_context(&info);
connect_info.client_exts = NULL;
// loop until socket closed
- while (n >= 0 && !was_closed) {
+ while (n >= 0 && !closeRequest) {
if (wsi_mirror == NULL) {
wsi_mirror = lws_client_connect_via_info(&connect_info);
if (wsi_mirror == NULL) {
break;
}
}
-
n = lws_service(Context, 0x0FFFFFFF);
-
- if (n < 0) {
- break;
- }
}
- MSF_DBG("create_websocket destroy context");
+ MSF_DBG("createWebsocket destroy context");
if (Context) {
lws_context_destroy(Context);
}
}
+void Channel::writeRequest()
+{
+ lws_callback_on_writable(wsi_mirror);
+ if (pthread_self() != serverThreadId) {
+ MSF_DBG("current thread is different from websocket server thread => lws_cancel_service()");
+ lws_cancel_service(lws_get_context(wsi_mirror)); // to exit from poll() inside of lws_service()
+ }
+}
+
void Channel::get_ip_port_from_uri(string uri, string* dest_ip, int* dest_port) {
unsigned int http_index = uri.find("http");
unsigned int ip_index = 0;
}
void ChannelConnectionHandler::stopPing() {
- if (ping_thread != 0) {
- pthread_cancel(ping_thread);
- pthread_join(ping_thread, NULL);
+ if (running) {
+ MSF_DBG("stopping ping");
+ if (pingpongTimer) {
+ stopTimer(pingpongTimer);
+ pingpongTimer = NULL;
+ }
running = false;
- ping_thread = 0;
}
}
pingSent = startTime;
channel_ptr = ptr;
- int err = pthread_create(&ping_thread, NULL, Pinging, ptr);
- if (err) {
- MSF_DBG("pthread_create failed err = %d", err);
+ if (pingpongTimer == NULL) {
+ pingpongTimer = startTimer(__send_ping_pong, __timeout_worker, pingTimeout/1000, ptr);
}
- // Need to check this
- // Pinging(ptr);
}
-void ChannelConnectionHandler::ping_again(void *arg) {
- MSF_DBG("## ping again ###");
- Channel *ptr = static_cast<Channel *>(arg);
+void ChannelConnectionHandler::__send_ping_pong(void* data)
+{
+ Channel *ptr = static_cast<Channel *>(data);
+
long now = time(0);
if (now > ptr->connectionHandler->lastPingReceived +
MSF_DBG("## Pinging timeout. disconnect ###");
ptr->disconnect();
} else {
- ptr->publish("msfVersion2", "msfVersion2", ptr->clients->me(), NULL);
- sleep(1);
ptr->publish(PING, PONG.c_str(), ptr->clients->me(), NULL);
ptr->connectionHandler->pingSent = time(0);
}
+
+ if (ptr->connectionHandler->pingpongTimer != NULL) {
+ stopTimer(ptr->connectionHandler->pingpongTimer);
+ ptr->connectionHandler->pingpongTimer = NULL;
+ }
+ ptr->connectionHandler->pingpongTimer = startTimer(__send_ping_pong, __timeout_worker, ptr->connectionHandler->pingTimeout/1000, ptr);
}
-void *ChannelConnectionHandler::Pinging(void *arg) {
- MSF_DBG("## Pinging ###");
+void* ChannelConnectionHandler::startTimer(timer_function function, TimerWorker woker, unsigned int interval, void *data)
+{
+ guint id = 0;
+ GSource *src = NULL;
+ gpointer *tdata = NULL;
+
+ src = g_timeout_source_new(interval);
+
+ tdata = g_new0(gpointer, 2);
+
+ tdata[0] = (void*)function;
+ tdata[1] = data;
+
+ g_source_set_callback(src, woker, tdata, g_free);
+ id = g_source_attach(src, NULL);
+ g_source_unref(src);
- Channel *ptr = static_cast<Channel *>(arg);
+ return (void*)id;
+}
- while (1) {
- ping_again(ptr);
- // usleep(ptr->connectionHandler->pingTimeout);
- sleep(5);
+gboolean ChannelConnectionHandler::__timeout_worker(gpointer ud)
+{
+ gpointer *tdata = (gpointer*)ud;
+ if (tdata[0]) {
+ ((timer_function)tdata[0])(tdata[1]);
}
+ return false;
+}
+
+void ChannelConnectionHandler::stopTimer(void *timer)
+{
+ guint id = (guint) timer;
+ if (id) {
+ if (!g_source_remove(id)) {
+ MSF_DBG("g_source_remove is fail (interval_timer)");
+ }
+ }
}
void ChannelConnectionHandler::setPingTimeout(long t) {
void Channel::handleWsiDestroy()
{
if (!isCommunicated) {
- was_closed = true;
+ closeRequest = true;
if (Context) {
lws_cancel_service(Context);
}