1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
23 #include "ocstackconfig.h"
24 #include "ocstackinternal.h"
25 #include "ocobserve.h"
26 #include "ocresourcehandler.h"
28 #include "oic_malloc.h"
29 #include "oic_string.h"
30 #include "ocpayload.h"
31 #include "ocserverrequest.h"
35 #include <coap/utlist.h>
37 #include <coap/coap.h>
/** Module name string for this translation unit. */
#define MOD_NAME "ocobserve"

/** Tag passed to the logging macros used in this file. */
#define TAG  "OIC_RI_OBSERVE"

/**
 * Logs a FATAL message and jumps to the `exit` label of the enclosing
 * function when `arg` evaluates to false (NULL / zero).
 *
 * The argument is wrapped in parentheses before it is negated so that a
 * compound expression such as `a || b` is tested as a whole instead of
 * being parsed as `(!a) || b`.
 *
 * The enclosing function must define an `exit:` label.
 */
#define VERIFY_NON_NULL(arg) { if (!(arg)) {OIC_LOG(FATAL, TAG, #arg " is NULL"); goto exit;} }
// Head of the observer list used throughout this file (nodes are chained through ->next).
46 static struct ResourceObserver * g_serverObsList = NULL;
// Mutex locked around accesses to g_serverObsList; created in InitializeObseverList and
// released in TerminateObserverList.
47 static oc_mutex g_serverObsListMutex = NULL;
// Forward declaration: lookup that returns the list node itself and takes no lock on its own.
49 static ResourceObserver* GetObserverUsingIdAsOwner (const OCObservationId observeId);
51 static ResourceObserver* CloneObserverNode (ResourceObserver* observer)
53 ResourceObserver* dupObsNode = NULL;
56 dupObsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver));
57 VERIFY_NON_NULL(dupObsNode);
58 memcpy(dupObsNode, observer, sizeof(ResourceObserver));
62 dupObsNode->resUri = OICStrdup(observer->resUri);
63 VERIFY_NON_NULL(dupObsNode->resUri);
68 dupObsNode->query = OICStrdup(observer->query);
69 VERIFY_NON_NULL(dupObsNode->query);
74 dupObsNode->token = (CAToken_t)OICMalloc(observer->tokenLength);
75 VERIFY_NON_NULL(dupObsNode->token);
76 memcpy(dupObsNode->token, observer->token, observer->tokenLength);
79 dupObsNode->next = NULL;
85 FreeObserver(dupObsNode);
// Helper that takes the head of an observer list.
// NOTE(review): only the two cursor declarations are visible in this listing; presumably the
// body walks the list from `head` and releases each node through `del` — confirm against the
// full source.
89 static void FreeObserverList (ResourceObserver* list)
91 ResourceObserver* head = list;
92 ResourceObserver* del = NULL;
// Builds a second list from obsList: every node is copied with CloneObserverNode and the
// copy is appended to dupList.
105 static ResourceObserver* CloneObserverList (ResourceObserver* obsList)
107 ResourceObserver* dupList = NULL;
108 ResourceObserver* out = NULL;
110 LL_FOREACH(obsList, out)
112 ResourceObserver *obsNode = CloneObserverNode(out);
// NOTE(review): presumably the failure path for a clone that returned NULL — the copies
// collected so far are handed to FreeObserverList. Confirm the guarding condition.
115 FreeObserverList(dupList);
// Keep the copies in the same order as the source list.
120 LL_APPEND(dupList, obsNode);
/**
 * Determine the observe QoS based on the QoS of the request.
 * The qos passed as a parameter overrides what the client requested.
 * To give the client's preference higher priority instead, use:
 *     qos = resourceObserver->qos;
 *
 * @param method RESTful method.
 * @param resourceObserver Observer.
 * @param appQoS Quality of service.
 * @return The quality of service of the observer.
 */
// Chooses the QoS for one notification to resourceObserver, starting from the QoS supplied
// by the application, and maintains the observer's low-QoS counter.
137 static OCQualityOfService DetermineObserverQoS(OCMethod method,
138 ResourceObserver * resourceObserver, OCQualityOfService appQoS)
// A NULL observer is logged as an error.
140 if (!resourceObserver)
142 OIC_LOG(ERROR, TAG, "DetermineObserverQoS called with invalid resourceObserver");
// Start from the application's QoS; when it is OC_NA_QOS use the QoS stored on the observer.
146 OCQualityOfService decidedQoS = appQoS;
147 if (appQoS == OC_NA_QOS)
149 decidedQoS = resourceObserver->qos;
// The counter handling below is skipped when the application asked for OC_HIGH_QOS.
152 if (appQoS != OC_HIGH_QOS)
154 OIC_LOG_V(INFO, TAG, "Current NON count for this observer is %d",
155 resourceObserver->lowQosCount);
// NOTE(review): two variants of the escalation test follow back to back; presumably they are
// alternative preprocessor branches (the first one additionally excludes OC_REST_PRESENCE) —
// confirm against the full source.
157 if ((resourceObserver->forceHighQos \
158 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT) \
159 && method != OC_REST_PRESENCE)
161 if (resourceObserver->forceHighQos \
162 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT)
// Escalation: restart the count and send this notification with high QoS.
165 resourceObserver->lowQosCount = 0;
166 // at some point we have to send CON to check on the
167 // availability of observer
168 OIC_LOG(INFO, TAG, "This time we are sending the notification as High qos");
169 decidedQoS = OC_HIGH_QOS;
// Count one more notification for this observer.
173 (resourceObserver->lowQosCount)++;
/**
 * Create a GET request and pass it to the entity handler to notify a specific observer.
 *
 * @param observer Observer that needs to be notified.
 * @param qos Quality of service of the resource.
 *
 * @return ::OC_STACK_OK on success, some other value upon failure.
 */
// Notifies a single observer: registers a GET server request built from the observer's stored
// data, converts it into an entity-handler request and calls the resource's entity handler.
187 static OCStackResult SendObserveNotification(ResourceObserver *observer,
188 OCQualityOfService qos)
190 OCStackResult result = OC_STACK_ERROR;
191 OCServerRequest * request = NULL;
192 OCEntityHandlerRequest ehRequest = {0};
193 OCEntityHandlerResult ehResult = OC_EH_ERROR;
// The request carries the resource's current sequence number plus the observer's query,
// token, URI and accept format.
195 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
196 0, observer->resource->sequenceNum, qos,
197 observer->query, NULL, NULL,
198 observer->token, observer->tokenLength,
199 observer->resUri, 0, observer->acceptFormat,
204 request->observeResult = OC_STACK_OK;
205 if (result == OC_STACK_OK)
// Fill ehRequest from the server request that was just created.
207 result = FormOCEntityHandlerRequest(
209 (OCRequestHandle) request->requestId,
212 (OCResourceHandle) observer->resource,
214 PAYLOAD_TYPE_REPRESENTATION,
216 request->payloadSize,
217 request->numRcvdVendorSpecificHeaderOptions,
218 request->rcvdVendorSpecificHeaderOptions,
219 OC_OBSERVE_NO_OPTION,
222 if (result == OC_STACK_OK)
// Hand the request to the entity handler registered on the observed resource.
224 ehResult = observer->resource->entityHandler(OC_REQUEST_FLAG, &ehRequest,
225 observer->resource->entityHandlerCallbackParam);
227 // Clear server request on error case
228 if (!OCResultToSuccess(EntityHandlerCodeToOCStackCode(ehResult)))
230 FindAndDeleteServerRequest(request);
232 // Reset Observer TTL.
233 observer->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
// Release the payload attached to the entity-handler request.
235 OCPayloadDestroy(ehRequest.payload);
// Walks the global observer list under g_serverObsListMutex and notifies every observer whose
// resource is resPtr. Presence notifications are answered directly with a presence payload;
// all other methods go through SendObserveNotification.
// NOTE(review): two signatures appear below; presumably they are alternative preprocessor
// branches (with and without the presence parameters) — confirm against the full source.
243 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
244 OCPresenceTrigger trigger, OCResourceType *resourceType, OCQualityOfService qos)
246 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
247 OCQualityOfService qos)
250 OIC_LOG(INFO, TAG, "Entering SendObserverNotification");
253 return OC_STACK_INVALID_PARAM;
256 OCStackResult result = OC_STACK_ERROR;
257 ResourceObserver * resourceObserver = NULL;
259 OCServerRequest * request = NULL;
260 bool observeErrorFlag = false;
262 // Find clients that are observing this resource
263 oc_mutex_lock(g_serverObsListMutex);
264 resourceObserver = g_serverObsList;
265 while (resourceObserver)
267 if (resourceObserver->resource == resPtr)
271 if (method != OC_REST_PRESENCE)
// NOTE(review): qos is overwritten here, so the value decided for this observer is also the
// input for every later observer in the loop — confirm this carry-over is intended.
274 qos = DetermineObserverQoS(method, resourceObserver, qos);
275 result = SendObserveNotification(resourceObserver, qos);
280 OCEntityHandlerResponse ehResponse = {0};
282 //This is effectively the implementation for the presence entity handler.
283 OIC_LOG(DEBUG, TAG, "This notification is for Presence");
284 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
285 0, resPtr->sequenceNum, qos, resourceObserver->query,
287 resourceObserver->token, resourceObserver->tokenLength,
288 resourceObserver->resUri, 0, resourceObserver->acceptFormat,
289 &resourceObserver->devAddr);
291 if (result == OC_STACK_OK)
// The presence payload carries the resource's sequence number, maxAge, the trigger and,
// when a resource type was given, its type name.
293 OCPresencePayload* presenceResBuf = OCPresencePayloadCreate(
294 resPtr->sequenceNum, maxAge, trigger,
295 resourceType ? resourceType->resourcetypename : NULL);
// The list lock is released before returning out of the loop.
299 oc_mutex_unlock(g_serverObsListMutex);
300 return OC_STACK_NO_MEMORY;
303 if (result == OC_STACK_OK)
305 ehResponse.ehResult = OC_EH_OK;
306 ehResponse.payload = (OCPayload*)presenceResBuf;
307 ehResponse.persistentBufferFlag = 0;
308 ehResponse.requestHandle = (OCRequestHandle) request->requestId;
309 ehResponse.resourceHandle = (OCResourceHandle) resPtr;
310 OICStrcpy(ehResponse.resourceUri, sizeof(ehResponse.resourceUri),
311 resourceObserver->resUri);
312 result = OCDoResponse(&ehResponse);
313 if (result != OC_STACK_OK)
315 OIC_LOG(ERROR, TAG, "Failed to send presence notification!");
316 FindAndDeleteServerRequest(request);
320 OCPresencePayloadDestroy(presenceResBuf);
325 // Since we are in a loop, set an error flag to indicate at least one error occurred.
326 if (result != OC_STACK_OK)
328 observeErrorFlag = true;
331 resourceObserver = resourceObserver->next;
334 oc_mutex_unlock(g_serverObsListMutex);
338 OIC_LOG(INFO, TAG, "Resource has no observers");
339 result = OC_STACK_NO_OBSERVERS;
// Any failure recorded during the loop turns the overall result into a generic error.
341 else if (observeErrorFlag)
343 OIC_LOG(ERROR, TAG, "Observer notification error");
344 result = OC_STACK_ERROR;
// Sends the caller-supplied representation payload to the observers named in obsIdList,
// restricted to observers that are registered on `resource`.
// NOTE(review): the loop header over obsIdList/numIds is not visible in this listing.
349 OCStackResult SendListObserverNotification (OCResource * resource,
350 OCObservationId *obsIdList, uint8_t numberOfIds,
351 const OCRepPayload *payload,
353 OCQualityOfService qos)
356 if (!resource || !obsIdList || !payload)
358 return OC_STACK_INVALID_PARAM;
361 uint8_t numIds = numberOfIds;
362 ResourceObserver *observer = NULL;
363 uint8_t numSentNotification = 0;
364 OCServerRequest * request = NULL;
365 OCStackResult result = OC_STACK_ERROR;
366 bool observeErrorFlag = false;
368 OIC_LOG(INFO, TAG, "Entering SendListObserverNotification");
// The list lock is taken here because GetObserverUsingIdAsOwner returns the live list node.
371 oc_mutex_lock(g_serverObsListMutex);
372 observer = GetObserverUsingIdAsOwner (*obsIdList);
375 // Found observer - verify if it matches the resource handle
376 if (observer->resource == resource)
// NOTE(review): qos is overwritten, so the value decided for this observer is also the input
// for any observer handled afterwards — confirm this carry-over is intended.
378 qos = DetermineObserverQoS(OC_REST_GET, observer, qos);
381 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
382 0, resource->sequenceNum, qos, observer->query,
383 NULL, NULL, observer->token, observer->tokenLength,
384 observer->resUri, 0, observer->acceptFormat,
389 request->observeResult = OC_STACK_OK;
390 if (result == OC_STACK_OK)
392 OCEntityHandlerResponse ehResponse = {0};
393 ehResponse.ehResult = OC_EH_OK;
394 ehResponse.payload = (OCPayload*)OCRepPayloadCreate();
// Without a payload container the request is dropped and the lock released.
395 if (!ehResponse.payload)
397 FindAndDeleteServerRequest(request);
398 oc_mutex_unlock(g_serverObsListMutex);
// Shallow copy: only the bytes of the OCRepPayload struct are copied from the caller's
// payload, which is why the container is released with OICFree further down.
401 memcpy(ehResponse.payload, payload, sizeof(*payload));
402 ehResponse.persistentBufferFlag = 0;
403 ehResponse.requestHandle = (OCRequestHandle) request->requestId;
404 ehResponse.resourceHandle = (OCResourceHandle) resource;
405 result = OCDoResponse(&ehResponse);
406 if (result == OC_STACK_OK)
408 OIC_LOG_V(INFO, TAG, "Observer id %d notified.", *obsIdList);
410 // Increment only if OCDoResponse is successful
411 numSentNotification++;
415 OIC_LOG_V(INFO, TAG, "Error notifying observer id %d.", *obsIdList);
416 FindAndDeleteServerRequest(request);
419 // Reset Observer TTL.
421 GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
423 OICFree(ehResponse.payload);
427 FindAndDeleteServerRequest(request);
431 // Since we are in a loop, set an error flag to indicate
432 // at least one error occurred.
433 if (result != OC_STACK_OK)
435 observeErrorFlag = true;
440 oc_mutex_unlock(g_serverObsListMutex);
// Outcome: every id notified without error, nothing notified at all, or a partial failure.
445 if (numSentNotification == numberOfIds && !observeErrorFlag)
449 else if (numSentNotification == 0)
451 return OC_STACK_NO_OBSERVERS;
455 OIC_LOG(ERROR, TAG, "Observer notification error");
456 return OC_STACK_ERROR;
// Writes a randomly chosen, non-zero observation id to *observationId.
460 OCStackResult GenerateObserverId (OCObservationId *observationId)
464 OIC_LOG(INFO, TAG, "Entering GenerateObserverId");
// A NULL output pointer jumps to the exit label (see VERIFY_NON_NULL).
465 VERIFY_NON_NULL (observationId);
// Draw random bytes until the value is non-zero.
471 *observationId = OCGetRandomByte();
472 } while (0 == *observationId); //Make sure *observationId is not 0
473 // Check if observation Id already exists
474 found = IsObserverAvailable (*observationId);
477 OIC_LOG_V(INFO, TAG, "GeneratedObservation ID is %u", *observationId);
481 return OC_STACK_ERROR;
// Registers a new observer: allocates a node, stores copies of the URI, query and token
// together with the remaining parameters, and appends the node to the global observer list.
484 OCStackResult AddObserver (const char *resUri,
486 OCObservationId obsId,
489 OCResource *resHandle,
490 OCQualityOfService qos,
491 OCPayloadFormat acceptFormat,
492 const OCDevAddr *devAddr)
494 // Check if resource exists and is observable.
497 return OC_STACK_INVALID_PARAM;
// Resources without the OC_OBSERVABLE property are rejected.
499 if (!(resHandle->resourceProperties & OC_OBSERVABLE))
501 return OC_STACK_RESOURCE_ERROR;
504 if (!resUri || !token)
506 return OC_STACK_INVALID_PARAM;
509 ResourceObserver *obsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver));
512 obsNode->observeId = obsId;
// The node keeps its own copies of the strings; a failed copy jumps to the exit label.
514 obsNode->resUri = OICStrdup(resUri);
515 VERIFY_NON_NULL (obsNode->resUri);
518 obsNode->acceptFormat = acceptFormat;
521 obsNode->query = OICStrdup(query);
522 VERIFY_NON_NULL (obsNode->query);
524 // If tokenLength is zero, the return value depends on the
525 // particular library implementation (it may or may not be a null pointer).
528 obsNode->token = (CAToken_t)OICMalloc(tokenLength);
529 VERIFY_NON_NULL (obsNode->token);
530 memcpy(obsNode->token, token, tokenLength);
532 obsNode->tokenLength = tokenLength;
// NOTE(review): devAddr is dereferenced here; no NULL check for it is visible in this listing.
534 obsNode->devAddr = *devAddr;
535 obsNode->resource = resHandle;
// Observers of the presence URI are distinguished from regular observers.
538 if ((strcmp(resUri, OC_RSRVD_PRESENCE_URI) == 0))
545 obsNode->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
// Publish the fully initialised node under the list lock.
548 oc_mutex_lock(g_serverObsListMutex);
549 LL_APPEND (g_serverObsList, obsNode);
550 oc_mutex_unlock(g_serverObsListMutex);
// Cleanup for a failed allocation: release the copies made so far and report the failure.
558 OICFree(obsNode->resUri);
559 OICFree(obsNode->query);
562 return OC_STACK_NO_MEMORY;
/**
 * This function checks whether the node is past its time to live and
 * deletes it if it has timed out. Calling this function with a presence callback
 * with ttl set to 0 will not delete anything, as presence nodes have
 * their own mechanisms for timeouts. A null argument will cause the function to
 * return without doing anything.
 */
// Checks one observer's TTL and, once it lies in the past, sends that observer a
// high-QoS notification through SendObserveNotification.
572 static void CheckTimedOutObserver(ResourceObserver* observer)
// NULL observers and observers whose TTL is 0 are excluded from the check.
574 if (!observer || observer->TTL == 0)
582 if (observer->TTL < now)
584 // Send confirmable notification message to observer.
585 OIC_LOG(INFO, TAG, "Sending High-QoS notification to observer");
586 SendObserveNotification(observer, OC_HIGH_QOS);
590 ResourceObserver* GetObserverUsingId (const OCObservationId observeId)
592 ResourceObserver *out = NULL;
596 oc_mutex_lock(g_serverObsListMutex);
597 LL_FOREACH (g_serverObsList, out)
599 if (out->observeId == observeId)
601 oc_mutex_unlock(g_serverObsListMutex);
602 return CloneObserverNode(out);
604 CheckTimedOutObserver(out);
606 oc_mutex_unlock(g_serverObsListMutex);
608 OIC_LOG(INFO, TAG, "Observer node not found!!");
// Searches the global observer list for a node whose observeId matches.
// No locking is done here; the caller visible in this file (SendListObserverNotification)
// locks g_serverObsListMutex before calling.
612 static ResourceObserver* GetObserverUsingIdAsOwner (const OCObservationId observeId)
614 ResourceObserver *out = NULL;
618 LL_FOREACH (g_serverObsList, out)
620 if (out->observeId == observeId)
// Nodes that do not match get their TTL checked while the list is being walked.
624 CheckTimedOutObserver(out);
627 OIC_LOG(INFO, TAG, "Observer node not found!!");
// Reports whether the global observer list contains a node with the given observation id.
// The list is scanned under g_serverObsListMutex, which is released on both paths below.
631 bool IsObserverAvailable (const OCObservationId observeId)
633 ResourceObserver *out = NULL;
637 oc_mutex_lock(g_serverObsListMutex);
638 LL_FOREACH (g_serverObsList, out)
640 if (out->observeId == observeId)
642 oc_mutex_unlock(g_serverObsListMutex);
646 oc_mutex_unlock(g_serverObsListMutex);
652 ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength)
656 OIC_LOG(INFO, TAG, "Looking for token");
657 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
659 ResourceObserver *out = NULL;
660 oc_mutex_lock(g_serverObsListMutex);
661 LL_FOREACH (g_serverObsList, out)
663 /* de-annotate below line if want to see all token in cbList */
664 //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
665 if ((memcmp(out->token, token, tokenLength) == 0))
667 OIC_LOG(INFO, TAG, "Found in observer list");
668 ResourceObserver *observer = CloneObserverNode(out);
669 oc_mutex_unlock(g_serverObsListMutex);
672 CheckTimedOutObserver(out);
674 oc_mutex_unlock(g_serverObsListMutex);
678 OIC_LOG(ERROR, TAG, "Passed in NULL token");
681 OIC_LOG(INFO, TAG, "Observer node not found!!");
685 static ResourceObserver* GetObserverUsingTokenAsOwner (const CAToken_t token, uint8_t tokenLength)
689 OIC_LOG(INFO, TAG, "Looking for token");
690 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
692 ResourceObserver *out = NULL;
693 LL_FOREACH (g_serverObsList, out)
695 /* de-annotate below line if want to see all token in cbList */
696 //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
697 if ((memcmp(out->token, token, tokenLength) == 0))
699 OIC_LOG(INFO, TAG, "Found in observer list");
702 CheckTimedOutObserver(out);
707 OIC_LOG(ERROR, TAG, "Passed in NULL token");
710 OIC_LOG(INFO, TAG, "Observer node not found!!");
// Removes the observer registered with the given token from the global list and frees it.
714 OCStackResult DeleteObserverUsingToken (CAToken_t token, uint8_t tokenLength)
718 return OC_STACK_INVALID_PARAM;
// Lookup, unlink and free all happen under the list lock, so the node returned by the
// lock-free lookup helper stays valid while it is used.
721 oc_mutex_lock(g_serverObsListMutex);
722 ResourceObserver *obsNode = GetObserverUsingTokenAsOwner (token, tokenLength);
725 OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", obsNode->observeId);
726 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength);
728 LL_DELETE (g_serverObsList, obsNode);
729 FreeObserver(obsNode);
731 oc_mutex_unlock(g_serverObsListMutex);
733 // it is ok if we did not find the observer...
// Reports OC_OBSERVER_NOT_INTERESTED through OCStackFeedBack for every observer whose device
// address string and port equal those of devAddr.
737 OCStackResult DeleteObserverUsingDevAddr(const OCDevAddr *devAddr)
741 return OC_STACK_INVALID_PARAM;
// Work on a private copy of the list: the lock is held only while the copy is made, and
// OCStackFeedBack is then called without the list lock.
744 oc_mutex_lock(g_serverObsListMutex);
745 ResourceObserver* obsDupList = CloneObserverList(g_serverObsList);
746 oc_mutex_unlock(g_serverObsListMutex);
748 ResourceObserver *out = NULL;
749 ResourceObserver *tmp = NULL;
750 LL_FOREACH_SAFE(obsDupList, out, tmp)
754 if ((strcmp(out->devAddr.addr, devAddr->addr) == 0)
755 && out->devAddr.port == devAddr->port)
757 OIC_LOG_V(INFO, TAG, "deleting observer id %u with %s:%u",
758 out->observeId, out->devAddr.addr, out->devAddr.port);
759 OCStackFeedBack(out->token, out->tokenLength, OC_OBSERVER_NOT_INTERESTED);
// The private copy is released once all entries have been examined.
764 FreeObserverList(obsDupList);
// Empties the global observer list while holding g_serverObsListMutex.
768 void DeleteObserverList()
770 oc_mutex_lock(g_serverObsListMutex);
772 ResourceObserver* head = g_serverObsList;
773 ResourceObserver* del = NULL;
// Each node's id and token are logged.
// NOTE(review): the traversal and the release of `del` are not visible in this listing.
779 OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", del->observeId);
780 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)del->token, del->tokenLength);
// Reset the head so the list reads as empty.
785 g_serverObsList = NULL;
786 oc_mutex_unlock(g_serverObsListMutex);
/**
 * The CA layer expects observe registrations, de-registrations and notifications to be
 * passed as a header option, which breaks the protocol abstraction requirement between
 * RI & CA and has to be fixed in the future. The function below adds the header option
 * for observe. Note that the observe header option is assumed to be the first option
 * in the list of user-defined header options; it is therefore inserted at the front
 * of the header options list and the number of options is adjusted accordingly.
 */
// Builds a new CA header-option array whose first entry is the CoAP observe option and whose
// remaining entries are the caller's options, and stores the array in *caHdrOpt.
798 CreateObserveHeaderOption (CAHeaderOption_t **caHdrOpt,
799 OCHeaderOption *ocHdrOpt,
805 return OC_STACK_INVALID_PARAM;
// A non-zero option count requires an option array.
808 if (numOptions > 0 && !ocHdrOpt)
810 OIC_LOG (INFO, TAG, "options are NULL though number is non zero");
811 return OC_STACK_INVALID_PARAM;
814 CAHeaderOption_t *tmpHdrOpt = NULL;
// One extra slot is reserved for the observe option.
816 tmpHdrOpt = (CAHeaderOption_t *) OICCalloc ((numOptions+1), sizeof(CAHeaderOption_t));
817 if (NULL == tmpHdrOpt)
819 return OC_STACK_NO_MEMORY;
// Slot 0: observe option carrying the single byte observeFlag.
821 tmpHdrOpt[0].protocolID = CA_COAP_ID;
822 tmpHdrOpt[0].optionID = COAP_OPTION_OBSERVE;
823 tmpHdrOpt[0].optionLength = sizeof(uint8_t);
824 tmpHdrOpt[0].optionData[0] = observeFlag;
// Slots 1..numOptions: byte-wise copies of the caller's options.
// NOTE(review): sizeof(CAHeaderOption_t) bytes are read from each OCHeaderOption entry;
// presumably both structs share the same layout — confirm.
825 for (uint8_t i = 0; i < numOptions; i++)
827 memcpy (&(tmpHdrOpt[i+1]), &(ocHdrOpt[i]), sizeof(CAHeaderOption_t));
// The new array is handed to the caller through the output parameter.
830 *caHdrOpt = tmpHdrOpt;
835 * CA layer passes observe information to the RI layer as a header option, which
836 * breaks the protocol abstraction requirement between RI & CA, and has to be fixed
837 * in the future. The function below removes the observe header option and processes it.
838 * It should be noted that the observe header option is always assumed to be the first
839 * option in the list of user defined header options and hence it is deleted from the
840 * front of the header options list and the number of options is adjusted accordingly.
843 GetObserveHeaderOption (uint32_t * observationOption,
844 CAHeaderOption_t *options,
845 uint8_t * numOptions)
847 if (!observationOption)
849 return OC_STACK_INVALID_PARAM;
852 if (!options || !numOptions)
854 OIC_LOG (INFO, TAG, "No options present");
858 for(uint8_t i = 0; i < *numOptions; i++)
860 if (options[i].protocolID == CA_COAP_ID &&
861 options[i].optionID == COAP_OPTION_OBSERVE)
863 *observationOption = options[i].optionData[0];
864 for(uint8_t c = i; c < *numOptions-1; c++)
866 options[i] = options[i+1];
// Prepares the observer module by creating the list mutex if it does not exist yet.
875 OCStackResult InitializeObseverList()
877 OIC_LOG(DEBUG, TAG, "InitializeObseverList IN");
// The mutex is created only once; an existing mutex is left in place.
879 if (NULL == g_serverObsListMutex)
881 g_serverObsListMutex = oc_mutex_new();
884 OIC_LOG(DEBUG, TAG, "InitializeObseverList OUT");
// Shuts the observer module down: empties the observer list, then destroys the list mutex.
888 void TerminateObserverList()
890 OIC_LOG(DEBUG, TAG, "TerminateObserverList IN");
// DeleteObserverList locks the mutex itself, so it runs before the mutex is freed.
892 DeleteObserverList();
894 if (NULL != g_serverObsListMutex)
896 oc_mutex_free(g_serverObsListMutex);
// Cleared so that a later InitializeObseverList creates a fresh mutex.
897 g_serverObsListMutex = NULL;
900 OIC_LOG(DEBUG, TAG, "TerminateObserverList OUT");
// Releases the heap buffers owned by an observer node: its URI, query and token.
// NOTE(review): the remainder of the body (NULL guard, release of the node itself) is not
// visible in this listing — confirm against the full source.
903 void FreeObserver (ResourceObserver* obsNode)
907 OICFree(obsNode->resUri);
908 OICFree(obsNode->query);
909 OICFree(obsNode->token);