Handle one write websocket job per one writable callback 59/170859/1
authorJi-hoon Lee <dalton.lee@samsung.com>
Thu, 22 Feb 2018 11:38:46 +0000 (20:38 +0900)
committerJihoon Kim <jihoon48.kim@samsung.com>
Fri, 23 Feb 2018 00:50:32 +0000 (09:50 +0900)
Change-Id: Ie12c7d751d1abe42c504c35861bd38aef4262849

src/legacy_support/websocket.cpp
src/legacy_support/websocket.h

index 507f3b8..4fecacc 100644 (file)
@@ -35,6 +35,9 @@
 
 #include <libwebsockets.h>
 
+#define RECVED_MESSAGE "recved"
+#define MESSAGE_LEFT "left"
+
 pthread_t g_ws_server_thread = (pthread_t)NULL;
 pthread_mutex_t g_ws_server_mutex = PTHREAD_MUTEX_INITIALIZER;
 
@@ -127,6 +130,7 @@ static int callback_keyboard(struct lws *wsi,
             if (pss->valid) {
                 pthread_mutex_lock(&g_ws_server_mutex);
                 std::queue<ISE_MESSAGE>& messages = agent->get_send_message_queue();
+                /* One write allowed per one writable callback */
                 if (messages.size() > 0) {
                     ISE_MESSAGE &message = messages.front();
                     std::string str = CISEMessageSerializer::serialize(message);
@@ -136,6 +140,9 @@ static int callback_keyboard(struct lws *wsi,
                     n = lws_write(wsi, p, n, LWS_WRITE_TEXT);
                     messages.pop();
                 }
+                if (messages.size() > 0) {
+                    ecore_pipe_write(agent->get_message_pipe(), MESSAGE_LEFT, strlen(MESSAGE_LEFT));
+                }
                 pthread_mutex_unlock(&g_ws_server_mutex);
 
                 if (n < 0) {
@@ -176,8 +183,7 @@ static int callback_keyboard(struct lws *wsi,
                 messages.push(message);
                 pthread_mutex_unlock(&g_ws_server_mutex);
 
-                const char *recved_message = "recved";
-                ecore_pipe_write(agent->get_recv_message_pipe(), recved_message, strlen(recved_message));
+                ecore_pipe_write(agent->get_message_pipe(), RECVED_MESSAGE, strlen(RECVED_MESSAGE));
 
                 /* If we received reply message, let's send signal to wake up our main thread */
                 if (message.type.compare(ISE_MESSAGE_TYPE_STRINGS[ISE_MESSAGE_TYPE_REPLY]) == 0) {
@@ -226,7 +232,7 @@ CWebHelperAgentWebSocket::CWebHelperAgentWebSocket()
         LOGD("WARNING : m_current_instance is NOT NULL");
     }
     m_current_instance = this;
-    m_recv_message_pipe = NULL;
+    m_message_pipe = NULL;
 }
 
 CWebHelperAgentWebSocket::~CWebHelperAgentWebSocket()
@@ -235,17 +241,27 @@ CWebHelperAgentWebSocket::~CWebHelperAgentWebSocket()
         m_current_instance = NULL;
     }
 
-    if (m_recv_message_pipe) {
-        ecore_pipe_del(m_recv_message_pipe);
-        m_recv_message_pipe = NULL;
+    if (m_message_pipe) {
+        ecore_pipe_del(m_message_pipe);
+        m_message_pipe = NULL;
     }
 }
 
-static void recv_message_pipe_handler(void *data, void *buffer, unsigned int nbyte)
+static void message_pipe_handler(void *data, void *buffer, unsigned int nbyte)
 {
-    CWebHelperAgentWebSocket *agent = CWebHelperAgentWebSocket::get_current_instance();
-    if (agent) {
-        agent->process_recved_messages();
+    if (buffer) {
+        if (strncmp((const char*)buffer, RECVED_MESSAGE, strlen(RECVED_MESSAGE)) == 0) {
+            CWebHelperAgentWebSocket *agent = CWebHelperAgentWebSocket::get_current_instance();
+            if (agent) {
+                agent->process_recved_messages();
+            }
+        } else {
+            if (g_ws_server_context) {
+                lws_callback_on_writable_all_protocol(g_ws_server_context, &protocols[PROTOCOL_KEYBOARD]);
+            } else {
+                LOGD("WARNING : g_ws_server_context is NULL");
+            }
+        }
     }
 }
 
@@ -258,8 +274,8 @@ bool CWebHelperAgentWebSocket::init()
     memset(&info, 0, sizeof info);
     info.port = 7681;
 
-    int debug_level = LLL_DEBUG;
-    lws_set_log_level(debug_level, log_func);
+    int log_level = LLL_ERR | LLL_WARN | LLL_DEBUG;
+    lws_set_log_level(log_level, log_func);
 
     info.iface = NULL;
     info.protocols = protocols;
@@ -274,7 +290,7 @@ bool CWebHelperAgentWebSocket::init()
 
     /* The WebSocket server is running on a separate thread, and let the thread send a message
         through this pipe to guarantee thread safety */
-    m_recv_message_pipe = ecore_pipe_add(recv_message_pipe_handler, NULL);
+    m_message_pipe = ecore_pipe_add(message_pipe_handler, NULL);
 
     /* Let's retry creating server context for a certain number of times */
     const int max_retry_num = 10;
@@ -311,9 +327,9 @@ bool CWebHelperAgentWebSocket::exit()
 
     g_ws_server_exit = true;
 
-    if (m_recv_message_pipe) {
-        ecore_pipe_del(m_recv_message_pipe);
-        m_recv_message_pipe = NULL;
+    if (m_message_pipe) {
+        ecore_pipe_del(m_message_pipe);
+        m_message_pipe = NULL;
     }
 
     if (g_ws_server_thread) {
@@ -870,9 +886,9 @@ std::queue<ISE_MESSAGE>& CWebHelperAgentWebSocket::get_recv_message_queue()
     return m_recv_message_queue;
 }
 
-Ecore_Pipe* CWebHelperAgentWebSocket::get_recv_message_pipe()
+Ecore_Pipe* CWebHelperAgentWebSocket::get_message_pipe()
 {
-    return m_recv_message_pipe;
+    return m_message_pipe;
 }
 
 void CWebHelperAgentWebSocket::wait_for_reply_message()
index 811b3f9..98483cb 100644 (file)
@@ -242,7 +242,7 @@ public:
 
     std::queue<ISE_MESSAGE>& get_send_message_queue();
     std::queue<ISE_MESSAGE>& get_recv_message_queue();
-    Ecore_Pipe* get_recv_message_pipe();
+    Ecore_Pipe* get_message_pipe();
 
     void wait_for_reply_message();
 
@@ -257,7 +257,7 @@ protected:
     std::queue<ISE_MESSAGE> m_send_message_queue;
     std::queue<ISE_MESSAGE> m_recv_message_queue;
 
-    Ecore_Pipe *m_recv_message_pipe;
+    Ecore_Pipe *m_message_pipe;
 };
 
 #endif // _WEB_HELPER_AGENT_WEBSOCKET_H_