//******************************************************************
//
// Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
//
//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
#include <string.h>

#include "ocstackconfig.h"
#include "ocstackinternal.h"
#include "ocobserve.h"
#include "ocresourcehandler.h"
#include "oic_malloc.h"
#include "oic_string.h"
#include "ocpayload.h"
#include "ocserverrequest.h"

#include <coap/utlist.h>
#include <coap/coap.h>
// Module name.
#define MOD_NAME "ocobserve"

// Tag passed to the OIC_LOG* macros throughout this file.
#define TAG "OIC_RI_OBSERVE"

/**
 * Log a FATAL message and jump to the `exit` label of the enclosing function
 * when @p arg evaluates to zero/NULL.
 *
 * The expansion is wrapped in do { } while (0) and the argument is
 * parenthesized so the macro behaves like a single statement: it can be used as
 * the unbraced body of an if/else and accepts arbitrary expressions. The
 * enclosing function must define an `exit:` label.
 */
#define VERIFY_NON_NULL(arg) \
    do { if (!(arg)) { OIC_LOG(FATAL, TAG, #arg " is NULL"); goto exit; } } while (0)

// Head of the utlist (LL_*) list holding every registered observer.
static struct ResourceObserver * g_serverObsList = NULL;
47 * Determine observe QOS based on the QOS of the request.
48 * The qos passed as a parameter overrides what the client requested.
49 * If we want the client preference taking high priority make:
50 * qos = resourceObserver->qos;
52 * @param method RESTful method.
53 * @param resourceObserver Observer.
54 * @param appQoS Quality of service.
55 * @return The quality of service of the observer.
57 static OCQualityOfService DetermineObserverQoS(OCMethod method,
58 ResourceObserver * resourceObserver, OCQualityOfService appQoS)
60 if (!resourceObserver)
62 OIC_LOG(ERROR, TAG, "DetermineObserverQoS called with invalid resourceObserver");
66 OCQualityOfService decidedQoS = appQoS;
67 if (appQoS == OC_NA_QOS)
69 decidedQoS = resourceObserver->qos;
72 if (appQoS != OC_HIGH_QOS)
74 OIC_LOG_V(INFO, TAG, "Current NON count for this observer is %d",
75 resourceObserver->lowQosCount);
77 if ((resourceObserver->forceHighQos \
78 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT) \
79 && method != OC_REST_PRESENCE)
81 if (resourceObserver->forceHighQos \
82 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT)
85 resourceObserver->lowQosCount = 0;
86 // at some point we have to to send CON to check on the
87 // availability of observer
88 OIC_LOG(INFO, TAG, "This time we are sending the notification as High qos");
89 decidedQoS = OC_HIGH_QOS;
93 (resourceObserver->lowQosCount)++;
100 * Create a get request and pass to entityhandler to notify specific observer.
102 * @param observer Observer that need to be notified.
103 * @param qos Quality of service of resource.
105 * @return ::OC_STACK_OK on success, some other value upon failure.
107 static OCStackResult SendObserveNotification(ResourceObserver *observer,
108 OCQualityOfService qos)
110 OCStackResult result = OC_STACK_ERROR;
111 OCServerRequest * request = NULL;
112 OCEntityHandlerRequest ehRequest = {0};
113 OCEntityHandlerResult ehResult = OC_EH_ERROR;
115 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
116 0, observer->resource->sequenceNum, qos,
117 observer->query, NULL, NULL,
118 observer->token, observer->tokenLength,
119 observer->resUri, 0, observer->acceptFormat,
124 request->observeResult = OC_STACK_OK;
125 if (result == OC_STACK_OK)
127 result = FormOCEntityHandlerRequest(
129 (OCRequestHandle) request->requestId,
132 (OCResourceHandle) observer->resource,
134 PAYLOAD_TYPE_REPRESENTATION,
136 request->payloadSize,
137 request->numRcvdVendorSpecificHeaderOptions,
138 request->rcvdVendorSpecificHeaderOptions,
139 OC_OBSERVE_NO_OPTION,
142 if (result == OC_STACK_OK)
144 ehResult = observer->resource->entityHandler(OC_REQUEST_FLAG, &ehRequest,
145 observer->resource->entityHandlerCallbackParam);
146 if (ehResult == OC_EH_ERROR)
148 FindAndDeleteServerRequest(request);
150 // Reset Observer TTL.
151 observer->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
153 OCPayloadDestroy(ehRequest.payload);
161 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
162 OCPresenceTrigger trigger, OCResourceType *resourceType, OCQualityOfService qos)
164 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
165 OCQualityOfService qos)
168 OIC_LOG(INFO, TAG, "Entering SendObserverNotification");
171 return OC_STACK_INVALID_PARAM;
174 OCStackResult result = OC_STACK_ERROR;
175 ResourceObserver * resourceObserver = g_serverObsList;
177 OCServerRequest * request = NULL;
178 OCEntityHandlerRequest ehRequest = {0};
179 OCEntityHandlerResult ehResult = OC_EH_ERROR;
180 bool observeErrorFlag = false;
182 // Find clients that are observing this resource
183 while (resourceObserver)
185 if (resourceObserver->resource == resPtr)
189 if (method != OC_REST_PRESENCE)
192 qos = DetermineObserverQoS(method, resourceObserver, qos);
193 result = SendObserveNotification(resourceObserver, qos);
198 OCEntityHandlerResponse ehResponse = {0};
200 //This is effectively the implementation for the presence entity handler.
201 OIC_LOG(DEBUG, TAG, "This notification is for Presence");
202 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
203 0, resPtr->sequenceNum, qos, resourceObserver->query,
205 resourceObserver->token, resourceObserver->tokenLength,
206 resourceObserver->resUri, 0, resourceObserver->acceptFormat,
207 &resourceObserver->devAddr);
209 if (result == OC_STACK_OK)
211 OCPresencePayload* presenceResBuf = OCPresencePayloadCreate(
212 resPtr->sequenceNum, maxAge, trigger,
213 resourceType ? resourceType->resourcetypename : NULL);
217 return OC_STACK_NO_MEMORY;
220 if (result == OC_STACK_OK)
222 ehResponse.ehResult = OC_EH_OK;
223 ehResponse.payload = (OCPayload*)presenceResBuf;
224 ehResponse.persistentBufferFlag = 0;
225 ehResponse.requestHandle = (OCRequestHandle) request->requestId;
226 ehResponse.resourceHandle = (OCResourceHandle) resPtr;
227 OICStrcpy(ehResponse.resourceUri, sizeof(ehResponse.resourceUri),
228 resourceObserver->resUri);
229 result = OCDoResponse(&ehResponse);
232 OCPresencePayloadDestroy(presenceResBuf);
237 // Since we are in a loop, set an error flag to indicate at least one error occurred.
238 if (result != OC_STACK_OK)
240 observeErrorFlag = true;
243 resourceObserver = resourceObserver->next;
248 OIC_LOG(INFO, TAG, "Resource has no observers");
249 result = OC_STACK_NO_OBSERVERS;
251 else if (observeErrorFlag)
253 OIC_LOG(ERROR, TAG, "Observer notification error");
254 result = OC_STACK_ERROR;
259 OCStackResult SendListObserverNotification (OCResource * resource,
260 OCObservationId *obsIdList, uint8_t numberOfIds,
261 const OCRepPayload *payload,
263 OCQualityOfService qos)
266 if (!resource || !obsIdList || !payload)
268 return OC_STACK_INVALID_PARAM;
271 uint8_t numIds = numberOfIds;
272 ResourceObserver *observer = NULL;
273 uint8_t numSentNotification = 0;
274 OCServerRequest * request = NULL;
275 OCStackResult result = OC_STACK_ERROR;
276 bool observeErrorFlag = false;
278 OIC_LOG(INFO, TAG, "Entering SendListObserverNotification");
281 observer = GetObserverUsingId (*obsIdList);
284 // Found observer - verify if it matches the resource handle
285 if (observer->resource == resource)
287 qos = DetermineObserverQoS(OC_REST_GET, observer, qos);
290 result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET,
291 0, resource->sequenceNum, qos, observer->query,
292 NULL, NULL, observer->token, observer->tokenLength,
293 observer->resUri, 0, observer->acceptFormat,
298 request->observeResult = OC_STACK_OK;
299 if (result == OC_STACK_OK)
301 OCEntityHandlerResponse ehResponse = {0};
302 ehResponse.ehResult = OC_EH_OK;
303 ehResponse.payload = (OCPayload*)OCRepPayloadCreate();
304 if (!ehResponse.payload)
306 FindAndDeleteServerRequest(request);
309 memcpy(ehResponse.payload, payload, sizeof(*payload));
310 ehResponse.persistentBufferFlag = 0;
311 ehResponse.requestHandle = (OCRequestHandle) request->requestId;
312 ehResponse.resourceHandle = (OCResourceHandle) resource;
313 result = OCDoResponse(&ehResponse);
314 if (result == OC_STACK_OK)
316 OIC_LOG_V(INFO, TAG, "Observer id %d notified.", *obsIdList);
318 // Increment only if OCDoResponse is successful
319 numSentNotification++;
321 OICFree(ehResponse.payload);
322 FindAndDeleteServerRequest(request);
326 OIC_LOG_V(INFO, TAG, "Error notifying observer id %d.", *obsIdList);
328 // Reset Observer TTL.
330 GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
334 FindAndDeleteServerRequest(request);
337 // Since we are in a loop, set an error flag to indicate
338 // at least one error occurred.
339 if (result != OC_STACK_OK)
341 observeErrorFlag = true;
349 if (numSentNotification == numberOfIds && !observeErrorFlag)
353 else if (numSentNotification == 0)
355 return OC_STACK_NO_OBSERVERS;
359 OIC_LOG(ERROR, TAG, "Observer notification error");
360 return OC_STACK_ERROR;
364 OCStackResult GenerateObserverId (OCObservationId *observationId)
366 ResourceObserver *resObs = NULL;
368 OIC_LOG(INFO, TAG, "Entering GenerateObserverId");
369 VERIFY_NON_NULL (observationId);
375 *observationId = OCGetRandomByte();
376 } while (0 == *observationId); //Make sure *observationId is not 0
377 // Check if observation Id already exists
378 resObs = GetObserverUsingId (*observationId);
379 } while (NULL != resObs);
381 OIC_LOG_V(INFO, TAG, "GeneratedObservation ID is %u", *observationId);
385 return OC_STACK_ERROR;
388 OCStackResult AddObserver (const char *resUri,
390 OCObservationId obsId,
393 OCResource *resHandle,
394 OCQualityOfService qos,
395 OCPayloadFormat acceptFormat,
396 const OCDevAddr *devAddr)
398 // Check if resource exists and is observable.
401 return OC_STACK_INVALID_PARAM;
403 if (!(resHandle->resourceProperties & OC_OBSERVABLE))
405 return OC_STACK_RESOURCE_ERROR;
408 if (!resUri || !token)
410 return OC_STACK_INVALID_PARAM;
413 ResourceObserver *obsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver));
416 obsNode->observeId = obsId;
418 obsNode->resUri = OICStrdup(resUri);
419 VERIFY_NON_NULL (obsNode->resUri);
422 obsNode->acceptFormat = acceptFormat;
425 obsNode->query = OICStrdup(query);
426 VERIFY_NON_NULL (obsNode->query);
428 // If tokenLength is zero, the return value depends on the
429 // particular library implementation (it may or may not be a null pointer).
432 obsNode->token = (CAToken_t)OICMalloc(tokenLength);
433 VERIFY_NON_NULL (obsNode->token);
434 memcpy(obsNode->token, token, tokenLength);
436 obsNode->tokenLength = tokenLength;
438 obsNode->devAddr = *devAddr;
439 obsNode->resource = resHandle;
442 if ((strcmp(resUri, OC_RSRVD_PRESENCE_URI) == 0))
449 obsNode->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND);
452 LL_APPEND (g_serverObsList, obsNode);
460 OICFree(obsNode->resUri);
461 OICFree(obsNode->query);
464 return OC_STACK_NO_MEMORY;
468 * This function checks if the node is past its time to live and
469 * deletes it if timed-out. Calling this function with a presence callback
470 * with ttl set to 0 will not delete anything as presence nodes have
471 * their own mechanisms for timeouts. A null argument will cause the function to
474 static void CheckTimedOutObserver(ResourceObserver* observer)
476 if (!observer || observer->TTL == 0)
484 if (observer->TTL < now)
486 // Send confirmable notification message to observer.
487 OIC_LOG(INFO, TAG, "Sending High-QoS notification to observer");
488 SendObserveNotification(observer, OC_HIGH_QOS);
492 ResourceObserver* GetObserverUsingId (const OCObservationId observeId)
494 ResourceObserver *out = NULL;
498 LL_FOREACH (g_serverObsList, out)
500 if (out->observeId == observeId)
504 CheckTimedOutObserver(out);
507 OIC_LOG(INFO, TAG, "Observer node not found!!");
511 ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength)
515 OIC_LOG(INFO, TAG, "Looking for token");
516 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
518 ResourceObserver *out = NULL;
519 LL_FOREACH (g_serverObsList, out)
521 /* de-annotate below line if want to see all token in cbList */
522 //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
523 if ((memcmp(out->token, token, tokenLength) == 0))
525 OIC_LOG(INFO, TAG, "Found in observer list");
528 CheckTimedOutObserver(out);
533 OIC_LOG(ERROR, TAG, "Passed in NULL token");
536 OIC_LOG(INFO, TAG, "Observer node not found!!");
540 OCStackResult DeleteObserverUsingToken (CAToken_t token, uint8_t tokenLength)
544 return OC_STACK_INVALID_PARAM;
547 ResourceObserver *obsNode = GetObserverUsingToken (token, tokenLength);
550 OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", obsNode->observeId);
551 OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength);
552 LL_DELETE (g_serverObsList, obsNode);
553 OICFree(obsNode->resUri);
554 OICFree(obsNode->query);
555 OICFree(obsNode->token);
558 // it is ok if we did not find the observer...
562 OCStackResult DeleteObserverUsingDevAddr(const OCDevAddr *devAddr)
566 return OC_STACK_INVALID_PARAM;
569 ResourceObserver *out = NULL;
570 ResourceObserver *tmp = NULL;
571 LL_FOREACH_SAFE(g_serverObsList, out, tmp)
575 if ((strcmp(out->devAddr.addr, devAddr->addr) == 0)
576 && out->devAddr.port == devAddr->port)
578 OIC_LOG_V(INFO, TAG, "deleting observer id %u with %s:%u",
579 out->observeId, out->devAddr.addr, out->devAddr.port);
580 OCStackFeedBack(out->token, out->tokenLength, OC_OBSERVER_NOT_INTERESTED);
588 void DeleteObserverList()
590 ResourceObserver *out = NULL;
591 ResourceObserver *tmp = NULL;
592 LL_FOREACH_SAFE (g_serverObsList, out, tmp)
596 DeleteObserverUsingToken ((out->token), out->tokenLength);
599 g_serverObsList = NULL;
603 * CA layer expects observe registration/de-reg/notiifcations to be passed as a header
604 * option, which breaks the protocol abstraction requirement between RI & CA, and
605 * has to be fixed in the future. The function below adds the header option for observe.
606 * It should be noted that the observe header option is assumed to be the first option
607 * in the list of user defined header options and hence it is inserted at the front
608 * of the header options list and number of options adjusted accordingly.
611 CreateObserveHeaderOption (CAHeaderOption_t **caHdrOpt,
612 OCHeaderOption *ocHdrOpt,
618 return OC_STACK_INVALID_PARAM;
621 if (numOptions > 0 && !ocHdrOpt)
623 OIC_LOG (INFO, TAG, "options are NULL though number is non zero");
624 return OC_STACK_INVALID_PARAM;
627 CAHeaderOption_t *tmpHdrOpt = NULL;
629 tmpHdrOpt = (CAHeaderOption_t *) OICCalloc ((numOptions+1), sizeof(CAHeaderOption_t));
630 if (NULL == tmpHdrOpt)
632 return OC_STACK_NO_MEMORY;
634 tmpHdrOpt[0].protocolID = CA_COAP_ID;
635 tmpHdrOpt[0].optionID = COAP_OPTION_OBSERVE;
636 tmpHdrOpt[0].optionLength = sizeof(uint8_t);
637 tmpHdrOpt[0].optionData[0] = observeFlag;
638 for (uint8_t i = 0; i < numOptions; i++)
640 memcpy (&(tmpHdrOpt[i+1]), &(ocHdrOpt[i]), sizeof(CAHeaderOption_t));
643 *caHdrOpt = tmpHdrOpt;
648 * CA layer passes observe information to the RI layer as a header option, which
649 * breaks the protocol abstraction requirement between RI & CA, and has to be fixed
650 * in the future. The function below removes the observe header option and processes it.
651 * It should be noted that the observe header option is always assumed to be the first
652 * option in the list of user defined header options and hence it is deleted from the
653 * front of the header options list and the number of options is adjusted accordingly.
656 GetObserveHeaderOption (uint32_t * observationOption,
657 CAHeaderOption_t *options,
658 uint8_t * numOptions)
660 if (!observationOption)
662 return OC_STACK_INVALID_PARAM;
665 if (!options || !numOptions)
667 OIC_LOG (INFO, TAG, "No options present");
671 for(uint8_t i = 0; i < *numOptions; i++)
673 if (options[i].protocolID == CA_COAP_ID &&
674 options[i].optionID == COAP_OPTION_OBSERVE)
676 *observationOption = options[i].optionData[0];
677 for(uint8_t c = i; c < *numOptions-1; c++)
679 options[i] = options[i+1];