Make OCProcessEvent method.
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
index 882ddd4..638610e 100644 (file)
@@ -60,6 +60,10 @@ static ca_thread_pool_t g_threadPoolHandle = NULL;
 static CAQueueingThread_t g_sendThread;
 static CAQueueingThread_t g_receiveThread;
 
+#ifdef WITH_PROCESS_EVENT
+static oc_event g_processEvent = NULL;
+#endif // WITH_PROCESS_EVENT
+
 #else
 #define CA_MAX_RT_ARRAY_SIZE    3
 #endif  // SINGLE_THREAD
@@ -121,6 +125,13 @@ void CAAddDataToReceiveThread(CAData_t *data)
 
     // add thread
     CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+    if (g_processEvent)
+    {
+        oc_event_signal(g_processEvent);
+    }
+#endif
 }
 #endif
 
@@ -336,7 +347,14 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin
     CAProcessReceivedData(cadata);
 #else
     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
-#endif
+
+#ifdef WITH_PROCESS_EVENT
+    if (g_processEvent)
+    {
+        oc_event_signal(g_processEvent);
+    }
+#endif//WITH_PROCESS_EVENT
+#endif// SINGLE_THREAD
 }
 
 static void CADestroyData(void *data, uint32_t size)
@@ -865,7 +883,7 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
         if (CA_NOT_SUPPORTED == res || CA_REQUEST_TIMEOUT == res)
         {
             OIC_LOG(DEBUG, TAG, "this message does not have block option");
-            CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+            CAAddDataToReceiveThread(cadata);
         }
         else
         {
@@ -876,6 +894,13 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
 #endif
     {
         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+        if (g_processEvent)
+        {
+            oc_event_signal(g_processEvent);
+        }
+#endif
     }
 #endif // SINGLE_THREAD
 
@@ -932,6 +957,18 @@ static void CAConnectionStateChangedCallback(const CAEndpoint_t *info, bool isCo
     }
 }
 
+static u_queue_message_t *get_receive_queue_item(void)
+{
+    u_queue_message_t *item = NULL;
+
+    oc_mutex_lock(g_receiveThread.threadMutex);
+    item = u_queue_get_element(g_receiveThread.dataQueue);
+    oc_mutex_unlock(g_receiveThread.threadMutex);
+
+    return item;
+}
+
+
 void CAHandleRequestResponseCallbacks()
 {
 #ifdef SINGLE_THREAD
@@ -943,39 +980,44 @@ void CAHandleRequestResponseCallbacks()
     // #1 parse the data
     // #2 get endpoint
 
-    oc_mutex_lock(g_receiveThread.threadMutex);
-
-    u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
-
-    oc_mutex_unlock(g_receiveThread.threadMutex);
+    u_queue_message_t *item = NULL;
+#ifdef WITH_PROCESS_EVENT
+    while ((item = get_receive_queue_item()) != NULL)
+#else
+    if ((item = get_receive_queue_item()) != NULL)
+#endif
+    {        if (NULL == item->msg)
+        {
+            OICFree(item);
+#ifdef WITH_PROCESS_EVENT
+            continue;
+#else
+            return;
+#endif
+        }
 
-    if (NULL == item || NULL == item->msg)
-    {
-        return;
-    }
+        // get endpoint
+        CAData_t *td = (CAData_t *) item->msg;
 
-    // get endpoint
-    CAData_t *td = (CAData_t *) item->msg;
+        if (td->requestInfo && g_requestHandler)
+        {
+            OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
+            g_requestHandler(td->remoteEndpoint, td->requestInfo);
+        }
+        else if (td->responseInfo && g_responseHandler)
+        {
+            OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
+            g_responseHandler(td->remoteEndpoint, td->responseInfo);
+        }
+        else if (td->errorInfo && g_errorHandler)
+        {
+            OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
+            g_errorHandler(td->remoteEndpoint, td->errorInfo);
+        }
 
-    if (td->requestInfo && g_requestHandler)
-    {
-        OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
-        g_requestHandler(td->remoteEndpoint, td->requestInfo);
-    }
-    else if (td->responseInfo && g_responseHandler)
-    {
-        OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
-        g_responseHandler(td->remoteEndpoint, td->responseInfo);
-    }
-    else if (td->errorInfo && g_errorHandler)
-    {
-        OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
-        g_errorHandler(td->remoteEndpoint, td->errorInfo);
+        CADestroyData(item->msg, sizeof(CAData_t));
+        OICFree(item);
     }
-
-    CADestroyData(item->msg, sizeof(CAData_t));
-    OICFree(item);
-
 #endif // SINGLE_HANDLE
 #endif // SINGLE_THREAD
 }
@@ -1323,6 +1365,10 @@ void CATerminateMessageHandler()
     u_arraylist_t *list = CAGetSelectedNetworkList();
     uint32_t length = u_arraylist_length(list);
 
+ #ifdef WITH_PROCESS_EVENT
+    g_processEvent = NULL;
+#endif
+
     uint32_t i = 0;
     for (i = 0; i < length; i++)
     {
@@ -1455,6 +1501,13 @@ void CAErrorHandler(const CAEndpoint_t *endpoint,
     cadata->errorInfo->result = result;
 
     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+    if (g_processEvent)
+    {
+        oc_event_signal(g_processEvent);
+    }
+#endif
     coap_delete_pdu(pdu);
 #else
     (void)result;
@@ -1508,6 +1561,13 @@ static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info,
     cadata->dataType = CA_ERROR_DATA;
 
     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+    if (g_processEvent)
+    {
+        oc_event_signal(g_processEvent);
+    }
+#endif//WITH_PROCESS_EVENT
 #endif
     OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
 }
@@ -1819,3 +1879,10 @@ static void CALogPDUInfo(const CAData_t *data, const coap_pdu_t *pdu)
                    pdu->transport_hdr->udp.token_length);
 }
 #endif
+
+#ifdef WITH_PROCESS_EVENT
+void CARegisterMessageProcessEvent(oc_event event)
+{
+    g_processEvent = event;
+}
+#endif // WITH_PROCESS_EVENT