virtual void onError(/*Error error*/) {}
};
+class OnPublishListener {
+public:
+ virtual void onPublished(bool, void*) {}
+};
+
class Channel {
public:
Channel();
void unsetmessageListeners();
void setonErrorListener(OnErrorListener *);
void unsetonErrorListener();
- void publish(string event, const char *data);
+ void setonPublishListener(OnPublishListener *);
+ void unsetonPublishListener();
+ void publish(string event, const char *data, void *user_data);
void publish(string event, const char *data, unsigned char payload[],
- int payload_size);
- void publish(string event, const char *data, const char *target);
+ int payload_size, void *user_data);
+ void publish(string event, const char *data, const char *target, void *user_data);
void publish(string event, const char *data, const char *target,
- unsigned char payload[], int payload_size);
- void publish(string event, const char *data, Client client);
+ unsigned char payload[], int payload_size, void *user_data);
+ void publish(string event, const char *data, Client client, void *user_data);
void publish(string event, const char *data, Client client,
- unsigned char payload[], int payload_size);
- void publish(string event, const char *data, list<Client> clients);
+ unsigned char payload[], int payload_size, void *user_data);
+ void publish(string event, const char *data, list<Client> clients, void *user_data);
void publish(string event, const char *data, list<Client> clients,
- unsigned char payload[], int payload_size);
+ unsigned char payload[], int payload_size, void *user_data);
void publishMessage(string event, const char *data, const char *to,
- unsigned char payload[], int payload_size);
+ unsigned char payload[], int payload_size, void *user_data);
void publishMessage(string method, string event, const char *data,
const char *to, unsigned char payload[],
- int payload_size);
+ int payload_size, void *user_data);
// unsigned char *createBinaryMessage(string json, unsigned char payload[],
// int payload_size);
static void init_json_key_map();
static void foreach_json_array(JsonArray *object, guint index,
JsonNode *node, gpointer user_data);
- void set_isWrite(bool flag);
static void write_socket(Channel*);
static int callback_lws_mirror(struct lws *wsi,
enum lws_callback_reasons reason, void *user,
OnClientConnectListener *onClientConnectListener = NULL;
OnClientDisconnectListener *onClientDisconnectListener = NULL;
OnReadyListener *onReadyListener = NULL;
+ OnPublishListener *onPublishListener = NULL;
map<string, list<OnMessageListener *> > messageListeners;
//Result_Base *connect_cb = NULL;
//Result_Base *disconnect_cb = NULL;
static string CLIENT_CONNECT_EVENT;
static string CLIENT_DISCONNECT_EVENT;
static string READY_EVENT;
- unsigned char
- buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
+ unsigned char *write_buf[1000];
+ int write_buf_count;
+ int write_buf_index;
+ int write_buf_last_sent_index;
+ void* publish_user_data[1000];
+ int write_buf_len[1000];
+ bool write_buf_binary_flag[1000];
+ //LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
static pthread_t connect_thread;
int mirror_lifetime;
int force_exit;
char *messagedata;
- bool isWrite;
- bool binary_message;
+ //bool isWrite;
+ //bool binary_message;
bool disconnecting;
- long buflen;
+ //long buflen;
unsigned char cl_payload[1000];
int cl_payload_size;
int was_closed;
Channel::Channel() {
MSF_DBG("Channel()");
clientisHost = false;
- isWrite = false;
- buflen = 0;
+ //isWrite = false;
+ //buflen = 0;
was_closed = 1;
wsi_mirror = NULL;
Context = NULL;
channel_alive_map.insert({this, 1});
init_json_key_map();
cl_payload_size = 0;
+ write_buf_count = 0;
+ write_buf_index = 0;
+ write_buf_last_sent_index = 0;
}
Channel::Channel(Service *service1, string uri1) {
MSF_DBG("Channel()");
clientisHost = false;
- isWrite = false;
- buflen = 0;
+ //isWrite = false;
+ //buflen = 0;
was_closed = 1;
wsi_mirror = NULL;
Context = NULL;
channel_alive_map.insert({this, 1});
init_json_key_map();
cl_payload_size = 0;
+ write_buf_count = 0;
+ write_buf_index = 0;
+ write_buf_last_sent_index = 0;
}
Channel *Channel::create(Service *service, string uri) {
connectionHandler->stopPing();
was_closed = 1;
+ lws_callback_on_writable(wsi_mirror);
disconnecting = false;
}
void Channel::write_socket(Channel* ch_p)
{
int n;
- if (ch_p->isWrite) {
- ch_p->isWrite = false;
+ MSF_DBG("write_socket");
+
+ if (lws_partial_buffered(ch_p->wsi_mirror) == 1) {
+ MSF_DBG("lws is wrting now. it will retry write.");
+ lws_callback_on_writable(ch_p->wsi_mirror);
+ }
+
+ if (ch_p->write_buf_count) {
+ if (ch_p->write_buf_last_sent_index == 999) {
+ ch_p->write_buf_last_sent_index = 1;
+ } else {
+ ch_p->write_buf_last_sent_index++;
+ }
- if (&(ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING]) == NULL ||
- ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING] == 0 ||
- ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING] == '\0') {
+ if (&(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]) == NULL ||
+ ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING] == 0 ||
+ ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING] == '\0') {
} else {
}
- if (ch_p->buflen <= 0) {
+ if (ch_p->write_buf_len[ch_p->write_buf_last_sent_index] <= 0) {
}
- if (ch_p->binary_message) {
+ if (ch_p->write_buf_binary_flag[ch_p->write_buf_last_sent_index]) {
n = lws_write(ch_p->wsi_mirror,
- &(ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING]),
- ch_p->buflen, LWS_WRITE_BINARY);
+ &(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]),
+ ch_p->write_buf_len[ch_p->write_buf_last_sent_index], LWS_WRITE_BINARY);
} else {
n = lws_write(ch_p->wsi_mirror,
- &(ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING]),
- ch_p->buflen, LWS_WRITE_TEXT);
+ &(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]),
+ ch_p->write_buf_len[ch_p->write_buf_last_sent_index], LWS_WRITE_TEXT);
}
if (n < 0) {
MSF_DBG("Writing failed\n");
+ if (ch_p->onPublishListener) {
+ ch_p->onPublishListener->onPublished(false, ch_p->publish_user_data[ch_p->write_buf_last_sent_index]);
+ }
+ } else {
+ MSF_DBG("Writing succeed\n");
+ if (ch_p->onPublishListener) {
+ ch_p->onPublishListener->onPublished(true, ch_p->publish_user_data[ch_p->write_buf_last_sent_index]);
+ }
+ }
+
+ delete[] ch_p->write_buf[ch_p->write_buf_last_sent_index];
+ ch_p->write_buf_count--;
+
+ if (ch_p->write_buf_count) {
+ lws_callback_on_writable(ch_p->wsi_mirror);
}
+
+
}
}
return service->getUri();
}
-void Channel::publish(string event, const char *data) {
+void Channel::publish(string event, const char *data, void *user_data)
+{
string to = "\"";
to.append(Message::TARGET_ALL.c_str());
to.append("\"");
- publishMessage(event, data, to.c_str(), NULL, 0);
+ publishMessage(event, data, to.c_str(), NULL, 0, user_data);
}
void Channel::publish(string event, const char *data, unsigned char payload[],
- int payload_size) {
+ int payload_size, void *user_data)
+{
string to = "\"";
to.append(Message::TARGET_ALL.c_str());
to.append("\"");
- publishMessage(event, data, to.c_str(), payload, payload_size);
+ publishMessage(event, data, to.c_str(), payload, payload_size, user_data);
}
-void Channel::publish(string event, const char *data, const char *target) {
+void Channel::publish(string event, const char *data, const char *target,
+ void *user_data)
+{
string to = "\"";
to.append(target);
to.append("\"");
- publishMessage(event, data, to.c_str(), NULL, 0);
+ publishMessage(event, data, to.c_str(), NULL, 0, user_data);
}
void Channel::publish(string event, const char *data, const char *target,
- unsigned char payload[], int payload_size) {
+ unsigned char payload[], int payload_size, void *user_data)
+{
string to = "\"";
to.append(target);
to.append("\"");
- publishMessage(event, data, to.c_str(), payload, payload_size);
+ publishMessage(event, data, to.c_str(), payload, payload_size, user_data);
}
-void Channel::publish(string event, const char *data, Client client) {
+void Channel::publish(string event, const char *data, Client client,
+ void* user_data)
+{
string to = "\"";
to.append(client.getId());
to.append("\"");
- publishMessage(event, data, to.c_str(), NULL, 0);
+ publishMessage(event, data, to.c_str(), NULL, 0, user_data);
}
void Channel::publish(string event, const char *data, Client client,
- unsigned char payload[], int payload_size) {
+ unsigned char payload[], int payload_size, void *user_data)
+{
string to = "\"";
to.append(client.getId());
to.append("\"");
- publishMessage(event, data, to.c_str(), payload, payload_size);
+ publishMessage(event, data, to.c_str(), payload, payload_size, user_data);
}
-void Channel::publish(string event, const char *data, list<Client> clients) {
- publish(event, data, clients, NULL, 0);
+void Channel::publish(string event, const char *data, list<Client> clients,
+ void* user_data)
+{
+ publish(event, data, clients, NULL, 0, user_data);
}
void Channel::publish(string event, const char *data, list<Client> clients,
- unsigned char payload[], int payload_size) {
+ unsigned char payload[], int payload_size, void *user_data)
+{
string to = "[";
std::list<Client>::iterator iterator;
}
// TODO
- publishMessage(event, data, to.c_str(), payload, payload_size);
+ publishMessage(event, data, to.c_str(), payload, payload_size, user_data);
}
void Channel::publishMessage(string event, const char *data, const char *to,
- unsigned char payload[], int payload_size) {
+ unsigned char payload[], int payload_size, void *user_data) {
publishMessage(Message::METHOD_EMIT, event, data, to, payload,
- payload_size);
+ payload_size, user_data);
}
void Channel::publishMessage(string method, string event, const char *data,
const char *to, unsigned char payload[],
- int payload_size) {
+ int payload_size, void *user_data) {
if (!isWebSocketOpen()) {
handleError(string(), Error::create("Not Connected"));
return;
} else {
+
+ if (write_buf_index == 999 && write_buf_count != 999) {
+ write_buf_index = 0;
+ }
+
+ if (write_buf_count == 999) {
+ }
+
+ write_buf_index++;
+ write_buf_count++;
+
long prepare_buf_len = 0;
unsigned char *prepare_buf = prepareMessageMap(
method, event, data, to, &prepare_buf_len, payload, payload_size);
- memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING],
- &prepare_buf[LWS_SEND_BUFFER_PRE_PADDING], prepare_buf_len);
+ write_buf[write_buf_index] = prepare_buf;
+ write_buf_len[write_buf_index] = prepare_buf_len;
+ publish_user_data[write_buf_index] = user_data;
+ if (payload) {
+ write_buf_binary_flag[write_buf_index] = true;
+ } else {
+ write_buf_binary_flag[write_buf_index] = false;
+ }
- buf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
+ write_buf[write_buf_index][LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
+ //memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING], &prepare_buf[LWS_SEND_BUFFER_PRE_PADDING], prepare_buf_len);
- buflen = prepare_buf_len;
+ //buf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
- delete[](prepare_buf);
+ //buflen = prepare_buf_len;
- if (binary_message) {
- MSF_DBG("publish buffer = %s",
- &buf[LWS_SEND_BUFFER_PRE_PADDING + 2]);
- } else {
- MSF_DBG("publish buffer = %s",
- &buf[LWS_SEND_BUFFER_PRE_PADDING]);
- }
+ //delete[](prepare_buf);
- isWrite = true;
- lws_callback_on_writable(wsi_mirror);
+ //if (binary_message) {
+ //MSF_DBG("publish buffer = %s",&buf[LWS_SEND_BUFFER_PRE_PADDING + 2]);
+ //} else {
+ //MSF_DBG("publish buffer = %s",&buf[LWS_SEND_BUFFER_PRE_PADDING]);
+ //}
+
+ //isWrite = true;
+ //lws_callback_on_writable(wsi_mirror);
+ write_socket(this);
}
}
l += payload_size;
- binary_message = true;
+ //binary_message = true;
} else {
l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING,
"\"%s\",\"data\": \"%s\",\"to\": %s}}",
method.c_str(), event.c_str(), (unsigned char *)data,
(unsigned char *)to);
- binary_message = false;
+ //binary_message = false;
}
*prepare_buf_len = l;
UID = msgID;
int l = 0;
- l += snprintf((char *)&buf[LWS_SEND_BUFFER_PRE_PADDING],
- sizeof(buf) - LWS_SEND_BUFFER_PRE_PADDING, "%s", data);
- buflen = l;
- buf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
+ int prepare_buf_size = LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING;
+ unsigned char *prepare_buf = new unsigned char[prepare_buf_size];
+
+ l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
+ prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING, "%s", data);
+
+ if (write_buf_index == 999 && write_buf_count != 999) {
+ write_buf_index = 0;
+ }
+
+ if (write_buf_count == 999) {
+ }
+
+ write_buf_index++;
+ write_buf_count++;
+
+ write_buf[write_buf_index] = prepare_buf;
+ write_buf_len[write_buf_index] = l;
+ publish_user_data[write_buf_index] = NULL;
+
+ write_buf[write_buf_index][LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
+ write_buf_binary_flag[write_buf_index] = false;
- binary_message = false;
+ //buflen = l;
+ //buf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
- isWrite = true;
- lws_callback_on_writable(wsi_mirror);
+ //binary_message = false;
+
+ //isWrite = true;
+ //lws_callback_on_writable(wsi_mirror);
+ write_socket(this);
}
void Channel::registerCallback(string uid, void *callback, int value_type) {
string uri = getChannelUri(attributes);
was_closed = 0;
- struct lws_context *context;
+ //struct lws_context *context;
int ietf_version = -1; /* latest */
MSF_DBG("create_websocket already Connected");
return;
}
- context = lws_create_context(&info);
- Context = context;
- if (context == NULL) {
+ Context = lws_create_context(&info);
+ //Context = context;
+ if (Context == NULL) {
MSF_DBG("Creating libwebsocket context failed\n");
return;
}
api.append("channels/").append(ChannelID);
struct lws_client_connect_info connect_info;
- connect_info.context = context;
+ connect_info.context = Context;
connect_info.address = server_ip_address.c_str();
connect_info.port = server_port;
connect_info.ssl_connection = use_ssl;
}
}
- n = lws_service(context, 500);
+ n = lws_service(Context, -1);
if (n < 0)
break;
}
MSF_DBG("create_websocket destroy context");
- if (context)
- lws_context_destroy(context);
- context = NULL;
- wsi_mirror = NULL;
+ if (Context) {
+ lws_context_destroy(Context);
+
+ Context = NULL;
+ wsi_mirror = NULL;
+ }
+
}
void Channel::get_ip_port_from_uri(string uri, string* dest_ip, int* dest_port) {
onErrorListener = obj;
}
-void Channel::unsetonErrorListener() { onErrorListener = NULL; }
+void Channel::unsetonErrorListener()
+{
+ onErrorListener = NULL;
+}
-void Channel::setonReadyListener(OnReadyListener *obj) {
+void Channel::setonReadyListener(OnReadyListener *obj)
+{
onReadyListener = obj;
}
-void Channel::unsetonReadyListener() { onReadyListener = NULL; }
+void Channel::unsetonReadyListener()
+{
+ onReadyListener = NULL;
+}
-void Channel::set_isWrite(bool flag) {}
+void Channel::setonPublishListener(OnPublishListener *obj)
+{
+ onPublishListener = obj;
+}
+void Channel::unsetonPublishListener()
+{
+ onPublishListener = NULL;
+}
void ChannelConnectionHandler::resetLastPingReceived() {
this->lastPingReceived = time(0);
}
MSF_DBG("## Pinging timeout. disconnect ###");
ptr->disconnect();
} else {
- ptr->publish("msfVersion2", "msfVersion2", ptr->clients->me());
+ ptr->publish("msfVersion2", "msfVersion2", ptr->clients->me(), NULL);
sleep(1);
- ptr->publish(PING, PONG.c_str(), ptr->clients->me());
+ ptr->publish(PING, PONG.c_str(), ptr->clients->me(), NULL);
ptr->connectionHandler->pingSent = time(0);
}
}