#define SINGLE_HANDLE
#define MAX_THREAD_POOL_SIZE 20
+#define UNUSED(x) (void)(x)
+
// thread pool handle
static ca_thread_pool_t g_threadPoolHandle = NULL;
static CAQueueingThread_t g_sendThread;
static CAQueueingThread_t g_receiveThread;
+#ifdef WITH_PROCESS_EVENT
+static oc_event g_processEvent = NULL;
+#endif // WITH_PROCESS_EVENT
+
#else
#define CA_MAX_RT_ARRAY_SIZE 3
#endif // SINGLE_THREAD
VERIFY_NON_NULL_VOID(data, TAG, "data");
// add thread
- CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+ CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+ if (result != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(data, sizeof(CAData_t));
+ }
}
void CAAddDataToReceiveThread(CAData_t *data)
VERIFY_NON_NULL_VOID(data, TAG, "data");
// add thread
- CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
+ CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t), false);
+ if (result != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(data, sizeof(CAData_t));
+ return;
+ }
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
}
#endif
#ifdef SINGLE_THREAD
CAProcessReceivedData(cadata);
#else
- CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
-#endif
+ CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+ if (result != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(cadata, sizeof(CAData_t));
+ return;
+ }
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif//WITH_PROCESS_EVENT
+#endif// SINGLE_THREAD
}
static void CADestroyData(void *data, uint32_t size)
{
return false;
}
-
+ if (!history)
+ {
+ return false;
+ }
if (tokenLength > CA_MAX_TOKEN_LEN)
{
/*
{
OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
OICFree(info->token);
+ info->token = NULL;
info->tokenLength = 0;
}
}
if (CA_NOT_SUPPORTED == res || CA_REQUEST_TIMEOUT == res)
{
OIC_LOG(DEBUG, TAG, "this message does not have block option");
- CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+ CAAddDataToReceiveThread(cadata);
}
else
{
else
#endif
{
- CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+ CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+ if (result != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(cadata, sizeof(CAData_t));
+ coap_delete_pdu(pdu);
+ goto exit;
+ }
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
}
#endif // SINGLE_THREAD
static bool CAClearQueueEndpointDataContext(void *data, uint32_t size, void *ctx)
{
- (void)size;
-
+ UNUSED(size);
if (NULL == data || NULL == ctx)
{
return false;
{
CAResult_t res = CAQueueingThreadClearContextData(&g_sendThread,
CAClearQueueEndpointDataContext,
- info);
+ (void *)info);
if (CA_STATUS_OK != res)
{
OIC_LOG(ERROR, TAG, "Could not clear the send queue");
}
}
+static u_queue_message_t *get_receive_queue_item(void)
+{
+ u_queue_message_t *item = NULL;
+
+ oc_mutex_lock(g_receiveThread.threadMutex);
+ item = u_queue_get_element(g_receiveThread.dataQueue);
+ oc_mutex_unlock(g_receiveThread.threadMutex);
+
+ return item;
+}
+
+
void CAHandleRequestResponseCallbacks()
{
#ifdef SINGLE_THREAD
// #1 parse the data
// #2 get endpoint
- oc_mutex_lock(g_receiveThread.threadMutex);
-
- u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
-
- oc_mutex_unlock(g_receiveThread.threadMutex);
+ u_queue_message_t *item = NULL;
+#ifdef WITH_PROCESS_EVENT
+ while ((item = get_receive_queue_item()) != NULL)
+#else
+ if ((item = get_receive_queue_item()) != NULL)
+#endif
+ { if (NULL == item->msg)
+ {
+ OICFree(item);
+#ifdef WITH_PROCESS_EVENT
+ continue;
+#else
+ return;
+#endif
+ }
- if (NULL == item || NULL == item->msg)
- {
- return;
- }
+ // get endpoint
+ CAData_t *td = (CAData_t *) item->msg;
- // get endpoint
- CAData_t *td = (CAData_t *) item->msg;
+ if (td->requestInfo && g_requestHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
+ g_requestHandler(td->remoteEndpoint, td->requestInfo);
+ }
+ else if (td->responseInfo && g_responseHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
+ g_responseHandler(td->remoteEndpoint, td->responseInfo);
+ }
+ else if (td->errorInfo && g_errorHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
+ g_errorHandler(td->remoteEndpoint, td->errorInfo);
+ }
- if (td->requestInfo && g_requestHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
- g_requestHandler(td->remoteEndpoint, td->requestInfo);
- }
- else if (td->responseInfo && g_responseHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
- g_responseHandler(td->remoteEndpoint, td->responseInfo);
- }
- else if (td->errorInfo && g_errorHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
- g_errorHandler(td->remoteEndpoint, td->errorInfo);
+ CADestroyData(item->msg, sizeof(CAData_t));
+ OICFree(item);
}
-
- CADestroyData(item->msg, sizeof(CAData_t));
- OICFree(item);
-
#endif // SINGLE_HANDLE
#endif // SINGLE_THREAD
}
if (!cadata)
{
OIC_LOG(ERROR, TAG, "cadata memory allocation failed");
- return CA_STATUS_FAILED;
+ return CA_MEMORY_ALLOC_FAILED;
}
CAEndpoint_t* ep = CACloneEndpoint(endpoint);
{
OIC_LOG(ERROR, TAG, "endpoint clone failed");
OICFree(cadata);
- return CA_STATUS_FAILED;
+ return CA_MEMORY_ALLOC_FAILED;
}
cadata->remoteEndpoint = ep;
cadata->eventInfo = event;
cadata->dataType = dataType;
- CAQueueingThreadAddData(&g_sendThread, cadata, sizeof(CAData_t));
+ CAResult_t result = CAQueueingThreadAddData(&g_sendThread, cadata, sizeof(CAData_t), true);
+ if (result != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(cadata, sizeof(CAData_t));
+ return result;
+ }
#endif
return CA_STATUS_OK;
if (CA_NOT_SUPPORTED == res)
{
OIC_LOG(DEBUG, TAG, "normal msg will be sent");
- CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+ CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+ if (result != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(data, sizeof(CAData_t));
+ return result;
+ }
return CA_STATUS_OK;
}
else
else
#endif // WITH_BWT
{
- CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+ CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+ if (result != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(data, sizeof(CAData_t));
+ return result;
+ }
}
#endif // SINGLE_THREAD
CAResult_t CAInitializeMessageHandler(CATransportAdapter_t transportType)
{
- CASetPacketReceivedCallback(CAReceivedPacketCallback);
- CASetErrorHandleCallback(CAErrorHandler);
+ CASetPacketReceivedCallback((CANetworkPacketReceivedCallback)CAReceivedPacketCallback);
+ CASetErrorHandleCallback((CAErrorHandleCallback)CAErrorHandler);
#ifndef SINGLE_THREAD
// create thread pool
u_arraylist_t *list = CAGetSelectedNetworkList();
uint32_t length = u_arraylist_length(list);
+ #ifdef WITH_PROCESS_EVENT
+ g_processEvent = NULL;
+#endif
+
uint32_t i = 0;
for (i = 0; i < length; i++)
{
cadata->errorInfo->result = result;
- CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+ CAResult_t res = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+ if (res != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(cadata, sizeof(CAData_t));
+ coap_delete_pdu(pdu);
+ return;
+ }
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
coap_delete_pdu(pdu);
#else
(void)result;
cadata->errorInfo = errorInfo;
cadata->dataType = CA_ERROR_DATA;
- CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+ res = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+ if (res != CA_STATUS_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+ CADestroyData(cadata, sizeof(CAData_t));
+ return;
+ }
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif//WITH_PROCESS_EVENT
#endif
OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
}
char log_buffer[1024] = "";
sprintf(log_buffer, "CA_LOG [%5d] | %-13s | %-12s | msg size : %4d | %s", pdu->transport_hdr->udp.id , type, method, pdu->length, info->resourceUri);
+ if(NULL != info)
+ {
+ sprintf(log_buffer, "CA_LOG [%5d] | %-13s | %-12s | msg size : %4d | %s", pdu->transport_hdr->udp.id , type, method, pdu->length, info->resourceUri);
+ }
+
puts(log_buffer);
}
if (NULL != data->remoteEndpoint)
{
CALogAdapterTypeInfo(data->remoteEndpoint->adapter);
- OIC_LOG_V(INFO, ANALYZER_TAG, "Address = [%s]:[%d]", data->remoteEndpoint->addr,
+ OIC_LOG_V(DEBUG, ANALYZER_TAG, "Address = [%s]:[%d]", data->remoteEndpoint->addr,
data->remoteEndpoint->port);
}
OIC_LOG_BUFFER(INFO, ANALYZER_TAG, (const uint8_t *) info->token, info->tokenLength);
OIC_TRACE_BUFFER("OIC_CA_MSG_HANDLE:CALogPDUInfo:token",
(const uint8_t *) info->token, info->tokenLength);
- OIC_LOG_V(INFO, ANALYZER_TAG, "Res URI = [%s]", info->resourceUri);
+ OIC_LOG_V(INFO_PRIVATE, ANALYZER_TAG, "Res URI = [%s]", info->resourceUri);
OIC_TRACE_MARK(%s:CALogPDUInfo:uri:%s, TAG, info->resourceUri);
if (CA_FORMAT_APPLICATION_CBOR == info->payloadFormat)
}
size_t payloadLen = (pdu->data) ? (unsigned char *) pdu->hdr + pdu->length - pdu->data : 0;
- OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Message Full Size = [%lu]", pdu->length);
+ OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Message Full Size = [%u]", pdu->length);
OIC_LOG(INFO, ANALYZER_TAG, "CoAP Header (+ 0xFF)");
OIC_LOG_BUFFER(INFO, ANALYZER_TAG, (const uint8_t *) pdu->transport_hdr,
pdu->length - payloadLen);
- OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Header size = [%lu]", pdu->length - payloadLen);
+ OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Header size = [%" PRIuPTR "]", (size_t) pdu->length - payloadLen);
OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Payload");
OIC_LOG_BUFFER(INFO_PRIVATE, ANALYZER_TAG, pdu->data, payloadLen);
- OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Payload Size = [%lu]", payloadLen);
+ OIC_LOG_V(INFO, ANALYZER_TAG, "CoAP Payload Size = [%" PRIuPTR "]", payloadLen);
OIC_LOG(INFO, ANALYZER_TAG, "=================================================");
// samsung log
if (NULL != data->remoteEndpoint)
{
int i = 0;
- while (NULL != data->remoteEndpoint->addr[i])
+ while (data->remoteEndpoint->addr[i])
{
g_headerBuffer[g_headerIndex++] = data->remoteEndpoint->addr[i];
i++;
if (info->resourceUri)
{
size_t i = 0;
- while (NULL != info->resourceUri[i])
+ while (info->resourceUri[i])
{
g_headerBuffer[g_headerIndex++] = info->resourceUri[i];
i++;
pdu->transport_hdr->udp.token_length);
}
#endif
+
+#ifdef WITH_PROCESS_EVENT
+void CARegisterMessageProcessEvent(oc_event event)
+{
+ g_processEvent = event;
+}
+#endif // WITH_PROCESS_EVENT