1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
23 #include "ocstackconfig.h"
24 #include "ocstackinternal.h"
25 #include "ocobserve.h"
26 #include "ocresourcehandler.h"
28 #include "oic_malloc.h"
29 #include "oic_string.h"
30 #include "ocpayload.h"
31 #include "ocserverrequest.h"
35 #include <coap/utlist.h>
37 #include <coap/coap.h>
40 #define MOD_NAME "ocobserve"
42 #define TAG "OIC_RI_OBSERVE"
44 #define VERIFY_NON_NULL(arg) { if (!arg) {OIC_LOG(FATAL, TAG, #arg " is NULL"); goto exit;} }
46 static struct ResourceObserver * g_serverObsList = NULL;
47 static oc_mutex g_serverObsListMutex = NULL;
49 static ResourceObserver* GetObserverUsingIdAsOwner (const OCObservationId observeId);
51 static ResourceObserver* CloneObserverNode (ResourceObserver* observer)
53 ResourceObserver* dupObsNode = NULL;
56 dupObsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver));
57 VERIFY_NON_NULL(dupObsNode);
58 memcpy(dupObsNode, observer, sizeof(ResourceObserver));
62 dupObsNode->resUri = OICStrdup(observer->resUri);
63 VERIFY_NON_NULL(dupObsNode->resUri);
68 dupObsNode->query = OICStrdup(observer->query);
69 VERIFY_NON_NULL(dupObsNode->query);
74 dupObsNode->token = (CAToken_t)OICMalloc(observer->tokenLength);
75 VERIFY_NON_NULL(dupObsNode->token);
76 memcpy(dupObsNode->token, observer->token, observer->tokenLength);
79 dupObsNode->next = NULL;
85 FreeObserver(dupObsNode);
89 static void FreeObserverList (ResourceObserver* list)
91 ResourceObserver* head = list;
92 ResourceObserver* del = NULL;
105 static ResourceObserver* CloneObserverList (ResourceObserver* obsList)
107 ResourceObserver* dupList = NULL;
108 ResourceObserver* out = NULL;
110 LL_FOREACH(obsList, out)
112 ResourceObserver *obsNode = CloneObserverNode(out);
115 FreeObserverList(dupList);
120 LL_APPEND(dupList, obsNode);
127 * Determine observe QOS based on the QOS of the request.
128 * The qos passed as a parameter overrides what the client requested.
129 * If we want the client preference taking high priority make:
130 * qos = resourceObserver->qos;
132 * @param method RESTful method.
133 * @param resourceObserver Observer.
134 * @param appQoS Quality of service.
135 * @return The quality of service of the observer.
137 static OCQualityOfService DetermineObserverQoS(OCMethod method,
138 ResourceObserver * resourceObserver, OCQualityOfService appQoS)
140 if (!resourceObserver)
142 OIC_LOG(ERROR, TAG, "DetermineObserverQoS called with invalid resourceObserver");
146 OCQualityOfService decidedQoS = appQoS;
147 if (appQoS == OC_NA_QOS)
149 decidedQoS = resourceObserver->qos;
152 if (appQoS != OC_HIGH_QOS)
154 OIC_LOG_V(INFO, TAG, "Current NON count for this observer is %d",
155 resourceObserver->lowQosCount);
157 if ((resourceObserver->forceHighQos \
158 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT) \
159 && method != OC_REST_PRESENCE)
161 if (resourceObserver->forceHighQos \
162 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT)
165 resourceObserver->lowQosCount = 0;
166 // at some point we have to to send CON to check on the
167 // availability of observer
168 OIC_LOG(INFO, TAG, "This time we are sending the notification as High qos");
169 decidedQoS = OC_HIGH_QOS;
173 (resourceObserver->lowQosCount)++;
180 * Create a get request and pass to entityhandler to notify specific observer.
182 * @param observer Observer that need to be notified.
183 * @param qos Quality of service of resource.
185 * @return ::OC_STACK_OK on success, some other value upon failure.
187 static OCStackResult SendObserveNotification(ResourceObserver *observer,
188 OCQualityOfService qos)
190 OCStackResult result = OC_STACK_ERROR;
191 OCServerRequest * request = NULL;
192 OCEntityHandlerRequest ehRequest = {0};
193 OCEntityHandlerResult ehResult = OC_EH_ERROR;
195 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
196 0, observer->resource->sequenceNum, qos,
197 observer->query, NULL, NULL,
198 observer->token, observer->tokenLength,
199 observer->resUri, 0, observer->acceptFormat,
204 request->observeResult = OC_STACK_OK;
205 if (result == OC_STACK_OK)
207 result = FormOCEntityHandlerRequest(
209 (OCRequestHandle) request->requestId,
212 (OCResourceHandle) observer->resource,
214 PAYLOAD_TYPE_REPRESENTATION,
216 request->payloadSize,
217 request->numRcvdVendorSpecificHeaderOptions,
218 request->rcvdVendorSpecificHeaderOptions,
219 OC_OBSERVE_NO_OPTION,
222 if (result == OC_STACK_OK)
224 ehResult = observer->resource->entityHandler(OC_REQUEST_FLAG, &ehRequest,
225 observer->resource->entityHandlerCallbackParam);
226 if (ehResult == OC_EH_ERROR)
228 FindAndDeleteServerRequest(request);
230 // Reset Observer TTL.
231 observer->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
233 OCPayloadDestroy(ehRequest.payload);
241 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
242 OCPresenceTrigger trigger, OCResourceType *resourceType, OCQualityOfService qos)
244 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
245 OCQualityOfService qos)
248 OIC_LOG(INFO, TAG, "Entering SendObserverNotification");
251 return OC_STACK_INVALID_PARAM;
254 OCStackResult result = OC_STACK_ERROR;
255 ResourceObserver * resourceObserver = NULL;
257 OCServerRequest * request = NULL;
258 bool observeErrorFlag = false;
260 // Find clients that are observing this resource
261 oc_mutex_lock(g_serverObsListMutex);
262 resourceObserver = g_serverObsList;
263 while (resourceObserver)
265 if (resourceObserver->resource == resPtr)
269 if (method != OC_REST_PRESENCE)
272 qos = DetermineObserverQoS(method, resourceObserver, qos);
273 result = SendObserveNotification(resourceObserver, qos);
278 OCEntityHandlerResponse ehResponse = {0};
280 //This is effectively the implementation for the presence entity handler.
281 OIC_LOG(DEBUG, TAG, "This notification is for Presence");
282 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
283 0, resPtr->sequenceNum, qos, resourceObserver->query,
285 resourceObserver->token, resourceObserver->tokenLength,
286 resourceObserver->resUri, 0, resourceObserver->acceptFormat,
287 &resourceObserver->devAddr);
289 if (result == OC_STACK_OK)
291 OCPresencePayload* presenceResBuf = OCPresencePayloadCreate(
292 resPtr->sequenceNum, maxAge, trigger,
293 resourceType ? resourceType->resourcetypename : NULL);
297 oc_mutex_unlock(g_serverObsListMutex);
298 return OC_STACK_NO_MEMORY;
301 if (result == OC_STACK_OK)
303 ehResponse.ehResult = OC_EH_OK;
304 ehResponse.payload = (OCPayload*)presenceResBuf;
305 ehResponse.persistentBufferFlag = 0;
306 ehResponse.requestHandle = (OCRequestHandle) request->requestId;
307 ehResponse.resourceHandle = (OCResourceHandle) resPtr;
308 OICStrcpy(ehResponse.resourceUri, sizeof(ehResponse.resourceUri),
309 resourceObserver->resUri);
310 result = OCDoResponse(&ehResponse);
313 OCPresencePayloadDestroy(presenceResBuf);
318 // Since we are in a loop, set an error flag to indicate at least one error occurred.
319 if (result != OC_STACK_OK)
321 observeErrorFlag = true;
324 resourceObserver = resourceObserver->next;
327 oc_mutex_unlock(g_serverObsListMutex);
331 OIC_LOG(INFO, TAG, "Resource has no observers");
332 result = OC_STACK_NO_OBSERVERS;
334 else if (observeErrorFlag)
336 OIC_LOG(ERROR, TAG, "Observer notification error");
337 result = OC_STACK_ERROR;
342 OCStackResult SendListObserverNotification (OCResource * resource,
343 OCObservationId *obsIdList, uint8_t numberOfIds,
344 const OCRepPayload *payload,
346 OCQualityOfService qos)
349 if (!resource || !obsIdList || !payload)
351 return OC_STACK_INVALID_PARAM;
354 uint8_t numIds = numberOfIds;
355 ResourceObserver *observer = NULL;
356 uint8_t numSentNotification = 0;
357 OCServerRequest * request = NULL;
358 OCStackResult result = OC_STACK_ERROR;
359 bool observeErrorFlag = false;
361 OIC_LOG(INFO, TAG, "Entering SendListObserverNotification");
364 oc_mutex_lock(g_serverObsListMutex);
365 observer = GetObserverUsingIdAsOwner (*obsIdList);
368 // Found observer - verify if it matches the resource handle
369 if (observer->resource == resource)
371 qos = DetermineObserverQoS(OC_REST_GET, observer, qos);
374 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
375 0, resource->sequenceNum, qos, observer->query,
376 NULL, NULL, observer->token, observer->tokenLength,
377 observer->resUri, 0, observer->acceptFormat,
382 request->observeResult = OC_STACK_OK;
383 if (result == OC_STACK_OK)
385 OCEntityHandlerResponse ehResponse = {0};
386 ehResponse.ehResult = OC_EH_OK;
387 ehResponse.payload = (OCPayload*)OCRepPayloadCreate();
388 if (!ehResponse.payload)
390 FindAndDeleteServerRequest(request);
391 oc_mutex_unlock(g_serverObsListMutex);
394 memcpy(ehResponse.payload, payload, sizeof(*payload));
395 ehResponse.persistentBufferFlag = 0;
396 ehResponse.requestHandle = (OCRequestHandle) request->requestId;
397 ehResponse.resourceHandle = (OCResourceHandle) resource;
398 result = OCDoResponse(&ehResponse);
399 if (result == OC_STACK_OK)
401 OIC_LOG_V(INFO, TAG, "Observer id %d notified.", *obsIdList);
403 // Increment only if OCDoResponse is successful
404 numSentNotification++;
406 OICFree(ehResponse.payload);
407 FindAndDeleteServerRequest(request);
411 OIC_LOG_V(INFO, TAG, "Error notifying observer id %d.", *obsIdList);
413 // Reset Observer TTL.
415 GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
419 FindAndDeleteServerRequest(request);
423 // Since we are in a loop, set an error flag to indicate
424 // at least one error occurred.
425 if (result != OC_STACK_OK)
427 observeErrorFlag = true;
432 oc_mutex_unlock(g_serverObsListMutex);
437 if (numSentNotification == numberOfIds && !observeErrorFlag)
441 else if (numSentNotification == 0)
443 return OC_STACK_NO_OBSERVERS;
447 OIC_LOG(ERROR, TAG, "Observer notification error");
448 return OC_STACK_ERROR;
452 OCStackResult GenerateObserverId (OCObservationId *observationId)
456 OIC_LOG(INFO, TAG, "Entering GenerateObserverId");
457 VERIFY_NON_NULL (observationId);
463 *observationId = OCGetRandomByte();
464 } while (0 == *observationId); //Make sure *observationId is not 0
465 // Check if observation Id already exists
466 found = IsObserverAvailable (*observationId);
469 OIC_LOG_V(INFO, TAG, "GeneratedObservation ID is %u", *observationId);
473 return OC_STACK_ERROR;
476 OCStackResult AddObserver (const char *resUri,
478 OCObservationId obsId,
481 OCResource *resHandle,
482 OCQualityOfService qos,
483 OCPayloadFormat acceptFormat,
484 const OCDevAddr *devAddr)
486 // Check if resource exists and is observable.
489 return OC_STACK_INVALID_PARAM;
491 if (!(resHandle->resourceProperties & OC_OBSERVABLE))
493 return OC_STACK_RESOURCE_ERROR;
496 if (!resUri || !token)
498 return OC_STACK_INVALID_PARAM;
501 ResourceObserver *obsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver));
504 obsNode->observeId = obsId;
506 obsNode->resUri = OICStrdup(resUri);
507 VERIFY_NON_NULL (obsNode->resUri);
510 obsNode->acceptFormat = acceptFormat;
513 obsNode->query = OICStrdup(query);
514 VERIFY_NON_NULL (obsNode->query);
516 // If tokenLength is zero, the return value depends on the
517 // particular library implementation (it may or may not be a null pointer).
520 obsNode->token = (CAToken_t)OICMalloc(tokenLength);
521 VERIFY_NON_NULL (obsNode->token);
522 memcpy(obsNode->token, token, tokenLength);
524 obsNode->tokenLength = tokenLength;
526 obsNode->devAddr = *devAddr;
527 obsNode->resource = resHandle;
530 if ((strcmp(resUri, OC_RSRVD_PRESENCE_URI) == 0))
537 obsNode->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
540 oc_mutex_lock(g_serverObsListMutex);
541 LL_APPEND (g_serverObsList, obsNode);
542 oc_mutex_unlock(g_serverObsListMutex);
550 OICFree(obsNode->resUri);
551 OICFree(obsNode->query);
554 return OC_STACK_NO_MEMORY;
558 * This function checks if the node is past its time to live and
559 * deletes it if timed-out. Calling this function with a presence callback
560 * with ttl set to 0 will not delete anything as presence nodes have
561 * their own mechanisms for timeouts. A null argument will cause the function to
564 static void CheckTimedOutObserver(ResourceObserver* observer)
566 if (!observer || observer->TTL == 0)
574 if (observer->TTL < now)
576 // Send confirmable notification message to observer.
577 OIC_LOG(INFO, TAG, "Sending High-QoS notification to observer");
578 SendObserveNotification(observer, OC_HIGH_QOS);
582 ResourceObserver* GetObserverUsingId (const OCObservationId observeId)
584 ResourceObserver *out = NULL;
588 oc_mutex_lock(g_serverObsListMutex);
589 LL_FOREACH (g_serverObsList, out)
591 if (out->observeId == observeId)
593 oc_mutex_unlock(g_serverObsListMutex);
594 return CloneObserverNode(out);
596 CheckTimedOutObserver(out);
598 oc_mutex_unlock(g_serverObsListMutex);
600 OIC_LOG(INFO, TAG, "Observer node not found!!");
604 static ResourceObserver* GetObserverUsingIdAsOwner (const OCObservationId observeId)
606 ResourceObserver *out = NULL;
610 LL_FOREACH (g_serverObsList, out)
612 if (out->observeId == observeId)
616 CheckTimedOutObserver(out);
619 OIC_LOG(INFO, TAG, "Observer node not found!!");
623 bool IsObserverAvailable (const OCObservationId observeId)
625 ResourceObserver *out = NULL;
629 oc_mutex_lock(g_serverObsListMutex);
630 LL_FOREACH (g_serverObsList, out)
632 if (out->observeId == observeId)
634 oc_mutex_unlock(g_serverObsListMutex);
638 oc_mutex_unlock(g_serverObsListMutex);
644 ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength)
648 OIC_LOG(INFO, TAG, "Looking for token");
649 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
651 ResourceObserver *out = NULL;
652 oc_mutex_lock(g_serverObsListMutex);
653 LL_FOREACH (g_serverObsList, out)
655 /* de-annotate below line if want to see all token in cbList */
656 //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
657 if ((memcmp(out->token, token, tokenLength) == 0))
659 OIC_LOG(INFO, TAG, "Found in observer list");
660 oc_mutex_unlock(g_serverObsListMutex);
661 return CloneObserverNode(out);
663 CheckTimedOutObserver(out);
665 oc_mutex_unlock(g_serverObsListMutex);
669 OIC_LOG(ERROR, TAG, "Passed in NULL token");
672 OIC_LOG(INFO, TAG, "Observer node not found!!");
676 static ResourceObserver* GetObserverUsingTokenAsOwner (const CAToken_t token, uint8_t tokenLength)
680 OIC_LOG(INFO, TAG, "Looking for token");
681 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
683 ResourceObserver *out = NULL;
684 LL_FOREACH (g_serverObsList, out)
686 /* de-annotate below line if want to see all token in cbList */
687 //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
688 if ((memcmp(out->token, token, tokenLength) == 0))
690 OIC_LOG(INFO, TAG, "Found in observer list");
693 CheckTimedOutObserver(out);
698 OIC_LOG(ERROR, TAG, "Passed in NULL token");
701 OIC_LOG(INFO, TAG, "Observer node not found!!");
705 OCStackResult DeleteObserverUsingToken (CAToken_t token, uint8_t tokenLength)
709 return OC_STACK_INVALID_PARAM;
712 oc_mutex_lock(g_serverObsListMutex);
713 ResourceObserver *obsNode = GetObserverUsingTokenAsOwner (token, tokenLength);
716 OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", obsNode->observeId);
717 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength);
719 LL_DELETE (g_serverObsList, obsNode);
720 FreeObserver(obsNode);
722 oc_mutex_unlock(g_serverObsListMutex);
724 // it is ok if we did not find the observer...
728 OCStackResult DeleteObserverUsingDevAddr(const OCDevAddr *devAddr)
732 return OC_STACK_INVALID_PARAM;
735 oc_mutex_lock(g_serverObsListMutex);
736 ResourceObserver* obsDupList = CloneObserverList(g_serverObsList);
737 oc_mutex_unlock(g_serverObsListMutex);
739 ResourceObserver *out = NULL;
740 ResourceObserver *tmp = NULL;
741 LL_FOREACH_SAFE(obsDupList, out, tmp)
745 if ((strcmp(out->devAddr.addr, devAddr->addr) == 0)
746 && out->devAddr.port == devAddr->port)
748 OIC_LOG_V(INFO, TAG, "deleting observer id %u with %s:%u",
749 out->observeId, out->devAddr.addr, out->devAddr.port);
750 OCStackFeedBack(out->token, out->tokenLength, OC_OBSERVER_NOT_INTERESTED);
755 FreeObserverList(obsDupList);
759 void DeleteObserverList()
761 oc_mutex_lock(g_serverObsListMutex);
763 ResourceObserver* head = g_serverObsList;
764 ResourceObserver* del = NULL;
770 OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", del->observeId);
771 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)del->token, del->tokenLength);
776 g_serverObsList = NULL;
777 oc_mutex_unlock(g_serverObsListMutex);
781 * CA layer expects observe registration/de-reg/notiifcations to be passed as a header
782 * option, which breaks the protocol abstraction requirement between RI & CA, and
783 * has to be fixed in the future. The function below adds the header option for observe.
784 * It should be noted that the observe header option is assumed to be the first option
785 * in the list of user defined header options and hence it is inserted at the front
786 * of the header options list and number of options adjusted accordingly.
789 CreateObserveHeaderOption (CAHeaderOption_t **caHdrOpt,
790 OCHeaderOption *ocHdrOpt,
796 return OC_STACK_INVALID_PARAM;
799 if (numOptions > 0 && !ocHdrOpt)
801 OIC_LOG (INFO, TAG, "options are NULL though number is non zero");
802 return OC_STACK_INVALID_PARAM;
805 CAHeaderOption_t *tmpHdrOpt = NULL;
807 tmpHdrOpt = (CAHeaderOption_t *) OICCalloc ((numOptions+1), sizeof(CAHeaderOption_t));
808 if (NULL == tmpHdrOpt)
810 return OC_STACK_NO_MEMORY;
812 tmpHdrOpt[0].protocolID = CA_COAP_ID;
813 tmpHdrOpt[0].optionID = COAP_OPTION_OBSERVE;
814 tmpHdrOpt[0].optionLength = sizeof(uint8_t);
815 tmpHdrOpt[0].optionData[0] = observeFlag;
816 for (uint8_t i = 0; i < numOptions; i++)
818 memcpy (&(tmpHdrOpt[i+1]), &(ocHdrOpt[i]), sizeof(CAHeaderOption_t));
821 *caHdrOpt = tmpHdrOpt;
826 * CA layer passes observe information to the RI layer as a header option, which
827 * breaks the protocol abstraction requirement between RI & CA, and has to be fixed
828 * in the future. The function below removes the observe header option and processes it.
829 * It should be noted that the observe header option is always assumed to be the first
830 * option in the list of user defined header options and hence it is deleted from the
831 * front of the header options list and the number of options is adjusted accordingly.
834 GetObserveHeaderOption (uint32_t * observationOption,
835 CAHeaderOption_t *options,
836 uint8_t * numOptions)
838 if (!observationOption)
840 return OC_STACK_INVALID_PARAM;
843 if (!options || !numOptions)
845 OIC_LOG (INFO, TAG, "No options present");
849 for(uint8_t i = 0; i < *numOptions; i++)
851 if (options[i].protocolID == CA_COAP_ID &&
852 options[i].optionID == COAP_OPTION_OBSERVE)
854 *observationOption = options[i].optionData[0];
855 for(uint8_t c = i; c < *numOptions-1; c++)
857 options[i] = options[i+1];
866 OCStackResult InitializeObseverList()
868 OIC_LOG(DEBUG, TAG, "InitializeObseverList IN");
870 if (NULL == g_serverObsListMutex)
872 g_serverObsListMutex = oc_mutex_new();
875 OIC_LOG(DEBUG, TAG, "InitializeObseverList OUT");
879 void TerminateObserverList()
881 OIC_LOG(DEBUG, TAG, "TerminateObserverList IN");
883 DeleteObserverList();
885 if (NULL != g_serverObsListMutex)
887 oc_mutex_free(g_serverObsListMutex);
888 g_serverObsListMutex = NULL;
891 OIC_LOG(DEBUG, TAG, "TerminateObserverList OUT");
894 void FreeObserver (ResourceObserver* obsNode)
898 OICFree(obsNode->resUri);
899 OICFree(obsNode->query);
900 OICFree(obsNode->token);