Merge remote-tracking branch 'origin/routing-manager'
[platform/upstream/iotivity.git] / resource / csdk / stack / src / ocobserve.c
index fc6737d..6749105 100644 (file)
 #include "ocstackinternal.h"
 #include "ocobserve.h"
 #include "ocresourcehandler.h"
-#include "occoap.h"
-#include "utlist.h"
-#include "debug.h"
 #include "ocrandom.h"
-#include "ocmalloc.h"
+#include "oic_malloc.h"
+#include "oic_string.h"
+#include "ocpayload.h"
 #include "ocserverrequest.h"
+#include "logger.h"
+
+#include "utlist.h"
+#include "pdu.h"
+
 
 // Module Name
-#define MOD_NAME PCF("ocobserve")
+#define MOD_NAME "ocobserve"
 
-#define TAG  PCF("OCStackObserve")
+#define TAG  "OCStackObserve"
 
 #define VERIFY_NON_NULL(arg) { if (!arg) {OC_LOG(FATAL, TAG, #arg " is NULL"); goto exit;} }
 
 static struct ResourceObserver * serverObsList = NULL;
-
-// send notifications based on the qos of the request
-// The qos passed as a parameter overrides what the client requested
-// If we want the client preference taking high priority make:
-// qos = resourceObserver->qos;
-OCQualityOfService DetermineObserverQoS(OCMethod method, ResourceObserver * resourceObserver,
-        OCQualityOfService appQoS)
+/**
+ * Determine observe QOS based on the QOS of the request.
+ * The qos passed as a parameter overrides what the client requested.
+ * If we want the client preference taking high priority make:
+ *     qos = resourceObserver->qos;
+ *
+ * @param method RESTful method.
+ * @param resourceObserver Observer.
+ * @param appQoS Quality of service.
+ * @return The quality of service of the observer.
+ */
+static OCQualityOfService DetermineObserverQoS(OCMethod method,
+        ResourceObserver * resourceObserver, OCQualityOfService appQoS)
 {
+    if(!resourceObserver)
+    {
+        OC_LOG(ERROR, TAG, "DetermineObserverQoS called with invalid resourceObserver");
+        return OC_NA_QOS;
+    }
+
     OCQualityOfService decidedQoS = appQoS;
     if(appQoS == OC_NA_QOS)
     {
@@ -57,21 +73,21 @@ OCQualityOfService DetermineObserverQoS(OCMethod method, ResourceObserver * reso
     {
         OC_LOG_V(INFO, TAG, "Current NON count for this observer is %d",
                 resourceObserver->lowQosCount);
-        #ifdef WITH_PRESENCE
+#ifdef WITH_PRESENCE
         if((resourceObserver->forceHighQos \
                 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT) \
                 && method != OC_REST_PRESENCE)
-        #else
+#else
         if(resourceObserver->forceHighQos \
                 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT)
-        #endif
-            {
+#endif
+        {
             resourceObserver->lowQosCount = 0;
             // at some point we have to to send CON to check on the
             // availability of observer
-            OC_LOG(INFO, TAG, PCF("This time we are sending the  notification as High qos"));
+            OC_LOG(INFO, TAG, "This time we are sending the  notification as High qos");
             decidedQoS = OC_HIGH_QOS;
-            }
+        }
         else
         {
             (resourceObserver->lowQosCount)++;
@@ -82,19 +98,25 @@ OCQualityOfService DetermineObserverQoS(OCMethod method, ResourceObserver * reso
 
 #ifdef WITH_PRESENCE
 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
-        OCResourceType *resourceType, OCQualityOfService qos)
+        OCPresenceTrigger trigger, OCResourceType *resourceType, OCQualityOfService qos)
 #else
 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
         OCQualityOfService qos)
 #endif
 {
-    OC_LOG(INFO, TAG, PCF("Entering SendObserverNotification"));
+    OC_LOG(INFO, TAG, "Entering SendObserverNotification");
+    if(!resPtr)
+    {
+        return OC_STACK_INVALID_PARAM;
+    }
+
     OCStackResult result = OC_STACK_ERROR;
     ResourceObserver * resourceObserver = serverObsList;
     uint8_t numObs = 0;
     OCServerRequest * request = NULL;
     OCEntityHandlerRequest ehRequest = {0};
     OCEntityHandlerResult ehResult = OC_EH_ERROR;
+    bool observeErrorFlag = false;
 
     // Find clients that are observing this resource
     while (resourceObserver)
@@ -102,144 +124,203 @@ OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr,
         if (resourceObserver->resource == resPtr)
         {
             numObs++;
-            #ifdef WITH_PRESENCE
+#ifdef WITH_PRESENCE
             if(method != OC_REST_PRESENCE)
             {
-            #endif
+#endif
                 qos = DetermineObserverQoS(method, resourceObserver, qos);
-                result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
+
+                result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
                         0, resPtr->sequenceNum, qos, resourceObserver->query,
                         NULL, NULL,
-                        &resourceObserver->token, resourceObserver->addr,
-                        resourceObserver->resUri, 0);
-                request->observeResult = OC_STACK_OK;
-                if(request && result == OC_STACK_OK)
+                        resourceObserver->token, resourceObserver->tokenLength,
+                        resourceObserver->resUri, 0, resourceObserver->acceptFormat,
+                        &resourceObserver->devAddr);
+
+                if(request)
                 {
-                    result = FormOCEntityHandlerRequest(&ehRequest, (OCRequestHandle) request,
-                                request->method, (OCResourceHandle) resPtr, request->query,
-                                request->reqJSONPayload, request->numRcvdVendorSpecificHeaderOptions,
-                                request->rcvdVendorSpecificHeaderOptions, OC_OBSERVE_NO_OPTION, 0);
+                    request->observeResult = OC_STACK_OK;
                     if(result == OC_STACK_OK)
                     {
-                        ehResult = resPtr->entityHandler(OC_REQUEST_FLAG, &ehRequest);
-                        if(ehResult == OC_EH_ERROR)
+                        result = FormOCEntityHandlerRequest(
+                                    &ehRequest,
+                                    (OCRequestHandle) request,
+                                    request->method,
+                                    &request->devAddr,
+                                    (OCResourceHandle) resPtr,
+                                    request->query,
+                                    PAYLOAD_TYPE_REPRESENTATION,
+                                    request->payload,
+                                    request->payloadSize,
+                                    request->numRcvdVendorSpecificHeaderOptions,
+                                    request->rcvdVendorSpecificHeaderOptions,
+                                    OC_OBSERVE_NO_OPTION,
+                                    0);
+                        if(result == OC_STACK_OK)
                         {
-                            FindAndDeleteServerRequest(request);
+                            ehResult = resPtr->entityHandler(OC_REQUEST_FLAG, &ehRequest,
+                                                resPtr->entityHandlerCallbackParam);
+                            if(ehResult == OC_EH_ERROR)
+                            {
+                                FindAndDeleteServerRequest(request);
+                            }
                         }
+                        OCPayloadDestroy(ehRequest.payload);
                     }
                 }
-            #ifdef WITH_PRESENCE
+#ifdef WITH_PRESENCE
             }
             else
             {
                 OCEntityHandlerResponse ehResponse = {0};
-                unsigned char presenceResBuf[MAX_RESPONSE_LENGTH] = {0};
+
                 //This is effectively the implementation for the presence entity handler.
-                OC_LOG(DEBUG, TAG, PCF("This notification is for Presence"));
-                result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
-                        0, OC_OBSERVE_NO_OPTION, OC_LOW_QOS,
-                        NULL, NULL, NULL, &resourceObserver->token,
-                        resourceObserver->addr, resourceObserver->resUri, 0);
+                OC_LOG(DEBUG, TAG, "This notification is for Presence");
+                result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
+                        0, resPtr->sequenceNum, qos, resourceObserver->query,
+                        NULL, NULL,
+                        resourceObserver->token, resourceObserver->tokenLength,
+                        resourceObserver->resUri, 0, resourceObserver->acceptFormat,
+                        &resourceObserver->devAddr);
+
                 if(result == OC_STACK_OK)
                 {
-                    // we create the payload here
-                    if(resourceType && resourceType->resourcetypename)
+                    OCPresencePayload* presenceResBuf = OCPresencePayloadCreate(
+                            resPtr->sequenceNum, maxAge, trigger,
+                            resourceType ? resourceType->resourcetypename : NULL);
+
+                    if(!presenceResBuf)
                     {
-                        sprintf((char *)presenceResBuf, "%u:%u:%s",
-                                resPtr->sequenceNum, maxAge, resourceType->resourcetypename);
+                        return OC_STACK_NO_MEMORY;
                     }
-                    else
+
+                    if(result == OC_STACK_OK)
                     {
-                        sprintf((char *)presenceResBuf, "%u:%u", resPtr->sequenceNum, maxAge);
+                        ehResponse.ehResult = OC_EH_OK;
+                        ehResponse.payload = (OCPayload*)presenceResBuf;
+                        ehResponse.persistentBufferFlag = 0;
+                        ehResponse.requestHandle = (OCRequestHandle) request;
+                        ehResponse.resourceHandle = (OCResourceHandle) resPtr;
+                        OICStrcpy(ehResponse.resourceUri, sizeof(ehResponse.resourceUri),
+                                resourceObserver->resUri);
+                        result = OCDoResponse(&ehResponse);
                     }
-                    memset(&ehResponse, 0, sizeof(OCEntityHandlerResponse));
-                    ehResponse.ehResult = OC_EH_OK;
-                    ehResponse.payload = presenceResBuf;
-                    ehResponse.payloadSize = strlen((const char *)presenceResBuf) + 1;
-                    ehResponse.persistentBufferFlag = 0;
-                    ehResponse.requestHandle = (OCRequestHandle) request;
-                    ehResponse.resourceHandle = (OCResourceHandle) resPtr;
-                    strcpy((char *)ehResponse.resourceUri, (const char *)resourceObserver->resUri);
-                    result = OCDoResponse(&ehResponse);
+
+                    OCPresencePayloadDestroy(presenceResBuf);
                 }
             }
-            #endif
+#endif
+
+            // Since we are in a loop, set an error flag to indicate at least one error occurred.
+            if (result != OC_STACK_OK)
+            {
+                observeErrorFlag = true;
+            }
         }
         resourceObserver = resourceObserver->next;
     }
+
     if (numObs == 0)
     {
-        OC_LOG(INFO, TAG, PCF("Resource has no observers"));
+        OC_LOG(INFO, TAG, "Resource has no observers");
         result = OC_STACK_NO_OBSERVERS;
     }
+    else if (observeErrorFlag)
+    {
+        OC_LOG(ERROR, TAG, "Observer notification error");
+        result = OC_STACK_ERROR;
+    }
     return result;
 }
 
 OCStackResult SendListObserverNotification (OCResource * resource,
         OCObservationId  *obsIdList, uint8_t numberOfIds,
-        unsigned char *notificationJSONPayload, uint32_t maxAge,
+        const OCRepPayload *payload,
+        uint32_t maxAge,
         OCQualityOfService qos)
 {
+    (void)maxAge;
+    if(!resource || !obsIdList || !payload)
+    {
+        return OC_STACK_INVALID_PARAM;
+    }
+
     uint8_t numIds = numberOfIds;
-    ResourceObserver *observation = NULL;
+    ResourceObserver *observer = NULL;
     uint8_t numSentNotification = 0;
     OCServerRequest * request = NULL;
     OCStackResult result = OC_STACK_ERROR;
-    OCEntityHandlerResponse ehResponse;
-    memset(&ehResponse, 0, sizeof(OCEntityHandlerResponse));
+    bool observeErrorFlag = false;
 
-    OC_LOG(INFO, TAG, PCF("Entering SendListObserverNotification"));
+    OC_LOG(INFO, TAG, "Entering SendListObserverNotification");
     while(numIds)
     {
-        OC_LOG_V(INFO, TAG, "Need to notify observation id %d", *obsIdList);
-        observation = NULL;
-        observation = GetObserverUsingId (*obsIdList);
-        if(observation)
+        observer = GetObserverUsingId (*obsIdList);
+        if(observer)
         {
-            // Found observation - verify if it matches the resource handle
-            if (observation->resource == resource)
+            // Found observer - verify if it matches the resource handle
+            if (observer->resource == resource)
             {
-                qos = DetermineObserverQoS(OC_REST_GET, observation, qos);
-
-                result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
-                        0, resource->sequenceNum, qos, observation->query,
-                        NULL, NULL, &observation->token,
-                        observation->addr, observation->resUri, 0);
-                request->observeResult = OC_STACK_OK;
-                if(request && result == OC_STACK_OK)
+                qos = DetermineObserverQoS(OC_REST_GET, observer, qos);
+
+
+                result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
+                        0, resource->sequenceNum, qos, observer->query,
+                        NULL, NULL, observer->token, observer->tokenLength,
+                        observer->resUri, 0, observer->acceptFormat,
+                        &observer->devAddr);
+
+                if(request)
                 {
-                    memset(&ehResponse, 0, sizeof(OCEntityHandlerResponse));
-                    ehResponse.ehResult = OC_EH_OK;
-                    ehResponse.payload = (unsigned char *) OCMalloc(MAX_RESPONSE_LENGTH);
-                    if(!ehResponse.payload)
+                    request->observeResult = OC_STACK_OK;
+                    if(result == OC_STACK_OK)
                     {
-                        FindAndDeleteServerRequest(request);
-                        continue;
+                        OCEntityHandlerResponse ehResponse = {0};
+                        ehResponse.ehResult = OC_EH_OK;
+                        ehResponse.payload = (OCPayload*)OCRepPayloadCreate();
+                        if(!ehResponse.payload)
+                        {
+                            FindAndDeleteServerRequest(request);
+                            continue;
+                        }
+                        memcpy(ehResponse.payload, payload, sizeof(*payload));
+                        ehResponse.persistentBufferFlag = 0;
+                        ehResponse.requestHandle = (OCRequestHandle) request;
+                        ehResponse.resourceHandle = (OCResourceHandle) resource;
+                        result = OCDoResponse(&ehResponse);
+                        if(result == OC_STACK_OK)
+                        {
+                            OC_LOG_V(INFO, TAG, "Observer id %d notified.", *obsIdList);
+
+                            // Increment only if OCDoResponse is successful
+                            numSentNotification++;
+
+                            OICFree(ehResponse.payload);
+                            FindAndDeleteServerRequest(request);
+                        }
+                        else
+                        {
+                            OC_LOG_V(INFO, TAG, "Error notifying observer id %d.", *obsIdList);
+                        }
                     }
-                    strcpy((char *)ehResponse.payload, (const char *)notificationJSONPayload);
-                    ehResponse.payloadSize = strlen((const char *)ehResponse.payload) + 1;
-                    ehResponse.persistentBufferFlag = 0;
-                    ehResponse.requestHandle = (OCRequestHandle) request;
-                    ehResponse.resourceHandle = (OCResourceHandle) resource;
-                    result = OCDoResponse(&ehResponse);
-                    if(result == OC_STACK_OK)
+                    else
                     {
-                        OCFree(ehResponse.payload);
                         FindAndDeleteServerRequest(request);
                     }
                 }
-                else
+                // Since we are in a loop, set an error flag to indicate
+                // at least one error occurred.
+                if (result != OC_STACK_OK)
                 {
-                    FindAndDeleteServerRequest(request);
+                    observeErrorFlag = true;
                 }
-
-                numSentNotification++;
             }
         }
         obsIdList++;
         numIds--;
     }
-    if(numSentNotification == numberOfIds)
+
+    if(numSentNotification == numberOfIds && !observeErrorFlag)
     {
         return OC_STACK_OK;
     }
@@ -249,9 +330,8 @@ OCStackResult SendListObserverNotification (OCResource * resource,
     }
     else
     {
-        //TODO: we need to signal that not every one in the
-        // list got an update, should we also indicate who did not receive on?
-        return OC_STACK_OK;
+        OC_LOG(ERROR, TAG, "Observer notification error");
+        return OC_STACK_ERROR;
     }
 }
 
@@ -259,7 +339,7 @@ OCStackResult GenerateObserverId (OCObservationId *observationId)
 {
     ResourceObserver *resObs = NULL;
 
-    OC_LOG(INFO, TAG, PCF("Entering GenerateObserverId"));
+    OC_LOG(INFO, TAG, "Entering GenerateObserverId");
     VERIFY_NON_NULL (observationId);
 
     do
@@ -269,7 +349,7 @@ OCStackResult GenerateObserverId (OCObservationId *observationId)
         resObs = GetObserverUsingId (*observationId);
     } while (NULL != resObs);
 
-    OC_LOG_V(INFO, TAG, "Observation ID is %u", *observationId);
+    OC_LOG_V(INFO, TAG, "GeneratedObservation ID is %u", *observationId);
 
     return OC_STACK_OK;
 exit:
@@ -279,51 +359,68 @@ exit:
 OCStackResult AddObserver (const char         *resUri,
                            const char         *query,
                            OCObservationId    obsId,
-                           OCCoAPToken        *token,
-                           OCDevAddr          *addr,
+                           CAToken_t          token,
+                           uint8_t            tokenLength,
                            OCResource         *resHandle,
-                           OCQualityOfService qos)
+                           OCQualityOfService qos,
+                           OCPayloadFormat    acceptFormat,
+                           const OCDevAddr    *devAddr)
 {
+    // Check if resource exists and is observable.
+    if (!resHandle)
+    {
+        return OC_STACK_INVALID_PARAM;
+    }
+    if (!(resHandle->resourceProperties & OC_OBSERVABLE))
+    {
+        return OC_STACK_RESOURCE_ERROR;
+    }
     ResourceObserver *obsNode = NULL;
 
-    obsNode = (ResourceObserver *) OCMalloc(sizeof(ResourceObserver));
+    if(!resUri || !token || !*token)
+    {
+        return OC_STACK_INVALID_PARAM;
+    }
+
+    obsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver));
     if (obsNode)
     {
-        memset(obsNode, 0, sizeof(ResourceObserver));
         obsNode->observeId = obsId;
 
-        obsNode->resUri = (unsigned char *)OCMalloc(strlen(resUri)+1);
+        obsNode->resUri = OICStrdup(resUri);
         VERIFY_NON_NULL (obsNode->resUri);
-        memcpy (obsNode->resUri, resUri, strlen(resUri)+1);
 
         obsNode->qos = qos;
+        obsNode->acceptFormat = acceptFormat;
         if(query)
         {
-            obsNode->query = (unsigned char *)OCMalloc(strlen(query)+1);
+            obsNode->query = OICStrdup(query);
             VERIFY_NON_NULL (obsNode->query);
-            memcpy (obsNode->query, query, strlen(query)+1);
         }
+        // If tokenLength is zero, the return value depends on the
+        // particular library implementation (it may or may not be a null pointer).
+        if(tokenLength)
+        {
+            obsNode->token = (CAToken_t)OICMalloc(tokenLength);
+            VERIFY_NON_NULL (obsNode->token);
+            memcpy(obsNode->token, token, tokenLength);
+        }
+        obsNode->tokenLength = tokenLength;
 
-        obsNode->token.tokenLength = token->tokenLength;
-        memcpy (obsNode->token.token, token->token, token->tokenLength);
-
-        obsNode->addr = (OCDevAddr *)OCMalloc(sizeof(OCDevAddr));
-        VERIFY_NON_NULL (obsNode->addr);
-        memcpy (obsNode->addr, addr, sizeof(OCDevAddr));
-
+        obsNode->devAddr = *devAddr;
         obsNode->resource = resHandle;
 
         LL_APPEND (serverObsList, obsNode);
+
         return OC_STACK_OK;
     }
 
 exit:
     if (obsNode)
     {
-        OCFree(obsNode->resUri);
-        OCFree(obsNode->query);
-        OCFree(obsNode->addr);
-        OCFree(obsNode);
+        OICFree(obsNode->resUri);
+        OICFree(obsNode->query);
+        OICFree(obsNode);
     }
     return OC_STACK_NO_MEMORY;
 }
@@ -342,46 +439,57 @@ ResourceObserver* GetObserverUsingId (const OCObservationId observeId)
             }
         }
     }
-    OC_LOG(INFO, TAG, PCF("Observer node not found!!"));
+    OC_LOG(INFO, TAG, "Observer node not found!!");
     return NULL;
 }
 
-ResourceObserver* GetObserverUsingToken (const OCCoAPToken * token)
+ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength)
 {
     ResourceObserver *out = NULL;
 
-    if(token)
+    if(token && *token)
     {
+        OC_LOG(INFO, TAG, "Looking for token");
+        OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
+        OC_LOG(INFO, TAG, "\tFound token:");
+
         LL_FOREACH (serverObsList, out)
         {
-            OC_LOG(INFO, TAG,PCF("comparing tokens"));
-            OC_LOG_BUFFER(INFO, TAG, token->token, token->tokenLength);
-            OC_LOG_BUFFER(INFO, TAG, out->token.token, out->token.tokenLength);
-            if((out->token.tokenLength == token->tokenLength) &&
-               (memcmp(out->token.token, token->token, token->tokenLength) == 0))
+            OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
+            if((memcmp(out->token, token, tokenLength) == 0))
             {
                 return out;
             }
         }
     }
-    OC_LOG(INFO, TAG, PCF("Observer node not found!!"));
+    else
+    {
+        OC_LOG(ERROR, TAG, "Passed in NULL token");
+    }
+
+    OC_LOG(INFO, TAG, "Observer node not found!!");
     return NULL;
 }
 
-OCStackResult DeleteObserverUsingToken (OCCoAPToken * token)
+OCStackResult DeleteObserverUsingToken (CAToken_t token, uint8_t tokenLength)
 {
+    if(!token || !*token)
+    {
+        return OC_STACK_INVALID_PARAM;
+    }
+
     ResourceObserver *obsNode = NULL;
 
-    obsNode = GetObserverUsingToken (token);
+    obsNode = GetObserverUsingToken (token, tokenLength);
     if (obsNode)
     {
-        OC_LOG_V(INFO, TAG, PCF("deleting tokens"));
-        OC_LOG_BUFFER(INFO, TAG, obsNode->token.token, obsNode->token.tokenLength);
+        OC_LOG_V(INFO, TAG, "deleting observer id  %u with token", obsNode->observeId);
+        OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength);
         LL_DELETE (serverObsList, obsNode);
-        OCFree(obsNode->resUri);
-        OCFree(obsNode->query);
-        OCFree(obsNode->addr);
-        OCFree(obsNode);
+        OICFree(obsNode->resUri);
+        OICFree(obsNode->query);
+        OICFree(obsNode->token);
+        OICFree(obsNode);
     }
     // it is ok if we did not find the observer...
     return OC_STACK_OK;
@@ -393,7 +501,92 @@ void DeleteObserverList()
     ResourceObserver *tmp = NULL;
     LL_FOREACH_SAFE (serverObsList, out, tmp)
     {
-        DeleteObserverUsingToken (&(out->token));
+        if(out)
+        {
+            DeleteObserverUsingToken ((out->token), out->tokenLength);
+        }
     }
     serverObsList = NULL;
 }
+
+/*
+ * CA layer expects observe registration/de-reg/notiifcations to be passed as a header
+ * option, which breaks the protocol abstraction requirement between RI & CA, and
+ * has to be fixed in the future. The function below adds the header option for observe.
+ * It should be noted that the observe header option is assumed to be the first option
+ * in the list of user defined header options and hence it is inserted at the front
+ * of the header options list and number of options adjusted accordingly.
+ */
+OCStackResult
+CreateObserveHeaderOption (CAHeaderOption_t **caHdrOpt,
+                           OCHeaderOption *ocHdrOpt,
+                           uint8_t numOptions,
+                           uint8_t observeFlag)
+{
+    if(!caHdrOpt || !ocHdrOpt)
+    {
+        return OC_STACK_INVALID_PARAM;
+    }
+
+    CAHeaderOption_t *tmpHdrOpt = NULL;
+
+    tmpHdrOpt = (CAHeaderOption_t *) OICCalloc ((numOptions+1), sizeof(CAHeaderOption_t));
+    if (NULL == tmpHdrOpt)
+    {
+        return OC_STACK_NO_MEMORY;
+    }
+    tmpHdrOpt[0].protocolID = CA_COAP_ID;
+    tmpHdrOpt[0].optionID = COAP_OPTION_OBSERVE;
+    tmpHdrOpt[0].optionLength = sizeof(uint8_t);
+    tmpHdrOpt[0].optionData[0] = observeFlag;
+    for (uint8_t i = 0; i < numOptions; i++)
+    {
+        memcpy (&(tmpHdrOpt[i+1]), &(ocHdrOpt[i]), sizeof(CAHeaderOption_t));
+    }
+
+    *caHdrOpt = tmpHdrOpt;
+    return OC_STACK_OK;
+}
+
+/*
+ * CA layer passes observe information to the RI layer as a header option, which
+ * breaks the protocol abstraction requirement between RI & CA, and has to be fixed
+ * in the future. The function below removes the observe header option and processes it.
+ * It should be noted that the observe header option is always assumed to be the first
+ * option in the list of user defined header options and hence it is deleted from the
+ * front of the header options list and the number of options is adjusted accordingly.
+ */
+OCStackResult
+GetObserveHeaderOption (uint32_t * observationOption,
+                        CAHeaderOption_t *options,
+                        uint8_t * numOptions)
+{
+    if(!observationOption)
+    {
+        return OC_STACK_INVALID_PARAM;
+    }
+
+    if(!options || !numOptions)
+    {
+        return OC_STACK_INVALID_PARAM;
+    }
+
+    *observationOption = OC_OBSERVE_NO_OPTION;
+
+    for(uint8_t i = 0; i < *numOptions; i++)
+    {
+        if(options[i].protocolID == CA_COAP_ID &&
+                options[i].optionID == COAP_OPTION_OBSERVE)
+        {
+            *observationOption = options[i].optionData[0];
+            for(uint8_t c = i; c < *numOptions-1; c++)
+            {
+                options[i] = options[i+1];
+            }
+            (*numOptions)--;
+            return OC_STACK_OK;
+        }
+    }
+    return OC_STACK_OK;
+}
+