#define SINGLE_HANDLE
#define MAX_THREAD_POOL_SIZE 20
+#define UNUSED(x) (void)(x)
+
// thread pool handle
static ca_thread_pool_t g_threadPoolHandle = NULL;
#define CA_MAX_RT_ARRAY_SIZE 3
#endif // SINGLE_THREAD
-#define RECEIVE_QUEUE_MAX_SIZE 100
-
#define TAG "OIC_CA_MSG_HANDLE"
static CARetransmission_t g_retransmissionContext;
return ret;
}
-// NOTE: This is test method for CONPRO-1172
-static SendDirectErrorResponsetoPeer(CAData_t* cadata)
-{
- CARequestInfo_t *reqInfo = cadata->requestInfo;
-
- CAResponseInfo_t respInfo = { .result = CA_INTERNAL_SERVER_ERROR };
- respInfo.info.messageId = reqInfo->info.messageId;
- respInfo.info.numOptions = reqInfo->info.numOptions;
-
- if (respInfo.info.numOptions)
- {
- respInfo.info.options =
- (CAHeaderOption_t *)OICCalloc(respInfo.info.numOptions, sizeof(CAHeaderOption_t));
- memcpy (respInfo.info.options, reqInfo->info.options,
- sizeof(CAHeaderOption_t) * respInfo.info.numOptions);
-
- }
-
- respInfo.info.payload = NULL;
- respInfo.info.token = reqInfo->info.token;
- respInfo.info.tokenLength = reqInfo->info.tokenLength;
- respInfo.info.type = reqInfo->info.type;
- respInfo.info.resourceUri = OICStrdup (reqInfo->info.resourceUri);
- respInfo.info.acceptFormat = CA_FORMAT_UNDEFINED;
- respInfo.info.dataType = CA_RESPONSE_DATA;
-
- CAResult_t caResult = CASendResponse(cadata->remoteEndpoint, &respInfo);
- // resourceUri in the info field is cloned in the CA layer and
- // thus ownership is still here.
- OICFree (respInfo.info.resourceUri);
- OICFree (respInfo.info.options);
- if(CA_STATUS_OK != caResult)
- {
- OIC_LOG_V(ERROR, TAG, "CASendResponse error - %d", caResult);
- }
-}
-
static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
const void *data, uint32_t dataLen)
{
else
#endif
{
- // NOTE: This is test logic for CONPRO-1172
- size_t queueSize = CAQueueingThreadGetQueueSize(&g_receiveThread);
- if (queueSize <= RECEIVE_QUEUE_MAX_SIZE)
- {
- CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
- }
- else
- {
- OIC_LOG_V(INFO, TAG, "Receive queue size(%"PRIuPTR") is > its threshold limit(%d)", queueSize, RECEIVE_QUEUE_MAX_SIZE);
- SendDirectErrorResponsetoPeer(cadata);
- CADestroyData(cadata, sizeof(CAData_t));
- }
+ CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
}
#endif // SINGLE_THREAD
static bool CAClearQueueEndpointDataContext(void *data, uint32_t size, void *ctx)
{
- (void)size;
-
+ UNUSED(size);
if (NULL == data || NULL == ctx)
{
return false;
CAResult_t CAInitializeMessageHandler(CATransportAdapter_t transportType)
{
- CASetPacketReceivedCallback(CAReceivedPacketCallback);
- CASetErrorHandleCallback(CAErrorHandler);
+ CASetPacketReceivedCallback((CANetworkPacketReceivedCallback)CAReceivedPacketCallback);
+ CASetErrorHandleCallback((CAErrorHandleCallback)CAErrorHandler);
#ifndef SINGLE_THREAD
// create thread pool
if (NULL != data->remoteEndpoint)
{
CALogAdapterTypeInfo(data->remoteEndpoint->adapter);
- OIC_LOG_V(INFO, ANALYZER_TAG, "Address = [%s]:[%d]", data->remoteEndpoint->addr,
+ OIC_LOG_V(DEBUG, ANALYZER_TAG, "Address = [%s]:[%d]", data->remoteEndpoint->addr,
data->remoteEndpoint->port);
}