added setonPublishListener(), unsetonPublishListener() 21/80621/2
authorSang gun Lee <sanggun7.lee@samsung.com>
Tue, 19 Jul 2016 07:18:11 +0000 (16:18 +0900)
committerSang gun Lee <sanggun7.lee@samsung.com>
Mon, 25 Jul 2016 07:20:39 +0000 (16:20 +0900)
added user_data param in publish()

fixed libwebsocket publish reaction speed.

Change-Id: I81589b31e5da37d22d8e58e07344bfa10c60b310
Signed-off-by: Sang gun Lee <sanggun7.lee@samsung.com>
daemon/service_provider/app_comm_service_provider.cpp
msf_tizen_client/include/Channel.h
msf_tizen_client/include/SearchProvider.h
msf_tizen_client/src/Channel.cpp
msf_tizen_client/src/Search.cpp
msf_tizen_client/src/SearchProvider.cpp
msf_tizen_client/test/main.cpp

index dccfaae..06b1e50 100755 (executable)
@@ -412,7 +412,7 @@ int conv::app_comm_service_provider::set_request(request* request_obj)
                                string payload_str = payload.str();
                                _D("payload : %s, size : %d", message, strlen(message));
 
-                               (*iter)->application->publish("d2d_service_message", NULL, reinterpret_cast<unsigned char*>(message), strlen(message));
+                               (*iter)->application->publish("d2d_service_message", NULL, reinterpret_cast<unsigned char*>(message), strlen(message), NULL);
 
                                _D("publishing done");
 
index 325ae30..f8bde85 100755 (executable)
@@ -102,6 +102,11 @@ public:
        virtual void onError(/*Error error*/) {}
 };
 
+class OnPublishListener {
+public:
+       virtual void onPublished(bool, void*) {}
+};
+
 class Channel {
 public:
        Channel();
@@ -139,23 +144,25 @@ public:
        void unsetmessageListeners();
        void setonErrorListener(OnErrorListener *);
        void unsetonErrorListener();
-       void publish(string event, const char *data);
+       void setonPublishListener(OnPublishListener *);
+       void unsetonPublishListener();
+       void publish(string event, const char *data, void *user_data);
        void publish(string event, const char *data, unsigned char payload[],
-               int payload_size);
-       void publish(string event, const char *data, const char *target);
+               int payload_size, void *user_data);
+       void publish(string event, const char *data, const char *target, void *user_data);
        void publish(string event, const char *data, const char *target,
-               unsigned char payload[], int payload_size);
-       void publish(string event, const char *data, Client client);
+               unsigned char payload[], int payload_size, void *user_data);
+       void publish(string event, const char *data, Client client, void *user_data);
        void publish(string event, const char *data, Client client,
-               unsigned char payload[], int payload_size);
-       void publish(string event, const char *data, list<Client> clients);
+               unsigned char payload[], int payload_size, void *user_data);
+       void publish(string event, const char *data, list<Client> clients, void *user_data);
        void publish(string event, const char *data, list<Client> clients,
-               unsigned char payload[], int payload_size);
+               unsigned char payload[], int payload_size, void *user_data);
        void publishMessage(string event, const char *data, const char *to,
-                       unsigned char payload[], int payload_size);
+                       unsigned char payload[], int payload_size, void *user_data);
        void publishMessage(string method, string event, const char *data,
                        const char *to, unsigned char payload[],
-                       int payload_size);
+                       int payload_size, void *user_data);
        // unsigned char *createBinaryMessage(string json, unsigned char payload[],
        // int payload_size);
        static void init_json_key_map();
@@ -164,7 +171,6 @@ public:
        static void foreach_json_array(JsonArray *object, guint index,
                                JsonNode *node, gpointer user_data);
 
-       void set_isWrite(bool flag);
        static void write_socket(Channel*);
        static int callback_lws_mirror(struct lws *wsi,
                                enum lws_callback_reasons reason, void *user,
@@ -198,6 +204,7 @@ protected:
        OnClientConnectListener *onClientConnectListener = NULL;
        OnClientDisconnectListener *onClientDisconnectListener = NULL;
        OnReadyListener *onReadyListener = NULL;
+       OnPublishListener *onPublishListener = NULL;
        map<string, list<OnMessageListener *> > messageListeners;
        //Result_Base *connect_cb = NULL;
        //Result_Base *disconnect_cb = NULL;
@@ -235,16 +242,22 @@ private:
        static string CLIENT_CONNECT_EVENT;
        static string CLIENT_DISCONNECT_EVENT;
        static string READY_EVENT;
-       unsigned char
-       buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
+       unsigned char *write_buf[1000];
+       int write_buf_count;
+       int write_buf_index;
+       int write_buf_last_sent_index;
+       void* publish_user_data[1000];
+       int write_buf_len[1000];
+       bool write_buf_binary_flag[1000];
+       //LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
        static pthread_t connect_thread;
        int mirror_lifetime;
        int force_exit;
        char *messagedata;
-       bool isWrite;
-       bool binary_message;
+       //bool isWrite;
+       //bool binary_message;
        bool disconnecting;
-       long buflen;
+       //long buflen;
        unsigned char cl_payload[1000];
        int cl_payload_size;
        int was_closed;
index 4bb7746..d511722 100644 (file)
@@ -44,7 +44,7 @@ private:
 public:
        ttl_info(long ttl, int service_type);
        ttl_info();
-       int update_ttl(long ttl, int service_type);
+       void update_ttl(long ttl, int service_type);
        long get_ttl(int service_type);
        bool is_expired();
 };
index 9c3fddd..41e4fc6 100755 (executable)
@@ -69,8 +69,8 @@ ChannelConnectionHandler::ChannelConnectionHandler() {
 Channel::Channel() {
        MSF_DBG("Channel()");
        clientisHost = false;
-       isWrite = false;
-       buflen = 0;
+       //isWrite = false;
+       //buflen = 0;
        was_closed = 1;
        wsi_mirror = NULL;
        Context = NULL;
@@ -84,13 +84,16 @@ Channel::Channel() {
        channel_alive_map.insert({this, 1});
        init_json_key_map();
        cl_payload_size = 0;
+       write_buf_count = 0;
+       write_buf_index = 0;
+       write_buf_last_sent_index = 0;
 }
 
 Channel::Channel(Service *service1, string uri1) {
        MSF_DBG("Channel()");
        clientisHost = false;
-       isWrite = false;
-       buflen = 0;
+       //isWrite = false;
+       //buflen = 0;
        was_closed = 1;
        wsi_mirror = NULL;
        Context = NULL;
@@ -106,6 +109,9 @@ Channel::Channel(Service *service1, string uri1) {
        channel_alive_map.insert({this, 1});
        init_json_key_map();
        cl_payload_size = 0;
+       write_buf_count = 0;
+       write_buf_index = 0;
+       write_buf_last_sent_index = 0;
 }
 
 Channel *Channel::create(Service *service, string uri) {
@@ -617,6 +623,7 @@ void Channel::disconnect(Result_Base *result1) {
                connectionHandler->stopPing();
 
                was_closed = 1;
+               lws_callback_on_writable(wsi_mirror);
 
                disconnecting = false;
        }
@@ -720,31 +727,59 @@ void Channel::handleSocketClosed() {
 void Channel::write_socket(Channel* ch_p)
 {
        int n;
-       if (ch_p->isWrite) {
-               ch_p->isWrite = false;
+       MSF_DBG("write_socket");
+
+       if (lws_partial_buffered(ch_p->wsi_mirror) == 1) {
+               MSF_DBG("lws is wrting now. it will retry write.");
+               lws_callback_on_writable(ch_p->wsi_mirror);
+       }
+
+       if (ch_p->write_buf_count) {
+               if (ch_p->write_buf_last_sent_index == 999) {
+                       ch_p->write_buf_last_sent_index = 1;
+               } else {
+                       ch_p->write_buf_last_sent_index++;
+               }
 
-               if (&(ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING]) == NULL ||
-                               ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING] == 0 ||
-                               ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING] == '\0') {
+               if (&(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]) == NULL ||
+                               ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING] == 0 ||
+                               ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING] == '\0') {
                } else {
                }
 
-               if (ch_p->buflen <= 0) {
+               if (ch_p->write_buf_len[ch_p->write_buf_last_sent_index] <= 0) {
                }
 
-               if (ch_p->binary_message) {
+               if (ch_p->write_buf_binary_flag[ch_p->write_buf_last_sent_index]) {
                        n = lws_write(ch_p->wsi_mirror,
-                                       &(ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING]),
-                                       ch_p->buflen, LWS_WRITE_BINARY);
+                                       &(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]),
+                                       ch_p->write_buf_len[ch_p->write_buf_last_sent_index], LWS_WRITE_BINARY);
                } else {
                        n = lws_write(ch_p->wsi_mirror,
-                                       &(ch_p->buf[LWS_SEND_BUFFER_PRE_PADDING]),
-                                       ch_p->buflen, LWS_WRITE_TEXT);
+                                       &(ch_p->write_buf[ch_p->write_buf_last_sent_index][LWS_SEND_BUFFER_PRE_PADDING]),
+                                       ch_p->write_buf_len[ch_p->write_buf_last_sent_index], LWS_WRITE_TEXT);
                }
 
                if (n < 0) {
                        MSF_DBG("Writing failed\n");
+                       if (ch_p->onPublishListener) {
+                               ch_p->onPublishListener->onPublished(false, ch_p->publish_user_data[ch_p->write_buf_last_sent_index]);
+                       }
+               } else {
+                       MSF_DBG("Writing succeed\n");
+                       if (ch_p->onPublishListener) {
+                               ch_p->onPublishListener->onPublished(true, ch_p->publish_user_data[ch_p->write_buf_last_sent_index]);
+                       }
+               }
+
+               delete[] ch_p->write_buf[ch_p->write_buf_last_sent_index];
+               ch_p->write_buf_count--;
+
+               if (ch_p->write_buf_count) {
+                       lws_callback_on_writable(ch_p->wsi_mirror);
                }
+
+
        }
 }
 
@@ -884,58 +919,69 @@ string Channel::getChannelUri(map<string, string> *attributes) {
        return service->getUri();
 }
 
-void Channel::publish(string event, const char *data) {
+void Channel::publish(string event, const char *data, void *user_data)
+{
        string to = "\"";
        to.append(Message::TARGET_ALL.c_str());
        to.append("\"");
 
-       publishMessage(event, data, to.c_str(), NULL, 0);
+       publishMessage(event, data, to.c_str(), NULL, 0, user_data);
 }
 
 void Channel::publish(string event, const char *data, unsigned char payload[],
-                                       int payload_size) {
+                                       int payload_size, void *user_data)
+{
        string to = "\"";
        to.append(Message::TARGET_ALL.c_str());
        to.append("\"");
-       publishMessage(event, data, to.c_str(), payload, payload_size);
+       publishMessage(event, data, to.c_str(), payload, payload_size, user_data);
 }
 
-void Channel::publish(string event, const char *data, const char *target) {
+void Channel::publish(string event, const char *data, const char *target, 
+                                       void *user_data)
+{
        string to = "\"";
        to.append(target);
        to.append("\"");
-       publishMessage(event, data, to.c_str(), NULL, 0);
+       publishMessage(event, data, to.c_str(), NULL, 0, user_data);
 }
 
 void Channel::publish(string event, const char *data, const char *target,
-                                       unsigned char payload[], int payload_size) {
+                               unsigned char payload[], int payload_size, void *user_data)
+{
        string to = "\"";
        to.append(target);
        to.append("\"");
-       publishMessage(event, data, to.c_str(), payload, payload_size);
+       publishMessage(event, data, to.c_str(), payload, payload_size, user_data);
 }
 
-void Channel::publish(string event, const char *data, Client client) {
+void Channel::publish(string event, const char *data, Client client, 
+                                       void* user_data)
+{
        string to = "\"";
        to.append(client.getId());
        to.append("\"");
-       publishMessage(event, data, to.c_str(), NULL, 0);
+       publishMessage(event, data, to.c_str(), NULL, 0, user_data);
 }
 
 void Channel::publish(string event, const char *data, Client client,
-                                       unsigned char payload[], int payload_size) {
+                                       unsigned char payload[], int payload_size, void *user_data)
+{
        string to = "\"";
        to.append(client.getId());
        to.append("\"");
-       publishMessage(event, data, to.c_str(), payload, payload_size);
+       publishMessage(event, data, to.c_str(), payload, payload_size, user_data);
 }
 
-void Channel::publish(string event, const char *data, list<Client> clients) {
-       publish(event, data, clients, NULL, 0);
+void Channel::publish(string event, const char *data, list<Client> clients,
+                                       void* user_data)
+{
+       publish(event, data, clients, NULL, 0, user_data);
 }
 
 void Channel::publish(string event, const char *data, list<Client> clients,
-                                       unsigned char payload[], int payload_size) {
+                                       unsigned char payload[], int payload_size, void *user_data)
+{
        string to = "[";
 
        std::list<Client>::iterator iterator;
@@ -952,45 +998,64 @@ void Channel::publish(string event, const char *data, list<Client> clients,
        }
 
        // TODO
-       publishMessage(event, data, to.c_str(), payload, payload_size);
+       publishMessage(event, data, to.c_str(), payload, payload_size, user_data);
 }
 
 void Channel::publishMessage(string event, const char *data, const char *to,
-                                                        unsigned char payload[], int payload_size) {
+                                                        unsigned char payload[], int payload_size, void *user_data) {
        publishMessage(Message::METHOD_EMIT, event, data, to, payload,
-                                       payload_size);
+                                       payload_size, user_data);
 }
 
 void Channel::publishMessage(string method, string event, const char *data,
                                                         const char *to, unsigned char payload[],
-                                                        int payload_size) {
+                                                        int payload_size, void *user_data) {
        if (!isWebSocketOpen()) {
                handleError(string(), Error::create("Not Connected"));
                return;
        } else {
+
+               if (write_buf_index == 999 && write_buf_count != 999) {
+                       write_buf_index = 0;
+               }
+
+               if (write_buf_count == 999) {
+               }
+
+               write_buf_index++;
+               write_buf_count++;
+
                long prepare_buf_len = 0;
                unsigned char *prepare_buf = prepareMessageMap(
                        method, event, data, to, &prepare_buf_len, payload, payload_size);
 
-               memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING],
-                               &prepare_buf[LWS_SEND_BUFFER_PRE_PADDING], prepare_buf_len);
+               write_buf[write_buf_index] = prepare_buf;
+               write_buf_len[write_buf_index] = prepare_buf_len;
+               publish_user_data[write_buf_index] = user_data;
+               if (payload) {
+                       write_buf_binary_flag[write_buf_index] = true;
+               } else {
+                       write_buf_binary_flag[write_buf_index] = false;
+               }
 
-               buf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
+               write_buf[write_buf_index][LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
+               //memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING], &prepare_buf[LWS_SEND_BUFFER_PRE_PADDING], prepare_buf_len);
 
-               buflen = prepare_buf_len;
+               //buf[LWS_SEND_BUFFER_PRE_PADDING + prepare_buf_len] = 0;
 
-               delete[](prepare_buf);
+               //buflen = prepare_buf_len;
 
-               if (binary_message) {
-                       MSF_DBG("publish buffer = %s",
-                                               &buf[LWS_SEND_BUFFER_PRE_PADDING + 2]);
-               } else {
-                       MSF_DBG("publish buffer = %s",
-                                               &buf[LWS_SEND_BUFFER_PRE_PADDING]);
-               }
+               //delete[](prepare_buf);
 
-               isWrite = true;
-               lws_callback_on_writable(wsi_mirror);
+               //if (binary_message) {
+                       //MSF_DBG("publish buffer = %s",&buf[LWS_SEND_BUFFER_PRE_PADDING + 2]);
+               //} else {
+                       //MSF_DBG("publish buffer = %s",&buf[LWS_SEND_BUFFER_PRE_PADDING]);
+               //}
+
+               //isWrite = true;
+               //lws_callback_on_writable(wsi_mirror);
+               write_socket(this);
        }
 }
 
@@ -1027,7 +1092,7 @@ unsigned char *Channel::prepareMessageMap(string method, string event,
 
                l += payload_size;
 
-               binary_message = true;
+               //binary_message = true;
        } else {
                l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
                                        prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING,
@@ -1035,7 +1100,7 @@ unsigned char *Channel::prepareMessageMap(string method, string event,
                                        "\"%s\",\"data\": \"%s\",\"to\": %s}}",
                                        method.c_str(), event.c_str(), (unsigned char *)data,
                                        (unsigned char *)to);
-               binary_message = false;
+               //binary_message = false;
        }
 
        *prepare_buf_len = l;
@@ -1048,15 +1113,37 @@ void Channel::start_app(char *data, int buflength, string msgID) {
        UID = msgID;
        int l = 0;
 
-       l += snprintf((char *)&buf[LWS_SEND_BUFFER_PRE_PADDING],
-                       sizeof(buf) - LWS_SEND_BUFFER_PRE_PADDING, "%s", data);
-       buflen = l;
-       buf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
+       int prepare_buf_size = LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING;
+       unsigned char *prepare_buf = new unsigned char[prepare_buf_size];
+
+       l += snprintf((char *)&prepare_buf[LWS_SEND_BUFFER_PRE_PADDING],
+                       prepare_buf_size - LWS_SEND_BUFFER_PRE_PADDING, "%s", data);
+
+       if (write_buf_index == 999 && write_buf_count != 999) {
+               write_buf_index = 0;
+       }
+
+       if (write_buf_count == 999) {
+       }
+
+       write_buf_index++;
+       write_buf_count++;
+
+       write_buf[write_buf_index] = prepare_buf;
+       write_buf_len[write_buf_index] = l;
+       publish_user_data[write_buf_index] = NULL;
+
+       write_buf[write_buf_index][LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
+       write_buf_binary_flag[write_buf_index] = false;
 
-       binary_message = false;
+       //buflen = l;
+       //buf[LWS_SEND_BUFFER_PRE_PADDING + l] = 0;
 
-       isWrite = true;
-       lws_callback_on_writable(wsi_mirror);
+       //binary_message = false;
+
+       //isWrite = true;
+       //lws_callback_on_writable(wsi_mirror);
+       write_socket(this);
 }
 
 void Channel::registerCallback(string uid, void *callback, int value_type) {
@@ -1123,7 +1210,7 @@ void Channel::create_websocket(void *att) {
        string uri = getChannelUri(attributes);
 
        was_closed = 0;
-       struct lws_context *context;
+       //struct lws_context *context;
 
        int ietf_version = -1; /* latest */
 
@@ -1148,9 +1235,9 @@ void Channel::create_websocket(void *att) {
                MSF_DBG("create_websocket already Connected");
                return;
        }
-       context = lws_create_context(&info);
-       Context = context;
-       if (context == NULL) {
+       Context = lws_create_context(&info);
+       //Context = context;
+       if (Context == NULL) {
                MSF_DBG("Creating libwebsocket context failed\n");
                return;
        }
@@ -1160,7 +1247,7 @@ void Channel::create_websocket(void *att) {
        api.append("channels/").append(ChannelID);
 
        struct lws_client_connect_info connect_info;
-       connect_info.context = context;
+       connect_info.context = Context;
        connect_info.address = server_ip_address.c_str();
        connect_info.port = server_port;
        connect_info.ssl_connection = use_ssl;
@@ -1186,18 +1273,21 @@ void Channel::create_websocket(void *att) {
                        }
                }
 
-               n = lws_service(context, 500);
+               n = lws_service(Context, -1);
 
                if (n < 0)
                        break;
        }
 
        MSF_DBG("create_websocket destroy context");
-       if (context)
-               lws_context_destroy(context);
 
-       context = NULL;
-       wsi_mirror = NULL;
+       if (Context) {
+               lws_context_destroy(Context);
+
+               Context = NULL;
+               wsi_mirror = NULL;
+       }
+
 }
 
 void Channel::get_ip_port_from_uri(string uri, string* dest_ip, int* dest_port) {
@@ -1362,16 +1452,30 @@ void Channel::setonErrorListener(OnErrorListener *obj) {
        onErrorListener = obj;
 }
 
-void Channel::unsetonErrorListener() { onErrorListener = NULL; }
+void Channel::unsetonErrorListener()
+{
+       onErrorListener = NULL;
+}
 
-void Channel::setonReadyListener(OnReadyListener *obj) {
+void Channel::setonReadyListener(OnReadyListener *obj)
+{
        onReadyListener = obj;
 }
 
-void Channel::unsetonReadyListener() { onReadyListener = NULL; }
+void Channel::unsetonReadyListener()
+{
+       onReadyListener = NULL;
+}
 
-void Channel::set_isWrite(bool flag) {}
+void Channel::setonPublishListener(OnPublishListener *obj)
+{
+       onPublishListener = obj;
+}
 
+void Channel::unsetonPublishListener()
+{
+       onPublishListener = NULL;
+}
 void ChannelConnectionHandler::resetLastPingReceived() {
        this->lastPingReceived = time(0);
 }
@@ -1430,9 +1534,9 @@ void ChannelConnectionHandler::ping_again(void *arg) {
                MSF_DBG("## Pinging timeout. disconnect ###");
                ptr->disconnect();
        } else {
-               ptr->publish("msfVersion2", "msfVersion2", ptr->clients->me());
+               ptr->publish("msfVersion2", "msfVersion2", ptr->clients->me(), NULL);
                sleep(1);
-               ptr->publish(PING, PONG.c_str(), ptr->clients->me());
+               ptr->publish(PING, PONG.c_str(), ptr->clients->me(), NULL);
                ptr->connectionHandler->pingSent = time(0);
        }
 }
index 74120f9..e44d747 100755 (executable)
@@ -286,6 +286,8 @@ void *Search::pt_update_alivemap(void *arg)
                sleep(5);
                SearchProvider::reapServices();
        }
+
+       return NULL;
 }
 
 void Search::startDiscovery()
index ab464c6..1120c6a 100644 (file)
@@ -48,7 +48,7 @@ long ttl_info:: get_ttl(int service_type)
 
 }
 
-int ttl_info::update_ttl(long ttl, int service_type)
+void ttl_info::update_ttl(long ttl, int service_type)
 {
        if (service_type == MSFD) {
                msfd_ttl = ttl;
index 823e91f..b58de28 100755 (executable)
@@ -129,7 +129,7 @@ public:
                fflush(stdout);
                const char* welcome = "hello welcome";
                unsigned char binary[5] = {'a', 'b', 'c', 'd', '\0'};
-               app->publish("test_say", welcome, c, binary, 5);
+               app->publish("test_say", welcome, c, binary, 5, NULL);
 
                //fprintf(stderr,"\n [MSF : API] Debug log Function : [%s] and line [%d] in file [%s] \n", __FUNCTION__, __LINE__, __FILE__);
        }
@@ -313,7 +313,12 @@ public:
        }
 };
 
-
+class published_listener : public OnPublishListener {
+public:
+       void onPublished(bool result, void* user_data) {
+               printf("\n published result = %s, user_data = %d", result ? "true" : "false", (int)user_data);
+       }
+};
 
 //Service service;
 Search* search1;
@@ -335,8 +340,7 @@ SearchListenerinherit search_listener2;
 startListener startListener1;
 stopListener stopListener1;
 installListener installListener1;
-
-
+published_listener published_listener1;
 
 void display_service_list()
 {
@@ -591,10 +595,11 @@ void publish()
 
        if (!done) {
                application->addOnMessageListener("hello", &msg_listener1);
+               application->setonPublishListener(&published_listener1);
                done = true;
        }
 
-       application->publish("hello", "hello");
+       application->publish("hello", "hello", (void*)1);
 }
 
 void Menu()