Revert "[CONPRO-1568] Ignoring duplicate IPv6/Ipv4 messages"
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
old mode 100755 (executable)
new mode 100644 (file)
index 394a728..da4320d
@@ -51,6 +51,8 @@
 #define SINGLE_HANDLE
 #define MAX_THREAD_POOL_SIZE    20
 
+#define UNUSED(x) (void)(x)
+
 // thread pool handle
 static ca_thread_pool_t g_threadPoolHandle = NULL;
 
@@ -58,6 +60,10 @@ static ca_thread_pool_t g_threadPoolHandle = NULL;
 static CAQueueingThread_t g_sendThread;
 static CAQueueingThread_t g_receiveThread;
 
+#ifdef WITH_PROCESS_EVENT
+static oc_event g_processEvent = NULL;
+#endif // WITH_PROCESS_EVENT
+
 #else
 #define CA_MAX_RT_ARRAY_SIZE    3
 #endif  // SINGLE_THREAD
@@ -110,7 +116,12 @@ void CAAddDataToSendThread(CAData_t *data)
     VERIFY_NON_NULL_VOID(data, TAG, "data");
 
     // add thread
-    CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+    CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(data, sizeof(CAData_t));
+    }
 }
 
 void CAAddDataToReceiveThread(CAData_t *data)
@@ -118,7 +129,20 @@ void CAAddDataToReceiveThread(CAData_t *data)
     VERIFY_NON_NULL_VOID(data, TAG, "data");
 
     // add thread
-    CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
+    CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t), false);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(data, sizeof(CAData_t));
+        return;
+    }
+
+#ifdef WITH_PROCESS_EVENT
+    if (g_processEvent)
+    {
+        oc_event_signal(g_processEvent);
+    }
+#endif
 }
 #endif
 
@@ -333,8 +357,21 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin
 #ifdef SINGLE_THREAD
     CAProcessReceivedData(cadata);
 #else
-    CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
-#endif
+    CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(cadata, sizeof(CAData_t));
+        return;
+    }
+
+#ifdef WITH_PROCESS_EVENT
+    if (g_processEvent)
+    {
+        oc_event_signal(g_processEvent);
+    }
+#endif//WITH_PROCESS_EVENT
+#endif// SINGLE_THREAD
 }
 
 static void CADestroyData(void *data, uint32_t size)
@@ -711,7 +748,10 @@ static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *ep, ui
     {
         return false;
     }
-
+    if (!history)
+    {
+        return false;
+    }
     if (tokenLength > CA_MAX_TOKEN_LEN)
     {
         /*
@@ -841,6 +881,7 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
                     {
                         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
                         OICFree(info->token);
+                        info->token = NULL;
                         info->tokenLength = 0;
                     }
                 }
@@ -863,7 +904,7 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
         if (CA_NOT_SUPPORTED == res || CA_REQUEST_TIMEOUT == res)
         {
             OIC_LOG(DEBUG, TAG, "this message does not have block option");
-            CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+            CAAddDataToReceiveThread(cadata);
         }
         else
         {
@@ -873,7 +914,21 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
     else
 #endif
     {
-        CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+        CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+        if (result != CA_STATUS_OK)
+        {
+            OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+            CADestroyData(cadata, sizeof(CAData_t));
+            coap_delete_pdu(pdu);
+            goto exit;
+        }
+
+#ifdef WITH_PROCESS_EVENT
+        if (g_processEvent)
+        {
+            oc_event_signal(g_processEvent);
+        }
+#endif
     }
 #endif // SINGLE_THREAD
 
@@ -895,6 +950,7 @@ static void CAAdapterStateChangedCallback(CATransportAdapter_t transportType, bo
 
 static bool CAClearQueueEndpointDataContext(void *data, uint32_t size, void *ctx)
 {
+    UNUSED(size);
     if (NULL == data || NULL == ctx)
     {
         return false;
@@ -921,7 +977,7 @@ static void CAConnectionStateChangedCallback(const CAEndpoint_t *info, bool isCo
     {
         CAResult_t res = CAQueueingThreadClearContextData(&g_sendThread,
                                                           CAClearQueueEndpointDataContext,
-                                                          info);
+                                                          (void *)info);
         if (CA_STATUS_OK != res)
         {
             OIC_LOG(ERROR, TAG, "Could not clear the send queue");
@@ -929,6 +985,18 @@ static void CAConnectionStateChangedCallback(const CAEndpoint_t *info, bool isCo
     }
 }
 
+static u_queue_message_t *get_receive_queue_item(void)
+{
+    u_queue_message_t *item = NULL;
+
+    oc_mutex_lock(g_receiveThread.threadMutex);
+    item = u_queue_get_element(g_receiveThread.dataQueue);
+    oc_mutex_unlock(g_receiveThread.threadMutex);
+
+    return item;
+}
+
+
 void CAHandleRequestResponseCallbacks()
 {
 #ifdef SINGLE_THREAD
@@ -940,39 +1008,44 @@ void CAHandleRequestResponseCallbacks()
     // #1 parse the data
     // #2 get endpoint
 
-    oc_mutex_lock(g_receiveThread.threadMutex);
-
-    u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
+    u_queue_message_t *item = NULL;
+#ifdef WITH_PROCESS_EVENT
+    while ((item = get_receive_queue_item()) != NULL)
+#else
+    if ((item = get_receive_queue_item()) != NULL)
+#endif
+    {        if (NULL == item->msg)
+        {
+            OICFree(item);
+#ifdef WITH_PROCESS_EVENT
+            continue;
+#else
+            return;
+#endif
+        }
 
-    oc_mutex_unlock(g_receiveThread.threadMutex);
+        // get endpoint
+        CAData_t *td = (CAData_t *) item->msg;
 
-    if (NULL == item || NULL == item->msg)
-    {
-        return;
-    }
-
-    // get endpoint
-    CAData_t *td = (CAData_t *) item->msg;
+        if (td->requestInfo && g_requestHandler)
+        {
+            OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
+            g_requestHandler(td->remoteEndpoint, td->requestInfo);
+        }
+        else if (td->responseInfo && g_responseHandler)
+        {
+            OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
+            g_responseHandler(td->remoteEndpoint, td->responseInfo);
+        }
+        else if (td->errorInfo && g_errorHandler)
+        {
+            OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
+            g_errorHandler(td->remoteEndpoint, td->errorInfo);
+        }
 
-    if (td->requestInfo && g_requestHandler)
-    {
-        OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
-        g_requestHandler(td->remoteEndpoint, td->requestInfo);
-    }
-    else if (td->responseInfo && g_responseHandler)
-    {
-        OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
-        g_responseHandler(td->remoteEndpoint, td->responseInfo);
+        CADestroyData(item->msg, sizeof(CAData_t));
+        OICFree(item);
     }
-    else if (td->errorInfo && g_errorHandler)
-    {
-        OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
-        g_errorHandler(td->remoteEndpoint, td->errorInfo);
-    }
-
-    CADestroyData(item->msg, sizeof(CAData_t));
-    OICFree(item);
-
 #endif // SINGLE_HANDLE
 #endif // SINGLE_THREAD
 }
@@ -1066,7 +1139,7 @@ CAResult_t CADetachSendNetworkReqMessage(const CAEndpoint_t *endpoint,
     if (!cadata)
     {
         OIC_LOG(ERROR, TAG, "cadata memory allocation failed");
-        return CA_STATUS_FAILED;
+        return CA_MEMORY_ALLOC_FAILED;
     }
 
     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
@@ -1074,14 +1147,20 @@ CAResult_t CADetachSendNetworkReqMessage(const CAEndpoint_t *endpoint,
     {
         OIC_LOG(ERROR, TAG, "endpoint clone failed");
         OICFree(cadata);
-        return CA_STATUS_FAILED;
+        return CA_MEMORY_ALLOC_FAILED;
     }
 
     cadata->remoteEndpoint = ep;
     cadata->eventInfo = event;
     cadata->dataType = dataType;
 
-    CAQueueingThreadAddData(&g_sendThread, cadata, sizeof(CAData_t));
+    CAResult_t result = CAQueueingThreadAddData(&g_sendThread, cadata, sizeof(CAData_t), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(cadata, sizeof(CAData_t));
+        return result;
+    }
 #endif
 
     return CA_STATUS_OK;
@@ -1137,7 +1216,13 @@ CAResult_t CADetachSendMessage(const CAEndpoint_t *endpoint, const void *sendMsg
         if (CA_NOT_SUPPORTED == res)
         {
             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
-            CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+            CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+            if (result != CA_STATUS_OK)
+            {
+                OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+                CADestroyData(data, sizeof(CAData_t));
+                return result;
+            }
             return CA_STATUS_OK;
         }
         else
@@ -1150,7 +1235,13 @@ CAResult_t CADetachSendMessage(const CAEndpoint_t *endpoint, const void *sendMsg
     else
 #endif // WITH_BWT
     {
-        CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+        CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+        if (result != CA_STATUS_OK)
+        {
+            OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+            CADestroyData(data, sizeof(CAData_t));
+            return result;
+        }
     }
 #endif // SINGLE_THREAD
 
@@ -1172,8 +1263,8 @@ void CASetNetworkMonitorCallback(CANetworkMonitorCallback nwMonitorHandler)
 
 CAResult_t CAInitializeMessageHandler(CATransportAdapter_t transportType)
 {
-    CASetPacketReceivedCallback(CAReceivedPacketCallback);
-    CASetErrorHandleCallback(CAErrorHandler);
+    CASetPacketReceivedCallback((CANetworkPacketReceivedCallback)CAReceivedPacketCallback);
+    CASetErrorHandleCallback((CAErrorHandleCallback)CAErrorHandler);
 
 #ifndef SINGLE_THREAD
     // create thread pool
@@ -1272,6 +1363,8 @@ CAResult_t CAInitializeMessageHandler(CATransportAdapter_t transportType)
 
 static bool CAClearQueueAdapterDataContext(void *data, uint32_t size, void *ctx)
 {
+    (void)size;
+
     if (NULL == data || NULL == ctx)
     {
         return false;
@@ -1318,6 +1411,10 @@ void CATerminateMessageHandler()
     u_arraylist_t *list = CAGetSelectedNetworkList();
     uint32_t length = u_arraylist_length(list);
 
+ #ifdef WITH_PROCESS_EVENT
+    g_processEvent = NULL;
+#endif
+
     uint32_t i = 0;
     for (i = 0; i < length; i++)
     {
@@ -1449,7 +1546,21 @@ void CAErrorHandler(const CAEndpoint_t *endpoint,
 
     cadata->errorInfo->result = result;
 
-    CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+    CAResult_t res = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+    if (res != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(cadata, sizeof(CAData_t));
+        coap_delete_pdu(pdu);
+        return;
+    }
+
+#ifdef WITH_PROCESS_EVENT
+    if (g_processEvent)
+    {
+        oc_event_signal(g_processEvent);
+    }
+#endif
     coap_delete_pdu(pdu);
 #else
     (void)result;
@@ -1502,7 +1613,20 @@ static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info,
     cadata->errorInfo = errorInfo;
     cadata->dataType = CA_ERROR_DATA;
 
-    CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+    res = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+    if (res != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(cadata, sizeof(CAData_t));
+        return;
+    }
+
+#ifdef WITH_PROCESS_EVENT
+    if (g_processEvent)
+    {
+        oc_event_signal(g_processEvent);
+    }
+#endif//WITH_PROCESS_EVENT
 #endif
     OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
 }
@@ -1577,6 +1701,11 @@ static void CALogPDUInfo(const CAData_t *data, const coap_pdu_t *pdu)
        char log_buffer[1024] = "";
        sprintf(log_buffer, "CA_LOG [%5d] | %-13s | %-12s | msg size : %4d | %s", pdu->transport_hdr->udp.id , type, method, pdu->length, info->resourceUri);
 
+       if(NULL != info)
+       {
+               sprintf(log_buffer, "CA_LOG [%5d] | %-13s | %-12s | msg size : %4d | %s", pdu->transport_hdr->udp.id , type, method, pdu->length, info->resourceUri);
+       }
+
        puts(log_buffer);
 }
 
@@ -1604,7 +1733,7 @@ static void CALogPDUInfo(const CAData_t *data, const coap_pdu_t *pdu)
     if (NULL != data->remoteEndpoint)
     {
         CALogAdapterTypeInfo(data->remoteEndpoint->adapter);
-        OIC_LOG_V(INFO, ANALYZER_TAG, "Address = [%s]:[%d]", data->remoteEndpoint->addr,
+        OIC_LOG_V(DEBUG, ANALYZER_TAG, "Address = [%s]:[%d]", data->remoteEndpoint->addr,
                   data->remoteEndpoint->port);
     }
 
@@ -1668,7 +1797,7 @@ static void CALogPDUInfo(const CAData_t *data, const coap_pdu_t *pdu)
         OIC_LOG_BUFFER(INFO, ANALYZER_TAG, (const uint8_t *) info->token, info->tokenLength);
         OIC_TRACE_BUFFER("OIC_CA_MSG_HANDLE:CALogPDUInfo:token",
                          (const uint8_t *) info->token, info->tokenLength);
-        OIC_LOG_V(INFO, ANALYZER_TAG, "Res URI = [%s]", info->resourceUri);
+        OIC_LOG_V(INFO_PRIVATE, ANALYZER_TAG, "Res URI = [%s]", info->resourceUri);
         OIC_TRACE_MARK(%s:CALogPDUInfo:uri:%s, TAG, info->resourceUri);
 
         if (CA_FORMAT_APPLICATION_CBOR == info->payloadFormat)
@@ -1682,15 +1811,15 @@ static void CALogPDUInfo(const CAData_t *data, const coap_pdu_t *pdu)
     }
 
     size_t payloadLen = (pdu->data) ? (unsigned char *) pdu->hdr + pdu->length - pdu->data : 0;
-    OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Message Full Size = [%lu]", pdu->length);
+    OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Message Full Size = [%u]", pdu->length);
     OIC_LOG(INFO, ANALYZER_TAG, "CoAP Header (+ 0xFF)");
     OIC_LOG_BUFFER(INFO, ANALYZER_TAG,  (const uint8_t *) pdu->transport_hdr,
                    pdu->length - payloadLen);
-    OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Header size = [%lu]", pdu->length - payloadLen);
+    OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Header size = [%" PRIuPTR "]", (size_t) pdu->length - payloadLen);
 
     OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Payload");
     OIC_LOG_BUFFER(INFO_PRIVATE, ANALYZER_TAG, pdu->data, payloadLen);
-    OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Payload Size = [%lu]", payloadLen);
+    OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Payload Size = [%" PRIuPTR "]", payloadLen);
     OIC_LOG(INFO, ANALYZER_TAG, "=================================================");
 
     // samsung log
@@ -1731,7 +1860,7 @@ static void CASamsungLogMessage(const CAData_t *data, const coap_pdu_t *pdu)
     if (NULL != data->remoteEndpoint)
     {
         int i = 0;
-        while (NULL != data->remoteEndpoint->addr[i])
+        while (data->remoteEndpoint->addr[i])
         {
             g_headerBuffer[g_headerIndex++] = data->remoteEndpoint->addr[i];
             i++;
@@ -1786,7 +1915,7 @@ static void CASamsungLogMessage(const CAData_t *data, const coap_pdu_t *pdu)
     if (info->resourceUri)
     {
         size_t i = 0;
-        while (NULL != info->resourceUri[i])
+        while (info->resourceUri[i])
         {
             g_headerBuffer[g_headerIndex++] = info->resourceUri[i];
             i++;
@@ -1814,3 +1943,10 @@ static void CALogPDUInfo(const CAData_t *data, const coap_pdu_t *pdu)
                    pdu->transport_hdr->udp.token_length);
 }
 #endif
+
+#ifdef WITH_PROCESS_EVENT
+void CARegisterMessageProcessEvent(oc_event event)
+{
+    g_processEvent = event;
+}
+#endif // WITH_PROCESS_EVENT