#include "ocobserve.h"
#include "ocresourcehandler.h"
#include "ocrandom.h"
-#include "ocmalloc.h"
+#include "oic_malloc.h"
+#include "oic_string.h"
+#include "ocpayload.h"
#include "ocserverrequest.h"
+#include "octhread.h"
+#include "logger.h"
-#include "utlist.h"
-#include "pdu.h"
+#include <coap/utlist.h>
+#include <coap/pdu.h>
+#include <coap/coap.h>
// Module Name
-#define MOD_NAME PCF("ocobserve")
+#define MOD_NAME "ocobserve"
-#define TAG PCF("OCStackObserve")
+#define TAG "OIC_RI_OBSERVE"
-#define VERIFY_NON_NULL(arg) { if (!arg) {OC_LOG(FATAL, TAG, #arg " is NULL"); goto exit;} }
+#define VERIFY_NON_NULL(arg) { if (!arg) {OIC_LOG(FATAL, TAG, #arg " is NULL"); goto exit;} }
-static struct ResourceObserver * serverObsList = NULL;
+static struct ResourceObserver * g_serverObsList = NULL;
+static oc_mutex g_serverObsListMutex = NULL;
-// send notifications based on the qos of the request
-// The qos passed as a parameter overrides what the client requested
-// If we want the client preference taking high priority make:
-// qos = resourceObserver->qos;
-OCQualityOfService DetermineObserverQoS(OCMethod method, ResourceObserver * resourceObserver,
- OCQualityOfService appQoS)
+static ResourceObserver* GetObserverUsingIdAsOwner (const OCObservationId observeId);
+
+static ResourceObserver* CloneObserverNode (ResourceObserver* observer)
+{
+ ResourceObserver* dupObsNode = NULL;
+ if (observer)
+ {
+ dupObsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver));
+ VERIFY_NON_NULL(dupObsNode);
+ memcpy(dupObsNode, observer, sizeof(ResourceObserver));
+
+ if (observer->resUri)
+ {
+ dupObsNode->resUri = OICStrdup(observer->resUri);
+ VERIFY_NON_NULL(dupObsNode->resUri);
+ }
+
+ if (observer->query)
+ {
+ dupObsNode->query = OICStrdup(observer->query);
+ VERIFY_NON_NULL(dupObsNode->query);
+ }
+
+ if (observer->token)
+ {
+ dupObsNode->token = (CAToken_t)OICMalloc(observer->tokenLength);
+ VERIFY_NON_NULL(dupObsNode->token);
+ memcpy(dupObsNode->token, observer->token, observer->tokenLength);
+ }
+
+ dupObsNode->next = NULL;
+ }
+
+ return dupObsNode;
+
+exit:
+ FreeObserver(dupObsNode);
+ return NULL;
+}
+
+static void FreeObserverList (ResourceObserver* list)
+{
+ ResourceObserver* head = list;
+ ResourceObserver* del = NULL;
+ while (NULL != head)
+ {
+ del = head;
+ head = head->next;
+
+ OICFree(del->resUri);
+ OICFree(del->query);
+ OICFree(del->token);
+ OICFree(del);
+ }
+}
+
+static ResourceObserver* CloneObserverList (ResourceObserver* obsList)
{
+ ResourceObserver* dupList = NULL;
+ ResourceObserver* out = NULL;
+
+ LL_FOREACH(obsList, out)
+ {
+ ResourceObserver *obsNode = CloneObserverNode(out);
+ if (NULL == obsNode)
+ {
+ FreeObserverList(dupList);
+ dupList = NULL;
+ break;
+ }
+
+ LL_APPEND(dupList, obsNode);
+ }
+
+ return dupList;
+}
+
+/**
+ * Determine observe QOS based on the QOS of the request.
+ * The qos passed as a parameter overrides what the client requested.
+ * If we want the client preference taking high priority make:
+ * qos = resourceObserver->qos;
+ *
+ * @param method RESTful method.
+ * @param resourceObserver Observer.
+ * @param appQoS Quality of service.
+ * @return The quality of service of the observer.
+ */
+static OCQualityOfService DetermineObserverQoS(OCMethod method,
+ ResourceObserver * resourceObserver, OCQualityOfService appQoS)
+{
+ if (!resourceObserver)
+ {
+ OIC_LOG(ERROR, TAG, "DetermineObserverQoS called with invalid resourceObserver");
+ return OC_NA_QOS;
+ }
+
OCQualityOfService decidedQoS = appQoS;
- if(appQoS == OC_NA_QOS)
+ if (appQoS == OC_NA_QOS)
{
decidedQoS = resourceObserver->qos;
}
- if(appQoS != OC_HIGH_QOS)
+ if (appQoS != OC_HIGH_QOS)
{
- OC_LOG_V(INFO, TAG, "Current NON count for this observer is %d",
+ OIC_LOG_V(INFO, TAG, "Current NON count for this observer is %d",
resourceObserver->lowQosCount);
- #ifdef WITH_PRESENCE
- if((resourceObserver->forceHighQos \
+#ifdef WITH_PRESENCE
+ if ((resourceObserver->forceHighQos \
|| resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT) \
&& method != OC_REST_PRESENCE)
- #else
- if(resourceObserver->forceHighQos \
+#else
+ if (resourceObserver->forceHighQos \
|| resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT)
- #endif
- {
+#endif
+ {
resourceObserver->lowQosCount = 0;
// at some point we have to to send CON to check on the
// availability of observer
- OC_LOG(INFO, TAG, PCF("This time we are sending the notification as High qos"));
+ OIC_LOG(INFO, TAG, "This time we are sending the notification as High qos");
decidedQoS = OC_HIGH_QOS;
- }
+ }
else
{
(resourceObserver->lowQosCount)++;
return decidedQoS;
}
+/**
+ * Create a get request and pass to entityhandler to notify specific observer.
+ *
+ * @param observer Observer that need to be notified.
+ * @param qos Quality of service of resource.
+ *
+ * @return ::OC_STACK_OK on success, some other value upon failure.
+ */
+static OCStackResult SendObserveNotification(ResourceObserver *observer,
+ OCQualityOfService qos)
+{
+ OCStackResult result = OC_STACK_ERROR;
+ OCServerRequest * request = NULL;
+ OCEntityHandlerRequest ehRequest = {0};
+ OCEntityHandlerResult ehResult = OC_EH_ERROR;
+
+ result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
+ 0, observer->resource->sequenceNum, qos,
+ observer->query, NULL, NULL,
+ observer->token, observer->tokenLength,
+ observer->resUri, 0, observer->acceptFormat,
+ &observer->devAddr);
+
+ if (request)
+ {
+ request->observeResult = OC_STACK_OK;
+ if (result == OC_STACK_OK)
+ {
+ result = FormOCEntityHandlerRequest(
+ &ehRequest,
+ (OCRequestHandle) request->requestId,
+ request->method,
+ &request->devAddr,
+ (OCResourceHandle) observer->resource,
+ request->query,
+ PAYLOAD_TYPE_REPRESENTATION,
+ request->payload,
+ request->payloadSize,
+ request->numRcvdVendorSpecificHeaderOptions,
+ request->rcvdVendorSpecificHeaderOptions,
+ OC_OBSERVE_NO_OPTION,
+ 0,
+ request->coapID);
+ if (result == OC_STACK_OK)
+ {
+ ehResult = observer->resource->entityHandler(OC_REQUEST_FLAG, &ehRequest,
+ observer->resource->entityHandlerCallbackParam);
+
+ // Clear server request on error case
+ if (!OCResultToSuccess(EntityHandlerCodeToOCStackCode(ehResult)))
+ {
+ FindAndDeleteServerRequest(request);
+ }
+ // Reset Observer TTL.
+ observer->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
+ }
+ OCPayloadDestroy(ehRequest.payload);
+ }
+ }
+
+ return result;
+}
+
#ifdef WITH_PRESENCE
OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
- OCResourceType *resourceType, OCQualityOfService qos)
+ OCPresenceTrigger trigger, OCResourceType *resourceType, OCQualityOfService qos)
#else
OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
OCQualityOfService qos)
#endif
{
- OC_LOG(INFO, TAG, PCF("Entering SendObserverNotification"));
+ OIC_LOG(INFO, TAG, "Entering SendObserverNotification");
+ if (!resPtr)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
OCStackResult result = OC_STACK_ERROR;
- ResourceObserver * resourceObserver = serverObsList;
+ ResourceObserver * resourceObserver = NULL;
uint8_t numObs = 0;
OCServerRequest * request = NULL;
- OCEntityHandlerRequest ehRequest = {0};
- OCEntityHandlerResult ehResult = OC_EH_ERROR;
+ bool observeErrorFlag = false;
// Find clients that are observing this resource
+ oc_mutex_lock(g_serverObsListMutex);
+ resourceObserver = g_serverObsList;
while (resourceObserver)
{
if (resourceObserver->resource == resPtr)
{
numObs++;
- #ifdef WITH_PRESENCE
- if(method != OC_REST_PRESENCE)
+#ifdef WITH_PRESENCE
+ if (method != OC_REST_PRESENCE)
{
- #endif
+#endif
qos = DetermineObserverQoS(method, resourceObserver, qos);
-
- result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
- 0, resPtr->sequenceNum, qos, resourceObserver->query,
- NULL, NULL,
- &resourceObserver->token, resourceObserver->resUri, 0,
- &(resourceObserver->addressInfo), resourceObserver->connectivityType);
-
- if(request)
- {
- request->observeResult = OC_STACK_OK;
- if(result == OC_STACK_OK)
- {
- result = FormOCEntityHandlerRequest(&ehRequest, (OCRequestHandle) request,
- request->method, (OCResourceHandle) resPtr, request->query,
- request->reqJSONPayload, request->numRcvdVendorSpecificHeaderOptions,
- request->rcvdVendorSpecificHeaderOptions, OC_OBSERVE_NO_OPTION, 0);
- if(result == OC_STACK_OK)
- {
- ehResult = resPtr->entityHandler(OC_REQUEST_FLAG, &ehRequest);
- if(ehResult == OC_EH_ERROR)
- {
- FindAndDeleteServerRequest(request);
- }
- }
- }
- }
- #ifdef WITH_PRESENCE
+ result = SendObserveNotification(resourceObserver, qos);
+#ifdef WITH_PRESENCE
}
else
{
OCEntityHandlerResponse ehResponse = {0};
- unsigned char presenceResBuf[MAX_RESPONSE_LENGTH] = {0};
+
//This is effectively the implementation for the presence entity handler.
- OC_LOG(DEBUG, TAG, PCF("This notification is for Presence"));
- result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
+ OIC_LOG(DEBUG, TAG, "This notification is for Presence");
+ result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
0, resPtr->sequenceNum, qos, resourceObserver->query,
NULL, NULL,
- &resourceObserver->token, resourceObserver->resUri, 0,
- &(resourceObserver->addressInfo), resourceObserver->connectivityType);
+ resourceObserver->token, resourceObserver->tokenLength,
+ resourceObserver->resUri, 0, resourceObserver->acceptFormat,
+ &resourceObserver->devAddr);
- if(result == OC_STACK_OK)
+ if (result == OC_STACK_OK)
{
- // we create the payload here
- if(resourceType && resourceType->resourcetypename)
+ OCPresencePayload* presenceResBuf = OCPresencePayloadCreate(
+ resPtr->sequenceNum, maxAge, trigger,
+ resourceType ? resourceType->resourcetypename : NULL);
+
+ if (!presenceResBuf)
{
- snprintf((char *)presenceResBuf, sizeof(presenceResBuf), "%u:%u:%s",
- resPtr->sequenceNum, maxAge, resourceType->resourcetypename);
+ oc_mutex_unlock(g_serverObsListMutex);
+ return OC_STACK_NO_MEMORY;
}
- else
+
+ if (result == OC_STACK_OK)
{
- snprintf((char *)presenceResBuf, sizeof(presenceResBuf), "%u:%u",
- resPtr->sequenceNum, maxAge);
+ ehResponse.ehResult = OC_EH_OK;
+ ehResponse.payload = (OCPayload*)presenceResBuf;
+ ehResponse.persistentBufferFlag = 0;
+ ehResponse.requestHandle = (OCRequestHandle) request->requestId;
+ ehResponse.resourceHandle = (OCResourceHandle) resPtr;
+ OICStrcpy(ehResponse.resourceUri, sizeof(ehResponse.resourceUri),
+ resourceObserver->resUri);
+ result = OCDoResponse(&ehResponse);
+ if (result != OC_STACK_OK)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to send presence notification!");
+ FindAndDeleteServerRequest(request);
+ }
}
- ehResponse.ehResult = OC_EH_OK;
- ehResponse.payload = presenceResBuf;
- ehResponse.payloadSize = strlen((const char *)presenceResBuf) + 1;
- ehResponse.persistentBufferFlag = 0;
- ehResponse.requestHandle = (OCRequestHandle) request;
- ehResponse.resourceHandle = (OCResourceHandle) resPtr;
- strcpy((char *)ehResponse.resourceUri, (const char *)resourceObserver->resUri);
- result = OCDoResponse(&ehResponse);
+
+ OCPresencePayloadDestroy(presenceResBuf);
}
}
- #endif
+#endif
+
+ // Since we are in a loop, set an error flag to indicate at least one error occurred.
+ if (result != OC_STACK_OK)
+ {
+ observeErrorFlag = true;
+ }
}
resourceObserver = resourceObserver->next;
}
+
+ oc_mutex_unlock(g_serverObsListMutex);
+
if (numObs == 0)
{
- OC_LOG(INFO, TAG, PCF("Resource has no observers"));
+ OIC_LOG(INFO, TAG, "Resource has no observers");
result = OC_STACK_NO_OBSERVERS;
}
+ else if (observeErrorFlag)
+ {
+ OIC_LOG(ERROR, TAG, "Observer notification error");
+ result = OC_STACK_ERROR;
+ }
return result;
}
OCStackResult SendListObserverNotification (OCResource * resource,
OCObservationId *obsIdList, uint8_t numberOfIds,
- unsigned char *notificationJSONPayload, uint32_t maxAge,
+ const OCRepPayload *payload,
+ uint32_t maxAge,
OCQualityOfService qos)
{
+ (void)maxAge;
+ if (!resource || !obsIdList || !payload)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
uint8_t numIds = numberOfIds;
- ResourceObserver *observation = NULL;
+ ResourceObserver *observer = NULL;
uint8_t numSentNotification = 0;
OCServerRequest * request = NULL;
OCStackResult result = OC_STACK_ERROR;
- OCEntityHandlerResponse ehResponse = {0};
+ bool observeErrorFlag = false;
- OC_LOG(INFO, TAG, PCF("Entering SendListObserverNotification"));
+ OIC_LOG(INFO, TAG, "Entering SendListObserverNotification");
while(numIds)
{
- OC_LOG_V(INFO, TAG, "Need to notify observation id %d", *obsIdList);
- observation = GetObserverUsingId (*obsIdList);
- if(observation)
+ oc_mutex_lock(g_serverObsListMutex);
+ observer = GetObserverUsingIdAsOwner (*obsIdList);
+ if (observer)
{
- // Found observation - verify if it matches the resource handle
- if (observation->resource == resource)
+ // Found observer - verify if it matches the resource handle
+ if (observer->resource == resource)
{
- qos = DetermineObserverQoS(OC_REST_GET, observation, qos);
+ qos = DetermineObserverQoS(OC_REST_GET, observer, qos);
- result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
- 0, resource->sequenceNum, qos, observation->query,
- NULL, NULL, &observation->token,
- observation->resUri, 0,
- &(observation->addressInfo), observation->connectivityType);
+ result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
+ 0, resource->sequenceNum, qos, observer->query,
+ NULL, NULL, observer->token, observer->tokenLength,
+ observer->resUri, 0, observer->acceptFormat,
+ &observer->devAddr);
- if(request)
+ if (request)
{
request->observeResult = OC_STACK_OK;
- if(result == OC_STACK_OK)
+ if (result == OC_STACK_OK)
{
- memset(&ehResponse, 0, sizeof(OCEntityHandlerResponse));
+ OCEntityHandlerResponse ehResponse = {0};
ehResponse.ehResult = OC_EH_OK;
- ehResponse.payload = (unsigned char *) OCMalloc(MAX_RESPONSE_LENGTH);
- if(!ehResponse.payload)
+ ehResponse.payload = (OCPayload*)OCRepPayloadCreate();
+ if (!ehResponse.payload)
{
FindAndDeleteServerRequest(request);
+ oc_mutex_unlock(g_serverObsListMutex);
continue;
}
- strcpy((char *)ehResponse.payload, (const char *)notificationJSONPayload);
- ehResponse.payloadSize = strlen((const char *)ehResponse.payload) + 1;
+ memcpy(ehResponse.payload, payload, sizeof(*payload));
ehResponse.persistentBufferFlag = 0;
- ehResponse.requestHandle = (OCRequestHandle) request;
+ ehResponse.requestHandle = (OCRequestHandle) request->requestId;
ehResponse.resourceHandle = (OCResourceHandle) resource;
result = OCDoResponse(&ehResponse);
- if(result == OC_STACK_OK)
+ if (result == OC_STACK_OK)
{
- OCFree(ehResponse.payload);
+ OIC_LOG_V(INFO, TAG, "Observer id %d notified.", *obsIdList);
+
+ // Increment only if OCDoResponse is successful
+ numSentNotification++;
+ }
+ else
+ {
+ OIC_LOG_V(INFO, TAG, "Error notifying observer id %d.", *obsIdList);
FindAndDeleteServerRequest(request);
}
+
+ // Reset Observer TTL.
+ observer->TTL =
+ GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
+
+ OICFree(ehResponse.payload);
}
else
{
}
}
- numSentNotification++;
+ // Since we are in a loop, set an error flag to indicate
+ // at least one error occurred.
+ if (result != OC_STACK_OK)
+ {
+ observeErrorFlag = true;
+ }
}
}
+
+ oc_mutex_unlock(g_serverObsListMutex);
obsIdList++;
numIds--;
}
- if(numSentNotification == numberOfIds)
+
+ if (numSentNotification == numberOfIds && !observeErrorFlag)
{
return OC_STACK_OK;
}
- else if(numSentNotification == 0)
+ else if (numSentNotification == 0)
{
return OC_STACK_NO_OBSERVERS;
}
else
{
- //TODO: we need to signal that not every one in the
- // list got an update, should we also indicate who did not receive on?
- return OC_STACK_OK;
+ OIC_LOG(ERROR, TAG, "Observer notification error");
+ return OC_STACK_ERROR;
}
}
OCStackResult GenerateObserverId (OCObservationId *observationId)
{
- ResourceObserver *resObs = NULL;
+ bool found = false;
- OC_LOG(INFO, TAG, PCF("Entering GenerateObserverId"));
+ OIC_LOG(INFO, TAG, "Entering GenerateObserverId");
VERIFY_NON_NULL (observationId);
do
{
- *observationId = OCGetRandomByte();
+ do
+ {
+ *observationId = OCGetRandomByte();
+ } while (0 == *observationId); //Make sure *observationId is not 0
// Check if observation Id already exists
- resObs = GetObserverUsingId (*observationId);
- } while (NULL != resObs);
+ found = IsObserverAvailable (*observationId);
+ } while (found);
- OC_LOG_V(INFO, TAG, "Observation ID is %u", *observationId);
+ OIC_LOG_V(INFO, TAG, "GeneratedObservation ID is %u", *observationId);
return OC_STACK_OK;
exit:
OCStackResult AddObserver (const char *resUri,
const char *query,
OCObservationId obsId,
- CAToken_t *token,
+ CAToken_t token,
+ uint8_t tokenLength,
OCResource *resHandle,
OCQualityOfService qos,
- CAAddress_t *addressInfo,
- CAConnectivityType_t connectivityType)
+ OCPayloadFormat acceptFormat,
+ const OCDevAddr *devAddr)
{
- ResourceObserver *obsNode = NULL;
+ // Check if resource exists and is observable.
+ if (!resHandle)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ if (!(resHandle->resourceProperties & OC_OBSERVABLE))
+ {
+ return OC_STACK_RESOURCE_ERROR;
+ }
- obsNode = (ResourceObserver *) OCCalloc(1, sizeof(ResourceObserver));
+ if (!resUri || !token)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ ResourceObserver *obsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver));
if (obsNode)
{
obsNode->observeId = obsId;
- obsNode->resUri = (unsigned char *)OCMalloc(strlen(resUri)+1);
+ obsNode->resUri = OICStrdup(resUri);
VERIFY_NON_NULL (obsNode->resUri);
- memcpy (obsNode->resUri, resUri, strlen(resUri)+1);
obsNode->qos = qos;
- if(query)
+ obsNode->acceptFormat = acceptFormat;
+ if (query)
{
- obsNode->query = (unsigned char *)OCMalloc(strlen(query)+1);
+ obsNode->query = OICStrdup(query);
VERIFY_NON_NULL (obsNode->query);
- memcpy (obsNode->query, query, strlen(query)+1);
}
+ // If tokenLength is zero, the return value depends on the
+ // particular library implementation (it may or may not be a null pointer).
+ if (tokenLength)
+ {
+ obsNode->token = (CAToken_t)OICMalloc(tokenLength);
+ VERIFY_NON_NULL (obsNode->token);
+ memcpy(obsNode->token, token, tokenLength);
+ }
+ obsNode->tokenLength = tokenLength;
- obsNode->token = (CAToken_t)OCMalloc(CA_MAX_TOKEN_LEN+1);
- VERIFY_NON_NULL (obsNode->token);
- memset(obsNode->token, 0, CA_MAX_TOKEN_LEN + 1);
- memcpy(obsNode->token, *token, CA_MAX_TOKEN_LEN);
-
- obsNode->addressInfo = *addressInfo;
- obsNode->connectivityType = connectivityType;
+ obsNode->devAddr = *devAddr;
obsNode->resource = resHandle;
- LL_APPEND (serverObsList, obsNode);
+
+#ifdef WITH_PRESENCE
+ if ((strcmp(resUri, OC_RSRVD_PRESENCE_URI) == 0))
+ {
+ obsNode->TTL = 0;
+ }
+ else
+#endif
+ {
+ obsNode->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
+ }
+
+ oc_mutex_lock(g_serverObsListMutex);
+ LL_APPEND (g_serverObsList, obsNode);
+ oc_mutex_unlock(g_serverObsListMutex);
+
return OC_STACK_OK;
}
exit:
if (obsNode)
{
- OCFree(obsNode->resUri);
- OCFree(obsNode->query);
- OCFree(obsNode);
+ OICFree(obsNode->resUri);
+ OICFree(obsNode->query);
+ OICFree(obsNode);
}
return OC_STACK_NO_MEMORY;
}
+/*
+ * This function checks if the node is past its time to live and
+ * deletes it if timed-out. Calling this function with a presence callback
+ * with ttl set to 0 will not delete anything as presence nodes have
+ * their own mechanisms for timeouts. A null argument will cause the function to
+ * silently return.
+ */
+static void CheckTimedOutObserver(ResourceObserver* observer)
+{
+ if (!observer || observer->TTL == 0)
+ {
+ return;
+ }
+
+ coap_tick_t now;
+ coap_ticks(&now);
+
+ if (observer->TTL < now)
+ {
+ // Send confirmable notification message to observer.
+ OIC_LOG(INFO, TAG, "Sending High-QoS notification to observer");
+ SendObserveNotification(observer, OC_HIGH_QOS);
+ }
+}
+
ResourceObserver* GetObserverUsingId (const OCObservationId observeId)
{
ResourceObserver *out = NULL;
if (observeId)
{
- LL_FOREACH (serverObsList, out)
+ oc_mutex_lock(g_serverObsListMutex);
+ LL_FOREACH (g_serverObsList, out)
+ {
+ if (out->observeId == observeId)
+ {
+ oc_mutex_unlock(g_serverObsListMutex);
+ return CloneObserverNode(out);
+ }
+ CheckTimedOutObserver(out);
+ }
+ oc_mutex_unlock(g_serverObsListMutex);
+ }
+ OIC_LOG(INFO, TAG, "Observer node not found!!");
+ return NULL;
+}
+
+static ResourceObserver* GetObserverUsingIdAsOwner (const OCObservationId observeId)
+{
+ ResourceObserver *out = NULL;
+
+ if (observeId)
+ {
+ LL_FOREACH (g_serverObsList, out)
{
if (out->observeId == observeId)
{
return out;
}
+ CheckTimedOutObserver(out);
}
}
- OC_LOG(INFO, TAG, PCF("Observer node not found!!"));
+ OIC_LOG(INFO, TAG, "Observer node not found!!");
return NULL;
}
-ResourceObserver* GetObserverUsingToken (const CAToken_t * token)
+bool IsObserverAvailable (const OCObservationId observeId)
{
ResourceObserver *out = NULL;
- if(token)
+ if (observeId)
+ {
+ oc_mutex_lock(g_serverObsListMutex);
+ LL_FOREACH (g_serverObsList, out)
+ {
+ if (out->observeId == observeId)
+ {
+ oc_mutex_unlock(g_serverObsListMutex);
+ return true;
+ }
+ }
+ oc_mutex_unlock(g_serverObsListMutex);
+ }
+
+ return false;
+}
+
+ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength)
+{
+ if (token)
{
- LL_FOREACH (serverObsList, out)
+ OIC_LOG(INFO, TAG, "Looking for token");
+ OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
+
+ ResourceObserver *out = NULL;
+ oc_mutex_lock(g_serverObsListMutex);
+ LL_FOREACH (g_serverObsList, out)
+ {
+ /* de-annotate below line if want to see all token in cbList */
+ //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
+ if ((memcmp(out->token, token, tokenLength) == 0))
+ {
+ OIC_LOG(INFO, TAG, "Found in observer list");
+ ResourceObserver *observer = CloneObserverNode(out);
+ oc_mutex_unlock(g_serverObsListMutex);
+ return observer;
+ }
+ CheckTimedOutObserver(out);
+ }
+ oc_mutex_unlock(g_serverObsListMutex);
+ }
+ else
+ {
+ OIC_LOG(ERROR, TAG, "Passed in NULL token");
+ }
+
+ OIC_LOG(INFO, TAG, "Observer node not found!!");
+ return NULL;
+}
+
+static ResourceObserver* GetObserverUsingTokenAsOwner (const CAToken_t token, uint8_t tokenLength)
+{
+ if (token)
+ {
+ OIC_LOG(INFO, TAG, "Looking for token");
+ OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
+
+ ResourceObserver *out = NULL;
+ LL_FOREACH (g_serverObsList, out)
{
- OC_LOG(INFO, TAG,PCF("comparing tokens"));
- OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, CA_MAX_TOKEN_LEN);
- OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, CA_MAX_TOKEN_LEN);
- if((memcmp(out->token, *token, CA_MAX_TOKEN_LEN) == 0))
+ /* de-annotate below line if want to see all token in cbList */
+ //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
+ if ((memcmp(out->token, token, tokenLength) == 0))
{
+ OIC_LOG(INFO, TAG, "Found in observer list");
return out;
}
+ CheckTimedOutObserver(out);
}
}
- OC_LOG(INFO, TAG, PCF("Observer node not found!!"));
+ else
+ {
+ OIC_LOG(ERROR, TAG, "Passed in NULL token");
+ }
+
+ OIC_LOG(INFO, TAG, "Observer node not found!!");
return NULL;
}
-OCStackResult DeleteObserverUsingToken (CAToken_t * token)
+OCStackResult DeleteObserverUsingToken (CAToken_t token, uint8_t tokenLength)
{
- ResourceObserver *obsNode = NULL;
+ if (!token)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
- obsNode = GetObserverUsingToken (token);
+ oc_mutex_lock(g_serverObsListMutex);
+ ResourceObserver *obsNode = GetObserverUsingTokenAsOwner (token, tokenLength);
if (obsNode)
{
- OC_LOG_V(INFO, TAG, PCF("deleting tokens"));
- OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, CA_MAX_TOKEN_LEN);
- LL_DELETE (serverObsList, obsNode);
- OCFree(obsNode->resUri);
- OCFree(obsNode->query);
- OCFree(obsNode);
+ OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", obsNode->observeId);
+ OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength);
+
+ LL_DELETE (g_serverObsList, obsNode);
+ FreeObserver(obsNode);
}
+ oc_mutex_unlock(g_serverObsListMutex);
+
// it is ok if we did not find the observer...
return OC_STACK_OK;
}
-void DeleteObserverList()
+OCStackResult DeleteObserverUsingDevAddr(const OCDevAddr *devAddr)
{
+ if (!devAddr)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ oc_mutex_lock(g_serverObsListMutex);
+ ResourceObserver* obsDupList = CloneObserverList(g_serverObsList);
+ oc_mutex_unlock(g_serverObsListMutex);
+
ResourceObserver *out = NULL;
ResourceObserver *tmp = NULL;
- LL_FOREACH_SAFE (serverObsList, out, tmp)
+ LL_FOREACH_SAFE(obsDupList, out, tmp)
+ {
+ if (out)
+ {
+ if ((strcmp(out->devAddr.addr, devAddr->addr) == 0)
+ && out->devAddr.port == devAddr->port)
+ {
+ OIC_LOG_V(INFO, TAG, "deleting observer id %u with %s:%u",
+ out->observeId, out->devAddr.addr, out->devAddr.port);
+ OCStackFeedBack(out->token, out->tokenLength, OC_OBSERVER_NOT_INTERESTED);
+ }
+ }
+ }
+
+ FreeObserverList(obsDupList);
+ return OC_STACK_OK;
+}
+
+void DeleteObserverList()
+{
+ oc_mutex_lock(g_serverObsListMutex);
+
+ ResourceObserver* head = g_serverObsList;
+ ResourceObserver* del = NULL;
+ while (NULL != head)
{
- DeleteObserverUsingToken (&(out->token));
+ del = head;
+ head = head->next;
+
+ OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", del->observeId);
+ OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)del->token, del->tokenLength);
+
+ FreeObserver(del);
}
- serverObsList = NULL;
+
+ g_serverObsList = NULL;
+ oc_mutex_unlock(g_serverObsListMutex);
}
+/*
+ * CA layer expects observe registration/de-reg/notiifcations to be passed as a header
+ * option, which breaks the protocol abstraction requirement between RI & CA, and
+ * has to be fixed in the future. The function below adds the header option for observe.
+ * It should be noted that the observe header option is assumed to be the first option
+ * in the list of user defined header options and hence it is inserted at the front
+ * of the header options list and number of options adjusted accordingly.
+ */
OCStackResult
CreateObserveHeaderOption (CAHeaderOption_t **caHdrOpt,
OCHeaderOption *ocHdrOpt,
uint8_t numOptions,
uint8_t observeFlag)
{
+ if (!caHdrOpt)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ if (numOptions > 0 && !ocHdrOpt)
+ {
+ OIC_LOG (INFO, TAG, "options are NULL though number is non zero");
+ return OC_STACK_INVALID_PARAM;
+ }
+
CAHeaderOption_t *tmpHdrOpt = NULL;
- tmpHdrOpt = (CAHeaderOption_t *) OCMalloc ((numOptions+1)*sizeof(CAHeaderOption_t));
+ tmpHdrOpt = (CAHeaderOption_t *) OICCalloc ((numOptions+1), sizeof(CAHeaderOption_t));
if (NULL == tmpHdrOpt)
{
return OC_STACK_NO_MEMORY;
}
tmpHdrOpt[0].protocolID = CA_COAP_ID;
tmpHdrOpt[0].optionID = COAP_OPTION_OBSERVE;
- tmpHdrOpt[0].optionLength = sizeof(uint32_t);
+ tmpHdrOpt[0].optionLength = sizeof(uint8_t);
tmpHdrOpt[0].optionData[0] = observeFlag;
for (uint8_t i = 0; i < numOptions; i++)
{
return OC_STACK_OK;
}
+/*
+ * CA layer passes observe information to the RI layer as a header option, which
+ * breaks the protocol abstraction requirement between RI & CA, and has to be fixed
+ * in the future. The function below removes the observe header option and processes it.
+ * It should be noted that the observe header option is always assumed to be the first
+ * option in the list of user defined header options and hence it is deleted from the
+ * front of the header options list and the number of options is adjusted accordingly.
+ */
OCStackResult
GetObserveHeaderOption (uint32_t * observationOption,
CAHeaderOption_t *options,
uint8_t * numOptions)
{
- *observationOption = OC_OBSERVE_NO_OPTION;
- uint8_t i = 0;
- uint8_t c = 0;
- for(i = 0; i < *numOptions; i++)
+ if (!observationOption)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ if (!options || !numOptions)
+ {
+ OIC_LOG (INFO, TAG, "No options present");
+ return OC_STACK_OK;
+ }
+
+ for(uint8_t i = 0; i < *numOptions; i++)
{
- if(options[i].protocolID == CA_COAP_ID &&
+ if (options[i].protocolID == CA_COAP_ID &&
options[i].optionID == COAP_OPTION_OBSERVE)
{
*observationOption = options[i].optionData[0];
- for(c = i; c < *numOptions-1; c++)
+ for(uint8_t c = i; c < *numOptions-1; c++)
{
- options[i].protocolID = options[i+1].protocolID;
- options[i].optionID = options[i+1].optionID;
- options[i].optionLength = options[i+1].optionLength;
- memcpy(options[i].optionData, options[i+1].optionData, options[i+1].optionLength);
+ options[i] = options[i+1];
}
(*numOptions)--;
return OC_STACK_OK;
}
return OC_STACK_OK;
}
+
+OCStackResult InitializeObseverList()
+{
+ OIC_LOG(DEBUG, TAG, "InitializeObseverList IN");
+
+ if (NULL == g_serverObsListMutex)
+ {
+ g_serverObsListMutex = oc_mutex_new();
+ }
+
+ OIC_LOG(DEBUG, TAG, "InitializeObseverList OUT");
+ return OC_STACK_OK;
+}
+
+void TerminateObserverList()
+{
+ OIC_LOG(DEBUG, TAG, "TerminateObserverList IN");
+
+ DeleteObserverList();
+
+ if (NULL != g_serverObsListMutex)
+ {
+ oc_mutex_free(g_serverObsListMutex);
+ g_serverObsListMutex = NULL;
+ }
+
+ OIC_LOG(DEBUG, TAG, "TerminateObserverList OUT");
+}
+
+void FreeObserver (ResourceObserver* obsNode)
+{
+ if (NULL != obsNode)
+ {
+ OICFree(obsNode->resUri);
+ OICFree(obsNode->query);
+ OICFree(obsNode->token);
+ OICFree(obsNode);
+ }
+}