#include "ocstackinternal.h"
#include "ocobserve.h"
#include "ocresourcehandler.h"
-#include "occoap.h"
-#include "utlist.h"
-#include "debug.h"
#include "ocrandom.h"
#include "ocmalloc.h"
#include "ocserverrequest.h"
+#include "utlist.h"
+#include "pdu.h"
+
// Module Name
#define MOD_NAME PCF("ocobserve")
static struct ResourceObserver * serverObsList = NULL;
-// send notifications based on the qos of the request
-// The qos passed as a parameter overrides what the client requested
-// If we want the client preference taking high priority make:
-// qos = resourceObserver->qos;
-OCQualityOfService DetermineObserverQoS(OCMethod method, ResourceObserver * resourceObserver,
- OCQualityOfService appQoS)
+/**
+ * Determine observe QOS based on the QOS of the request.
+ * The qos passed as a parameter overrides what the client requested.
+ * If we want the client preference taking high priority make:
+ * qos = resourceObserver->qos;
+ *
+ * @param method RESTful method.
+ * @param resourceObserver Observer.
+ * @param appQoS Quality of service.
+ * @return The quality of service of the observer.
+ */
+static OCQualityOfService DetermineObserverQoS(OCMethod method,
+ ResourceObserver * resourceObserver, OCQualityOfService appQoS)
{
+ if(!resourceObserver)
+ {
+ OC_LOG(ERROR, TAG, "DetermineObserverQoS called with invalid resourceObserver");
+ return OC_NA_QOS;
+ }
+
OCQualityOfService decidedQoS = appQoS;
if(appQoS == OC_NA_QOS)
{
if(resourceObserver->forceHighQos \
|| resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT)
#endif
- {
+ {
resourceObserver->lowQosCount = 0;
// at some point we have to to send CON to check on the
// availability of observer
OC_LOG(INFO, TAG, PCF("This time we are sending the notification as High qos"));
decidedQoS = OC_HIGH_QOS;
- }
+ }
else
{
(resourceObserver->lowQosCount)++;
#endif
{
OC_LOG(INFO, TAG, PCF("Entering SendObserverNotification"));
+ if(!resPtr)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
OCStackResult result = OC_STACK_ERROR;
ResourceObserver * resourceObserver = serverObsList;
uint8_t numObs = 0;
OCServerRequest * request = NULL;
- OCEntityHandlerRequest ehRequest = {0};
+ OCEntityHandlerRequest ehRequest = {};
OCEntityHandlerResult ehResult = OC_EH_ERROR;
+ bool observeErrorFlag = false;
// Find clients that are observing this resource
while (resourceObserver)
{
#endif
qos = DetermineObserverQoS(method, resourceObserver, qos);
+
result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
0, resPtr->sequenceNum, qos, resourceObserver->query,
NULL, NULL,
- &resourceObserver->token, resourceObserver->addr,
- resourceObserver->resUri, 0);
- request->observeResult = OC_STACK_OK;
- if(request && result == OC_STACK_OK)
+ resourceObserver->token, resourceObserver->tokenLength,
+ resourceObserver->resUri, 0,
+ &(resourceObserver->addressInfo), resourceObserver->connectivityType);
+
+ if(request)
{
- result = FormOCEntityHandlerRequest(&ehRequest, (OCRequestHandle) request,
- request->method, (OCResourceHandle) resPtr, request->query,
- request->reqJSONPayload, request->numRcvdVendorSpecificHeaderOptions,
- request->rcvdVendorSpecificHeaderOptions, OC_OBSERVE_NO_OPTION, 0);
+ request->observeResult = OC_STACK_OK;
if(result == OC_STACK_OK)
{
- ehResult = resPtr->entityHandler(OC_REQUEST_FLAG, &ehRequest);
- if(ehResult == OC_EH_ERROR)
+ result = FormOCEntityHandlerRequest(&ehRequest, (OCRequestHandle) request,
+ request->method, (OCResourceHandle) resPtr, request->query,
+ request->reqJSONPayload,
+ request->numRcvdVendorSpecificHeaderOptions,
+ request->rcvdVendorSpecificHeaderOptions,
+ OC_OBSERVE_NO_OPTION, 0);
+ if(result == OC_STACK_OK)
{
- FindAndDeleteServerRequest(request);
+ ehResult = resPtr->entityHandler(OC_REQUEST_FLAG, &ehRequest);
+ if(ehResult == OC_EH_ERROR)
+ {
+ FindAndDeleteServerRequest(request);
+ }
}
}
}
}
else
{
- OCEntityHandlerResponse ehResponse = {0};
- unsigned char presenceResBuf[MAX_RESPONSE_LENGTH] = {0};
+ OCEntityHandlerResponse ehResponse = {};
+ char presenceResBuf[MAX_RESPONSE_LENGTH] = {};
+
//This is effectively the implementation for the presence entity handler.
OC_LOG(DEBUG, TAG, PCF("This notification is for Presence"));
+
result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
- 0, OC_OBSERVE_NO_OPTION, OC_LOW_QOS,
- NULL, NULL, NULL, &resourceObserver->token,
- resourceObserver->addr, resourceObserver->resUri, 0);
+ 0, resPtr->sequenceNum, qos, resourceObserver->query,
+ NULL, NULL,
+ resourceObserver->token, resourceObserver->tokenLength,
+ resourceObserver->resUri, 0,
+ &(resourceObserver->addressInfo), resourceObserver->connectivityType);
+
if(result == OC_STACK_OK)
{
// we create the payload here
}
}
#endif
+
+ // Since we are in a loop, set an error flag to indicate at least one error occurred.
+ if (result != OC_STACK_OK)
+ {
+ observeErrorFlag = true;
+ }
}
resourceObserver = resourceObserver->next;
}
+
if (numObs == 0)
{
OC_LOG(INFO, TAG, PCF("Resource has no observers"));
result = OC_STACK_NO_OBSERVERS;
}
+ else if (observeErrorFlag)
+ {
+ OC_LOG(ERROR, TAG, PCF("Observer notification error"));
+ result = OC_STACK_ERROR;
+ }
return result;
}
OCStackResult SendListObserverNotification (OCResource * resource,
OCObservationId *obsIdList, uint8_t numberOfIds,
- unsigned char *notificationJSONPayload, uint32_t maxAge,
+ const char *notificationJSONPayload, uint32_t maxAge,
OCQualityOfService qos)
{
+ if(!resource || !obsIdList || !notificationJSONPayload)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
uint8_t numIds = numberOfIds;
- ResourceObserver *observation = NULL;
+ ResourceObserver *observer = NULL;
uint8_t numSentNotification = 0;
OCServerRequest * request = NULL;
OCStackResult result = OC_STACK_ERROR;
- OCEntityHandlerResponse ehResponse = {0};
+ bool observeErrorFlag = false;
OC_LOG(INFO, TAG, PCF("Entering SendListObserverNotification"));
while(numIds)
{
- OC_LOG_V(INFO, TAG, "Need to notify observation id %d", *obsIdList);
- observation = NULL;
- observation = GetObserverUsingId (*obsIdList);
- if(observation)
+ observer = GetObserverUsingId (*obsIdList);
+ if(observer)
{
- // Found observation - verify if it matches the resource handle
- if (observation->resource == resource)
+ // Found observer - verify if it matches the resource handle
+ if (observer->resource == resource)
{
- qos = DetermineObserverQoS(OC_REST_GET, observation, qos);
+ qos = DetermineObserverQoS(OC_REST_GET, observer, qos);
+
result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
- 0, resource->sequenceNum, qos, observation->query,
- NULL, NULL, &observation->token,
- observation->addr, observation->resUri, 0);
- request->observeResult = OC_STACK_OK;
- if(request && result == OC_STACK_OK)
+ 0, resource->sequenceNum, qos, observer->query,
+ NULL, NULL, observer->token, observer->tokenLength,
+ observer->resUri, 0,
+ &(observer->addressInfo), observer->connectivityType);
+
+ if(request)
{
- memset(&ehResponse, 0, sizeof(OCEntityHandlerResponse));
- ehResponse.ehResult = OC_EH_OK;
- ehResponse.payload = (unsigned char *) OCMalloc(MAX_RESPONSE_LENGTH);
- if(!ehResponse.payload)
+ request->observeResult = OC_STACK_OK;
+ if(result == OC_STACK_OK)
{
- FindAndDeleteServerRequest(request);
- continue;
+ OCEntityHandlerResponse ehResponse = {};
+ ehResponse.ehResult = OC_EH_OK;
+ ehResponse.payload = (char *) OCMalloc(MAX_RESPONSE_LENGTH + 1);
+ if(!ehResponse.payload)
+ {
+ FindAndDeleteServerRequest(request);
+ continue;
+ }
+ strncpy(ehResponse.payload, notificationJSONPayload, MAX_RESPONSE_LENGTH-1);
+ ehResponse.payload[MAX_RESPONSE_LENGTH] = '\0';
+ ehResponse.payloadSize = strlen(ehResponse.payload) + 1;
+ ehResponse.persistentBufferFlag = 0;
+ ehResponse.requestHandle = (OCRequestHandle) request;
+ ehResponse.resourceHandle = (OCResourceHandle) resource;
+ result = OCDoResponse(&ehResponse);
+ if(result == OC_STACK_OK)
+ {
+ OC_LOG_V(INFO, TAG, "Observer id %d notified.", *obsIdList);
+
+ // Increment only if OCDoResponse is successful
+ numSentNotification++;
+
+ OCFree(ehResponse.payload);
+ FindAndDeleteServerRequest(request);
+ }
+ else
+ {
+ OC_LOG_V(INFO, TAG, "Error notifying observer id %d.", *obsIdList);
+ }
}
- strcpy((char *)ehResponse.payload, (const char *)notificationJSONPayload);
- ehResponse.payloadSize = strlen((const char *)ehResponse.payload) + 1;
- ehResponse.persistentBufferFlag = 0;
- ehResponse.requestHandle = (OCRequestHandle) request;
- ehResponse.resourceHandle = (OCResourceHandle) resource;
- result = OCDoResponse(&ehResponse);
- if(result == OC_STACK_OK)
+ else
{
- OCFree(ehResponse.payload);
FindAndDeleteServerRequest(request);
}
}
- else
+ // Since we are in a loop, set an error flag to indicate
+ // at least one error occurred.
+ if (result != OC_STACK_OK)
{
- FindAndDeleteServerRequest(request);
+ observeErrorFlag = true;
}
-
- numSentNotification++;
}
}
obsIdList++;
numIds--;
}
- if(numSentNotification == numberOfIds)
+
+ if(numSentNotification == numberOfIds && !observeErrorFlag)
{
return OC_STACK_OK;
}
}
else
{
- //TODO: we need to signal that not every one in the
- // list got an update, should we also indicate who did not receive on?
- return OC_STACK_OK;
+ OC_LOG(ERROR, TAG, PCF("Observer notification error"));
+ return OC_STACK_ERROR;
}
}
resObs = GetObserverUsingId (*observationId);
} while (NULL != resObs);
- OC_LOG_V(INFO, TAG, "Observation ID is %u", *observationId);
+ OC_LOG_V(INFO, TAG, "Generated bservation ID is %u", *observationId);
return OC_STACK_OK;
exit:
OCStackResult AddObserver (const char *resUri,
const char *query,
OCObservationId obsId,
- OCCoAPToken *token,
- OCDevAddr *addr,
+ CAToken_t token,
+ uint8_t tokenLength,
OCResource *resHandle,
- OCQualityOfService qos)
+ OCQualityOfService qos,
+ const CAAddress_t *addressInfo,
+ CATransportType_t connectivityType)
{
+ // Check if resource exists and is observable.
+ if (!resHandle)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ if (!(resHandle->resourceProperties & OC_OBSERVABLE))
+ {
+ return OC_STACK_RESOURCE_ERROR;
+ }
ResourceObserver *obsNode = NULL;
+ if(!resUri || !token || !*token)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
obsNode = (ResourceObserver *) OCCalloc(1, sizeof(ResourceObserver));
if (obsNode)
{
obsNode->observeId = obsId;
- obsNode->resUri = (unsigned char *)OCMalloc(strlen(resUri)+1);
+ obsNode->resUri = (char *)OCMalloc(strlen(resUri)+1);
VERIFY_NON_NULL (obsNode->resUri);
memcpy (obsNode->resUri, resUri, strlen(resUri)+1);
obsNode->qos = qos;
if(query)
{
- obsNode->query = (unsigned char *)OCMalloc(strlen(query)+1);
+ obsNode->query = (char *)OCMalloc(strlen(query)+1);
VERIFY_NON_NULL (obsNode->query);
memcpy (obsNode->query, query, strlen(query)+1);
}
-
- obsNode->token.tokenLength = token->tokenLength;
- memcpy (obsNode->token.token, token->token, token->tokenLength);
-
- obsNode->addr = (OCDevAddr *)OCMalloc(sizeof(OCDevAddr));
- VERIFY_NON_NULL (obsNode->addr);
- memcpy (obsNode->addr, addr, sizeof(OCDevAddr));
-
+ // If tokenLength is zero, the return value depends on the
+ // particular library implementation (it may or may not be a null pointer).
+ if(tokenLength)
+ {
+ obsNode->token = (CAToken_t)OCMalloc(tokenLength);
+ VERIFY_NON_NULL (obsNode->token);
+ memcpy(obsNode->token, token, tokenLength);
+ }
+ obsNode->tokenLength = tokenLength;
+ obsNode->addressInfo = *addressInfo;
+ obsNode->connectivityType = connectivityType;
obsNode->resource = resHandle;
-
LL_APPEND (serverObsList, obsNode);
return OC_STACK_OK;
}
{
OCFree(obsNode->resUri);
OCFree(obsNode->query);
- OCFree(obsNode->addr);
OCFree(obsNode);
}
return OC_STACK_NO_MEMORY;
return NULL;
}
-ResourceObserver* GetObserverUsingToken (const OCCoAPToken * token)
+ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength)
{
ResourceObserver *out = NULL;
- if(token)
+ if(token && *token)
{
LL_FOREACH (serverObsList, out)
{
OC_LOG(INFO, TAG,PCF("comparing tokens"));
- OC_LOG_BUFFER(INFO, TAG, token->token, token->tokenLength);
- OC_LOG_BUFFER(INFO, TAG, out->token.token, out->token.tokenLength);
- if((out->token.tokenLength == token->tokenLength) &&
- (memcmp(out->token.token, token->token, token->tokenLength) == 0))
+ OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
+ OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
+ if((memcmp(out->token, token, tokenLength) == 0))
{
return out;
}
return NULL;
}
-OCStackResult DeleteObserverUsingToken (OCCoAPToken * token)
+OCStackResult DeleteObserverUsingToken (CAToken_t token, uint8_t tokenLength)
{
+ if(!token || !*token)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
ResourceObserver *obsNode = NULL;
- obsNode = GetObserverUsingToken (token);
+ obsNode = GetObserverUsingToken (token, tokenLength);
if (obsNode)
{
OC_LOG_V(INFO, TAG, PCF("deleting tokens"));
- OC_LOG_BUFFER(INFO, TAG, obsNode->token.token, obsNode->token.tokenLength);
+ OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength);
LL_DELETE (serverObsList, obsNode);
OCFree(obsNode->resUri);
OCFree(obsNode->query);
- OCFree(obsNode->addr);
+ OCFree(obsNode->token);
OCFree(obsNode);
}
// it is ok if we did not find the observer...
ResourceObserver *tmp = NULL;
LL_FOREACH_SAFE (serverObsList, out, tmp)
{
- DeleteObserverUsingToken (&(out->token));
+ if(out)
+ {
+ DeleteObserverUsingToken ((out->token), out->tokenLength);
+ }
}
serverObsList = NULL;
}
+
+/*
+ * CA layer expects observe registration/de-reg/notiifcations to be passed as a header
+ * option, which breaks the protocol abstraction requirement between RI & CA, and
+ * has to be fixed in the future. The function below adds the header option for observe.
+ * It should be noted that the observe header option is assumed to be the first option
+ * in the list of user defined header options and hence it is inserted at the front
+ * of the header options list and number of options adjusted accordingly.
+ */
+OCStackResult
+CreateObserveHeaderOption (CAHeaderOption_t **caHdrOpt,
+ OCHeaderOption *ocHdrOpt,
+ uint8_t numOptions,
+ uint8_t observeFlag)
+{
+ if(!caHdrOpt)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ CAHeaderOption_t *tmpHdrOpt = NULL;
+
+ tmpHdrOpt = (CAHeaderOption_t *) OCCalloc ((numOptions+1), sizeof(CAHeaderOption_t));
+ if (NULL == tmpHdrOpt)
+ {
+ return OC_STACK_NO_MEMORY;
+ }
+ tmpHdrOpt[0].protocolID = CA_COAP_ID;
+ tmpHdrOpt[0].optionID = COAP_OPTION_OBSERVE;
+ tmpHdrOpt[0].optionLength = sizeof(uint32_t);
+ tmpHdrOpt[0].optionData[0] = observeFlag;
+ for (uint8_t i = 0; i < numOptions; i++)
+ {
+ memcpy (&(tmpHdrOpt[i+1]), &(ocHdrOpt[i]), sizeof(CAHeaderOption_t));
+ }
+
+ *caHdrOpt = tmpHdrOpt;
+ return OC_STACK_OK;
+}
+
+/*
+ * CA layer passes observe information to the RI layer as a header option, which
+ * breaks the protocol abstraction requirement between RI & CA, and has to be fixed
+ * in the future. The function below removes the observe header option and processes it.
+ * It should be noted that the observe header option is always assumed to be the first
+ * option in the list of user defined header options and hence it is deleted from the
+ * front of the header options list and the number of options is adjusted accordingly.
+ */
+OCStackResult
+GetObserveHeaderOption (uint32_t * observationOption,
+ CAHeaderOption_t *options,
+ uint8_t * numOptions)
+{
+ if(!observationOption)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ *observationOption = OC_OBSERVE_NO_OPTION;
+
+ if(!options || !numOptions)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ for(uint8_t i = 0; i < *numOptions; i++)
+ {
+ if(options[i].protocolID == CA_COAP_ID &&
+ options[i].optionID == COAP_OPTION_OBSERVE)
+ {
+ *observationOption = options[i].optionData[0];
+ for(uint8_t c = i; c < *numOptions-1; c++)
+ {
+ options[i].protocolID = options[i+1].protocolID;
+ options[i].optionID = options[i+1].optionID;
+ options[i].optionLength = options[i+1].optionLength;
+ memcpy(options[i].optionData, options[i+1].optionData, options[i+1].optionLength);
+ }
+ (*numOptions)--;
+ return OC_STACK_OK;
+ }
+ }
+ return OC_STACK_OK;
+}
+