1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
23 #include "ocstackconfig.h"
24 #include "ocstackinternal.h"
25 #include "ocobserve.h"
26 #include "ocresourcehandler.h"
29 #include "ocserverrequest.h"
35 #define MOD_NAME PCF("ocobserve")
37 #define TAG PCF("OCStackObserve")
39 #define VERIFY_NON_NULL(arg) { if (!arg) {OC_LOG(FATAL, TAG, #arg " is NULL"); goto exit;} }
41 static struct ResourceObserver * serverObsList = NULL;
44 * Determine observe QOS based on the QOS of the request.
45 * The qos passed as a parameter overrides what the client requested.
46 * If we want the client preference taking high priority make:
47 * qos = resourceObserver->qos;
49 * @param method RESTful method.
50 * @param resourceObserver Observer.
51 * @param appQoS Quality of service.
52 * @return The quality of service of the observer.
54 static OCQualityOfService DetermineObserverQoS(OCMethod method,
55 ResourceObserver * resourceObserver, OCQualityOfService appQoS)
59 OC_LOG(ERROR, TAG, "DetermineObserverQoS called with invalid resourceObserver");
63 OCQualityOfService decidedQoS = appQoS;
64 if(appQoS == OC_NA_QOS)
66 decidedQoS = resourceObserver->qos;
69 if(appQoS != OC_HIGH_QOS)
71 OC_LOG_V(INFO, TAG, "Current NON count for this observer is %d",
72 resourceObserver->lowQosCount);
74 if((resourceObserver->forceHighQos \
75 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT) \
76 && method != OC_REST_PRESENCE)
78 if(resourceObserver->forceHighQos \
79 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT)
82 resourceObserver->lowQosCount = 0;
83 // at some point we have to to send CON to check on the
84 // availability of observer
85 OC_LOG(INFO, TAG, PCF("This time we are sending the notification as High qos"));
86 decidedQoS = OC_HIGH_QOS;
90 (resourceObserver->lowQosCount)++;
97 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
98 OCResourceType *resourceType, OCQualityOfService qos)
100 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
101 OCQualityOfService qos)
104 OC_LOG(INFO, TAG, PCF("Entering SendObserverNotification"));
107 return OC_STACK_INVALID_PARAM;
110 OCStackResult result = OC_STACK_ERROR;
111 ResourceObserver * resourceObserver = serverObsList;
113 OCServerRequest * request = NULL;
114 OCEntityHandlerRequest ehRequest = {};
115 OCEntityHandlerResult ehResult = OC_EH_ERROR;
116 bool observeErrorFlag = false;
118 // Find clients that are observing this resource
119 while (resourceObserver)
121 if (resourceObserver->resource == resPtr)
125 if(method != OC_REST_PRESENCE)
128 qos = DetermineObserverQoS(method, resourceObserver, qos);
130 result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
131 0, resPtr->sequenceNum, qos, resourceObserver->query,
133 resourceObserver->token, resourceObserver->tokenLength,
134 resourceObserver->resUri, 0,
135 &(resourceObserver->addressInfo), resourceObserver->connectivityType);
139 request->observeResult = OC_STACK_OK;
140 if(result == OC_STACK_OK)
142 result = FormOCEntityHandlerRequest(&ehRequest, (OCRequestHandle) request,
143 request->method, (OCResourceHandle) resPtr, request->query,
144 request->reqJSONPayload,
145 request->numRcvdVendorSpecificHeaderOptions,
146 request->rcvdVendorSpecificHeaderOptions,
147 OC_OBSERVE_NO_OPTION, 0);
148 if(result == OC_STACK_OK)
150 ehResult = resPtr->entityHandler(OC_REQUEST_FLAG, &ehRequest);
151 if(ehResult == OC_EH_ERROR)
153 FindAndDeleteServerRequest(request);
162 OCEntityHandlerResponse ehResponse = {};
163 char presenceResBuf[MAX_RESPONSE_LENGTH] = {};
165 //This is effectively the implementation for the presence entity handler.
166 OC_LOG(DEBUG, TAG, PCF("This notification is for Presence"));
168 result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
169 0, resPtr->sequenceNum, qos, resourceObserver->query,
171 resourceObserver->token, resourceObserver->tokenLength,
172 resourceObserver->resUri, 0,
173 &(resourceObserver->addressInfo), resourceObserver->connectivityType);
175 if(result == OC_STACK_OK)
177 // we create the payload here
178 if(resourceType && resourceType->resourcetypename)
180 snprintf((char *)presenceResBuf, sizeof(presenceResBuf), "%u:%u:%s",
181 resPtr->sequenceNum, maxAge, resourceType->resourcetypename);
185 snprintf((char *)presenceResBuf, sizeof(presenceResBuf), "%u:%u",
186 resPtr->sequenceNum, maxAge);
188 ehResponse.ehResult = OC_EH_OK;
189 ehResponse.payload = presenceResBuf;
190 ehResponse.payloadSize = strlen((const char *)presenceResBuf) + 1;
191 ehResponse.persistentBufferFlag = 0;
192 ehResponse.requestHandle = (OCRequestHandle) request;
193 ehResponse.resourceHandle = (OCResourceHandle) resPtr;
194 strcpy((char *)ehResponse.resourceUri, (const char *)resourceObserver->resUri);
195 result = OCDoResponse(&ehResponse);
200 // Since we are in a loop, set an error flag to indicate at least one error occurred.
201 if (result != OC_STACK_OK)
203 observeErrorFlag = true;
206 resourceObserver = resourceObserver->next;
211 OC_LOG(INFO, TAG, PCF("Resource has no observers"));
212 result = OC_STACK_NO_OBSERVERS;
214 else if (observeErrorFlag)
216 OC_LOG(ERROR, TAG, PCF("Observer notification error"));
217 result = OC_STACK_ERROR;
222 OCStackResult SendListObserverNotification (OCResource * resource,
223 OCObservationId *obsIdList, uint8_t numberOfIds,
224 const char *notificationJSONPayload, uint32_t maxAge,
225 OCQualityOfService qos)
227 if(!resource || !obsIdList || !notificationJSONPayload)
229 return OC_STACK_INVALID_PARAM;
232 uint8_t numIds = numberOfIds;
233 ResourceObserver *observer = NULL;
234 uint8_t numSentNotification = 0;
235 OCServerRequest * request = NULL;
236 OCStackResult result = OC_STACK_ERROR;
237 bool observeErrorFlag = false;
239 OC_LOG(INFO, TAG, PCF("Entering SendListObserverNotification"));
242 observer = GetObserverUsingId (*obsIdList);
245 // Found observer - verify if it matches the resource handle
246 if (observer->resource == resource)
248 qos = DetermineObserverQoS(OC_REST_GET, observer, qos);
251 result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
252 0, resource->sequenceNum, qos, observer->query,
253 NULL, NULL, observer->token, observer->tokenLength,
255 &(observer->addressInfo), observer->connectivityType);
259 request->observeResult = OC_STACK_OK;
260 if(result == OC_STACK_OK)
262 OCEntityHandlerResponse ehResponse = {};
263 ehResponse.ehResult = OC_EH_OK;
264 ehResponse.payload = (char *) OCMalloc(MAX_RESPONSE_LENGTH + 1);
265 if(!ehResponse.payload)
267 FindAndDeleteServerRequest(request);
270 strncpy(ehResponse.payload, notificationJSONPayload, MAX_RESPONSE_LENGTH-1);
271 ehResponse.payload[MAX_RESPONSE_LENGTH] = '\0';
272 ehResponse.payloadSize = strlen(ehResponse.payload) + 1;
273 ehResponse.persistentBufferFlag = 0;
274 ehResponse.requestHandle = (OCRequestHandle) request;
275 ehResponse.resourceHandle = (OCResourceHandle) resource;
276 result = OCDoResponse(&ehResponse);
277 if(result == OC_STACK_OK)
279 OC_LOG_V(INFO, TAG, "Observer id %d notified.", *obsIdList);
281 // Increment only if OCDoResponse is successful
282 numSentNotification++;
284 OCFree(ehResponse.payload);
285 FindAndDeleteServerRequest(request);
289 OC_LOG_V(INFO, TAG, "Error notifying observer id %d.", *obsIdList);
294 FindAndDeleteServerRequest(request);
297 // Since we are in a loop, set an error flag to indicate
298 // at least one error occurred.
299 if (result != OC_STACK_OK)
301 observeErrorFlag = true;
309 if(numSentNotification == numberOfIds && !observeErrorFlag)
313 else if(numSentNotification == 0)
315 return OC_STACK_NO_OBSERVERS;
319 OC_LOG(ERROR, TAG, PCF("Observer notification error"));
320 return OC_STACK_ERROR;
324 OCStackResult GenerateObserverId (OCObservationId *observationId)
326 ResourceObserver *resObs = NULL;
328 OC_LOG(INFO, TAG, PCF("Entering GenerateObserverId"));
329 VERIFY_NON_NULL (observationId);
333 *observationId = OCGetRandomByte();
334 // Check if observation Id already exists
335 resObs = GetObserverUsingId (*observationId);
336 } while (NULL != resObs);
338 OC_LOG_V(INFO, TAG, "Generated bservation ID is %u", *observationId);
342 return OC_STACK_ERROR;
345 OCStackResult AddObserver (const char *resUri,
347 OCObservationId obsId,
350 OCResource *resHandle,
351 OCQualityOfService qos,
352 const CAAddress_t *addressInfo,
353 CATransportType_t connectivityType)
355 // Check if resource exists and is observable.
358 return OC_STACK_INVALID_PARAM;
360 if (!(resHandle->resourceProperties & OC_OBSERVABLE))
362 return OC_STACK_RESOURCE_ERROR;
364 ResourceObserver *obsNode = NULL;
366 if(!resUri || !token || !*token)
368 return OC_STACK_INVALID_PARAM;
371 obsNode = (ResourceObserver *) OCCalloc(1, sizeof(ResourceObserver));
374 obsNode->observeId = obsId;
376 obsNode->resUri = (char *)OCMalloc(strlen(resUri)+1);
377 VERIFY_NON_NULL (obsNode->resUri);
378 memcpy (obsNode->resUri, resUri, strlen(resUri)+1);
383 obsNode->query = (char *)OCMalloc(strlen(query)+1);
384 VERIFY_NON_NULL (obsNode->query);
385 memcpy (obsNode->query, query, strlen(query)+1);
387 // If tokenLength is zero, the return value depends on the
388 // particular library implementation (it may or may not be a null pointer).
391 obsNode->token = (CAToken_t)OCMalloc(tokenLength);
392 VERIFY_NON_NULL (obsNode->token);
393 memcpy(obsNode->token, token, tokenLength);
395 obsNode->tokenLength = tokenLength;
396 obsNode->addressInfo = *addressInfo;
397 obsNode->connectivityType = connectivityType;
398 obsNode->resource = resHandle;
399 LL_APPEND (serverObsList, obsNode);
406 OCFree(obsNode->resUri);
407 OCFree(obsNode->query);
410 return OC_STACK_NO_MEMORY;
413 ResourceObserver* GetObserverUsingId (const OCObservationId observeId)
415 ResourceObserver *out = NULL;
419 LL_FOREACH (serverObsList, out)
421 if (out->observeId == observeId)
427 OC_LOG(INFO, TAG, PCF("Observer node not found!!"));
431 ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength)
433 ResourceObserver *out = NULL;
437 LL_FOREACH (serverObsList, out)
439 OC_LOG(INFO, TAG,PCF("comparing tokens"));
440 OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
441 OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
442 if((memcmp(out->token, token, tokenLength) == 0))
448 OC_LOG(INFO, TAG, PCF("Observer node not found!!"));
452 OCStackResult DeleteObserverUsingToken (CAToken_t token, uint8_t tokenLength)
454 if(!token || !*token)
456 return OC_STACK_INVALID_PARAM;
459 ResourceObserver *obsNode = NULL;
461 obsNode = GetObserverUsingToken (token, tokenLength);
464 OC_LOG_V(INFO, TAG, PCF("deleting tokens"));
465 OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength);
466 LL_DELETE (serverObsList, obsNode);
467 OCFree(obsNode->resUri);
468 OCFree(obsNode->query);
469 OCFree(obsNode->token);
472 // it is ok if we did not find the observer...
476 void DeleteObserverList()
478 ResourceObserver *out = NULL;
479 ResourceObserver *tmp = NULL;
480 LL_FOREACH_SAFE (serverObsList, out, tmp)
484 DeleteObserverUsingToken ((out->token), out->tokenLength);
487 serverObsList = NULL;
491 * CA layer expects observe registration/de-reg/notiifcations to be passed as a header
492 * option, which breaks the protocol abstraction requirement between RI & CA, and
493 * has to be fixed in the future. The function below adds the header option for observe.
494 * It should be noted that the observe header option is assumed to be the first option
495 * in the list of user defined header options and hence it is inserted at the front
496 * of the header options list and number of options adjusted accordingly.
499 CreateObserveHeaderOption (CAHeaderOption_t **caHdrOpt,
500 OCHeaderOption *ocHdrOpt,
506 return OC_STACK_INVALID_PARAM;
509 CAHeaderOption_t *tmpHdrOpt = NULL;
511 tmpHdrOpt = (CAHeaderOption_t *) OCCalloc ((numOptions+1), sizeof(CAHeaderOption_t));
512 if (NULL == tmpHdrOpt)
514 return OC_STACK_NO_MEMORY;
516 tmpHdrOpt[0].protocolID = CA_COAP_ID;
517 tmpHdrOpt[0].optionID = COAP_OPTION_OBSERVE;
518 tmpHdrOpt[0].optionLength = sizeof(uint32_t);
519 tmpHdrOpt[0].optionData[0] = observeFlag;
520 for (uint8_t i = 0; i < numOptions; i++)
522 memcpy (&(tmpHdrOpt[i+1]), &(ocHdrOpt[i]), sizeof(CAHeaderOption_t));
525 *caHdrOpt = tmpHdrOpt;
530 * CA layer passes observe information to the RI layer as a header option, which
531 * breaks the protocol abstraction requirement between RI & CA, and has to be fixed
532 * in the future. The function below removes the observe header option and processes it.
533 * It should be noted that the observe header option is always assumed to be the first
534 * option in the list of user defined header options and hence it is deleted from the
535 * front of the header options list and the number of options is adjusted accordingly.
538 GetObserveHeaderOption (uint32_t * observationOption,
539 CAHeaderOption_t *options,
540 uint8_t * numOptions)
542 if(!observationOption)
544 return OC_STACK_INVALID_PARAM;
546 *observationOption = OC_OBSERVE_NO_OPTION;
548 if(!options || !numOptions)
550 return OC_STACK_INVALID_PARAM;
553 for(uint8_t i = 0; i < *numOptions; i++)
555 if(options[i].protocolID == CA_COAP_ID &&
556 options[i].optionID == COAP_OPTION_OBSERVE)
558 *observationOption = options[i].optionData[0];
559 for(uint8_t c = i; c < *numOptions-1; c++)
561 options[i].protocolID = options[i+1].protocolID;
562 options[i].optionID = options[i+1].optionID;
563 options[i].optionLength = options[i+1].optionLength;
564 memcpy(options[i].optionData, options[i+1].optionData, options[i+1].optionLength);