static CAQueueingThread_t g_sendThread;
static CAQueueingThread_t g_receiveThread;
+#ifdef WITH_PROCESS_EVENT
+static oc_event g_processEvent = NULL;
+#endif // WITH_PROCESS_EVENT
+
#else
#define CA_MAX_RT_ARRAY_SIZE 3
#endif // SINGLE_THREAD
// add thread
CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
}
#endif
CAProcessReceivedData(cadata);
#else
CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
-#endif
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif//WITH_PROCESS_EVENT
+#endif// SINGLE_THREAD
}
static void CADestroyData(void *data, uint32_t size)
if (CA_NOT_SUPPORTED == res || CA_REQUEST_TIMEOUT == res)
{
OIC_LOG(DEBUG, TAG, "this message does not have block option");
- CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+ CAAddDataToReceiveThread(cadata);
}
else
{
#endif
{
CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
}
#endif // SINGLE_THREAD
}
}
+static u_queue_message_t *get_receive_queue_item(void)
+{
+ u_queue_message_t *item = NULL;
+
+ oc_mutex_lock(g_receiveThread.threadMutex);
+ item = u_queue_get_element(g_receiveThread.dataQueue);
+ oc_mutex_unlock(g_receiveThread.threadMutex);
+
+ return item;
+}
+
+
void CAHandleRequestResponseCallbacks()
{
#ifdef SINGLE_THREAD
// #1 parse the data
// #2 get endpoint
- oc_mutex_lock(g_receiveThread.threadMutex);
-
- u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
+ u_queue_message_t *item = NULL;
+#ifdef WITH_PROCESS_EVENT
+ while ((item = get_receive_queue_item()) != NULL)
+#else
+ if ((item = get_receive_queue_item()) != NULL)
+#endif
+ { if (NULL == item->msg)
+ {
+ OICFree(item);
+#ifdef WITH_PROCESS_EVENT
+ continue;
+#else
+ return;
+#endif
+ }
- oc_mutex_unlock(g_receiveThread.threadMutex);
+ // get endpoint
+ CAData_t *td = (CAData_t *) item->msg;
- if (NULL == item || NULL == item->msg)
- {
- return;
- }
-
- // get endpoint
- CAData_t *td = (CAData_t *) item->msg;
+ if (td->requestInfo && g_requestHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
+ g_requestHandler(td->remoteEndpoint, td->requestInfo);
+ }
+ else if (td->responseInfo && g_responseHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
+ g_responseHandler(td->remoteEndpoint, td->responseInfo);
+ }
+ else if (td->errorInfo && g_errorHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
+ g_errorHandler(td->remoteEndpoint, td->errorInfo);
+ }
- if (td->requestInfo && g_requestHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
- g_requestHandler(td->remoteEndpoint, td->requestInfo);
+ CADestroyData(item->msg, sizeof(CAData_t));
+ OICFree(item);
}
- else if (td->responseInfo && g_responseHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
- g_responseHandler(td->remoteEndpoint, td->responseInfo);
- }
- else if (td->errorInfo && g_errorHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
- g_errorHandler(td->remoteEndpoint, td->errorInfo);
- }
-
- CADestroyData(item->msg, sizeof(CAData_t));
- OICFree(item);
-
#endif // SINGLE_HANDLE
#endif // SINGLE_THREAD
}
if (!cadata)
{
OIC_LOG(ERROR, TAG, "cadata memory allocation failed");
- return CA_STATUS_FAILED;
+ return CA_MEMORY_ALLOC_FAILED;
}
CAEndpoint_t* ep = CACloneEndpoint(endpoint);
{
OIC_LOG(ERROR, TAG, "endpoint clone failed");
OICFree(cadata);
- return CA_STATUS_FAILED;
+ return CA_MEMORY_ALLOC_FAILED;
}
cadata->remoteEndpoint = ep;
u_arraylist_t *list = CAGetSelectedNetworkList();
uint32_t length = u_arraylist_length(list);
+ #ifdef WITH_PROCESS_EVENT
+ g_processEvent = NULL;
+#endif
+
uint32_t i = 0;
for (i = 0; i < length; i++)
{
cadata->errorInfo->result = result;
CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
coap_delete_pdu(pdu);
#else
(void)result;
cadata->dataType = CA_ERROR_DATA;
CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif//WITH_PROCESS_EVENT
#endif
OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
}
char log_buffer[1024] = "";
sprintf(log_buffer, "CA_LOG [%5d] | %-13s | %-12s | msg size : %4d | %s", pdu->transport_hdr->udp.id , type, method, pdu->length, info->resourceUri);
+ if(NULL != info)
+ {
+ sprintf(log_buffer, "CA_LOG [%5d] | %-13s | %-12s | msg size : %4d | %s", pdu->transport_hdr->udp.id , type, method, pdu->length, info->resourceUri);
+ }
+
puts(log_buffer);
}
pdu->transport_hdr->udp.token_length);
}
#endif
+
+#ifdef WITH_PROCESS_EVENT
+void CARegisterMessageProcessEvent(oc_event event)
+{
+ g_processEvent = event;
+}
+#endif // WITH_PROCESS_EVENT