* limitations under the License.
*
******************************************************************/
+#include "uqueue.h"
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
-
#include "logger.h"
-#include "uqueue.h"
#include "oic_malloc.h"
+/**
+ * @def NO_MESSAGES
+ * @brief Number of messages in the queue
+ */
#define NO_MESSAGES 0
-#define TAG PCF("UQUEUE")
-u_queue_t* u_queue_create()
+/**
+ * @def TAG
+ * @brief Logging tag for module name
+ */
+#define TAG "UQUEUE"
+
+u_queue_t *u_queue_create()
{
- u_queue_t* queuePtr = (u_queue_t*) OICMalloc(sizeof(u_queue_t));
+ u_queue_t *queuePtr = (u_queue_t *) OICMalloc(sizeof(u_queue_t));
if (NULL == queuePtr)
{
OIC_LOG(DEBUG, TAG, "QueueCreate FAIL");
return queuePtr;
}
-CAResult_t u_queue_add_element(u_queue_t* queue, u_queue_message_t *message)
+CAResult_t u_queue_add_element(u_queue_t *queue, u_queue_message_t *message)
{
- u_queue_element* element = NULL;
- u_queue_element* ptr = NULL;
+ u_queue_element *element = NULL;
+ u_queue_element *ptr = NULL;
if (NULL == queue)
{
return CA_STATUS_FAILED;
}
- element = (u_queue_element*) OICMalloc(sizeof(u_queue_element));
+ element = (u_queue_element *) OICMalloc(sizeof(u_queue_element));
if (NULL == element)
{
OIC_LOG(DEBUG, TAG, "QueueAddElement FAIL, memory allocation failed");
queue->element = element;
queue->count++;
OIC_LOG_V(DEBUG, TAG, "Queue Count : %d", queue->count);
-
- return CA_STATUS_OK;
}
return CA_STATUS_OK;
}
-u_queue_message_t* u_queue_get_element(u_queue_t* queue)
+u_queue_message_t *u_queue_get_element(u_queue_t *queue)
{
- u_queue_element* next = NULL;
- u_queue_element* element = NULL;
- u_queue_message_t* message = NULL;
+ u_queue_element *element = NULL;
+ u_queue_message_t *message = NULL;
if (NULL == queue)
{
- OIC_LOG(DEBUG, TAG, "QueueAddElement FAIL, Invalid Queue");
+ OIC_LOG(DEBUG, TAG, "QueueGetElement FAIL, Invalid Queue");
return NULL;
}
if (NULL == element)
{
- OIC_LOG(DEBUG, TAG, "QueueGetElement : FAIL, no messages");
return NULL;
}
- next = element->next;
- queue->element = next;
+ queue->element = element->next;
queue->count--;
message = element->message;
return message;
}
-CAResult_t u_queue_remove_element(u_queue_t* queue)
+CAResult_t u_queue_remove_element(u_queue_t *queue)
{
- u_queue_element* next = NULL;
- u_queue_element* remove = NULL;
+ u_queue_element *next = NULL;
+ u_queue_element *remove = NULL;
if (NULL == queue)
{
return CA_STATUS_OK;
}
-uint32_t u_queue_get_size(u_queue_t* queue)
+uint32_t u_queue_get_size(u_queue_t *queue)
{
if (NULL == queue)
{
return queue->count;
}
-CAResult_t u_queue_reset(u_queue_t* queue)
+CAResult_t u_queue_reset(u_queue_t *queue)
{
- CAResult_t error = CA_STATUS_FAILED;
-
if (NULL == queue)
{
OIC_LOG(DEBUG, TAG, "QueueReset FAIL, Invalid Queue");
while (NULL != queue->element)
{
- error = u_queue_remove_element(queue);
- if (error == CA_STATUS_FAILED)
- break;
+ u_queue_remove_element(queue);
}
if (NO_MESSAGES != queue->count)
}
-CAResult_t u_queue_delete(u_queue_t* queue)
+CAResult_t u_queue_delete(u_queue_t *queue)
{
CAResult_t error = CA_STATUS_FAILED;
return (CA_STATUS_OK);
}
-u_queue_message_t* u_queue_get_head(u_queue_t* queue)
+u_queue_message_t *u_queue_get_head(u_queue_t *queue)
{
if (NULL == queue)
{
}
return queue->element->message;
}
+
+CAResult_t u_queue_remove_req_elements(u_queue_t *queue,
+ QueueContextDataDestroy callback, void *ctx,
+ QueueDataDestroyFunction destroy)
+{
+ if (NULL == queue)
+ {
+ OIC_LOG(DEBUG, TAG, "QueueRemoveReqElement FAIL, Invalid Queue");
+ return CA_STATUS_FAILED;
+ }
+
+ if (NULL == callback)
+ {
+ OIC_LOG(DEBUG, TAG, "QueueRemoveReqElement FAIL, NULL callback");
+ return CA_STATUS_FAILED;
+ }
+
+ u_queue_element *cur = queue->element;
+ u_queue_element *prev = NULL;
+ u_queue_element *remove = NULL;
+
+ while (NULL != cur)
+ {
+ if (cur->message && callback(cur->message->msg, cur->message->size, ctx))
+ {
+ remove = cur;
+ if (NULL != prev)
+ {
+ prev->next = cur->next;
+ }
+ else
+ {
+ queue->element = cur->next;
+ }
+ cur = cur->next;
+ if (NULL != destroy)
+ {
+ destroy(remove->message->msg, remove->message->size);
+ }
+ else
+ {
+ OICFree(remove->message->msg);
+ }
+ OICFree(remove->message);
+ OICFree(remove);
+ queue->count--;
+ }
+ else
+ {
+ prev = cur;
+ cur = cur->next;
+ }
+ }
+ return CA_STATUS_OK;
+}
+