1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
23 #include "ocstackconfig.h"
24 #include "ocstackinternal.h"
25 #include "ocobserve.h"
26 #include "ocresourcehandler.h"
29 #include "ocserverrequest.h"
// Module name string for this file.
35 #define MOD_NAME PCF("ocobserve")
// Tag handed to the OC_LOG* macros by every log statement in this file.
37 #define TAG PCF("OCStackObserve")
// Logs "<arg> is NULL" at FATAL level and jumps to the `exit` label of the enclosing
// function when arg is falsy, so every function using it must provide that label.
// NOTE(review): arg is expanded without parentheses and the body is a bare { } block
// rather than do { } while (0) -- confirm no caller passes a compound expression or
// uses the macro as the only statement of an if/else.
39 #define VERIFY_NON_NULL(arg) { if (!arg) {OC_LOG(FATAL, TAG, #arg " is NULL"); goto exit;} }
// Head of the linked list of registered observers; NULL while the list is empty.
41 static struct ResourceObserver * serverObsList = NULL;
43 // Chooses the QoS used for one notification to one observer.
44 // The qos passed as a parameter (appQoS) overrides what the client requested.
45 // If the client preference should take priority instead, make:
46 // qos = resourceObserver->qos;
//
//   method           - compared against OC_REST_PRESENCE in one variant of the escalation test.
//   resourceObserver - observer record; its lowQosCount is reset or incremented here.
//   appQoS           - QoS requested by the application; OC_NA_QOS falls back to the
//                      qos stored on the observer.
//
// NOTE(review): the embedded line numbering has gaps, i.e. some original lines (for
// example the condition guarding the error log and the final return) are not visible
// in this listing. The comments below describe only the visible statements.
47 OCQualityOfService DetermineObserverQoS(OCMethod method, ResourceObserver * resourceObserver,
48 OCQualityOfService appQoS)
// Reported when the function is called with an unusable resourceObserver.
52 OC_LOG(ERROR, TAG, "DetermineObserverQoS called with invalid resourceObserver");
// Start from the application's choice ...
56 OCQualityOfService decidedQoS = appQoS;
57 if(appQoS == OC_NA_QOS)
// ... and use the observer's own QoS when the application expressed none.
59 decidedQoS = resourceObserver->qos;
// The low-QoS bookkeeping is keyed on the application's value (appQoS), not on decidedQoS.
62 if(appQoS != OC_HIGH_QOS)
64 OC_LOG_V(INFO, TAG, "Current NON count for this observer is %d",
65 resourceObserver->lowQosCount);
// Two spellings of the escalation test follow; the first one additionally excludes
// OC_REST_PRESENCE. NOTE(review): presumably a preprocessor conditional that is not
// visible in this listing selects one of them -- confirm.
67 if((resourceObserver->forceHighQos \
68 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT) \
69 && method != OC_REST_PRESENCE)
71 if(resourceObserver->forceHighQos \
72 || resourceObserver->lowQosCount >= MAX_OBSERVER_NON_COUNT)
// Escalating: restart the low-QoS count for this observer.
75 resourceObserver->lowQosCount = 0;
76 // at some point we have to send CON to check on the
77 // availability of observer
78 OC_LOG(INFO, TAG, PCF("This time we are sending the notification as High qos"));
79 decidedQoS = OC_HIGH_QOS;
// One more low-QoS notification for this observer.
// NOTE(review): presumably the non-escalating branch; the `else` is not visible here.
83 (resourceObserver->lowQosCount)++;
// Sends a notification to every entry of serverObsList whose resource is resPtr.
//
//   method       - when it is not OC_REST_PRESENCE, the per-observer QoS is recomputed and
//                  the resource's entity handler is invoked.
//   resPtr       - observed resource; supplies sequenceNum and entityHandler.
//   maxAge       - written into the presence payload.
//   resourceType - (first signature only) when set together with its resourcetypename,
//                  the name is appended to the presence payload.
//   qos          - requested QoS, passed through DetermineObserverQoS.
//
// NOTE(review): two signatures are listed back to back; presumably a preprocessor
// conditional that is not visible in this listing selects one of them. Other short
// lines (braces, guards, `else`, the final return) are absent as well, so the comments
// below describe only the visible statements.
90 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
91 OCResourceType *resourceType, OCQualityOfService qos)
93 OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge,
94 OCQualityOfService qos)
97 OC_LOG(INFO, TAG, PCF("Entering SendObserverNotification"));
// Early rejection; its condition is not visible in this listing.
100 return OC_STACK_INVALID_PARAM;
103 OCStackResult result = OC_STACK_ERROR;
104 ResourceObserver * resourceObserver = serverObsList;
106 OCServerRequest * request = NULL;
107 OCEntityHandlerRequest ehRequest = {};
108 OCEntityHandlerResult ehResult = OC_EH_ERROR;
110 // Find clients that are observing this resource
111 while (resourceObserver)
113 if (resourceObserver->resource == resPtr)
117 if(method != OC_REST_PRESENCE)
// NOTE(review): qos is overwritten here, so the value decided for one observer becomes
// the appQoS input for the next observer in the list -- confirm this carry-over is intended.
120 qos = DetermineObserverQoS(method, resourceObserver, qos);
// Create an OC_REST_GET server request carrying this observer's query, token, URI,
// address and connectivity type, stamped with the resource's current sequence number.
122 result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
123 0, resPtr->sequenceNum, qos, resourceObserver->query,
125 &resourceObserver->token, resourceObserver->tokenLength,
126 resourceObserver->resUri, 0,
127 &(resourceObserver->addressInfo), resourceObserver->connectivityType);
// NOTE(review): request is dereferenced before result is tested; confirm that a null
// check on request (not visible in this listing) guards this line.
131 request->observeResult = OC_STACK_OK;
132 if(result == OC_STACK_OK)
// Wrap the server request into an entity-handler request.
134 result = FormOCEntityHandlerRequest(&ehRequest, (OCRequestHandle) request,
135 request->method, (OCResourceHandle) resPtr, request->query,
136 request->reqJSONPayload,
137 request->numRcvdVendorSpecificHeaderOptions,
138 request->rcvdVendorSpecificHeaderOptions,
139 OC_OBSERVE_NO_OPTION, 0);
140 if(result == OC_STACK_OK)
// Invoke the resource's entity handler; when it reports OC_EH_ERROR the server
// request created above is removed again.
142 ehResult = resPtr->entityHandler(OC_REQUEST_FLAG, &ehRequest);
143 if(ehResult == OC_EH_ERROR)
145 FindAndDeleteServerRequest(request);
// Presence notification: the response is assembled locally and sent via OCDoResponse.
154 OCEntityHandlerResponse ehResponse = {};
155 char presenceResBuf[MAX_RESPONSE_LENGTH] = {};
156 //This is effectively the implementation for the presence entity handler.
157 OC_LOG(DEBUG, TAG, PCF("This notification is for Presence"));
158 result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
159 0, resPtr->sequenceNum, qos, resourceObserver->query,
161 &resourceObserver->token, resourceObserver->tokenLength,
162 resourceObserver->resUri, 0,
163 &(resourceObserver->addressInfo), resourceObserver->connectivityType);
165 if(result == OC_STACK_OK)
167 // we create the payload here
// Payload format: "<sequenceNum>:<maxAge>" with ":<resourcetypename>" appended when a
// resource type name is available.
168 if(resourceType && resourceType->resourcetypename)
170 snprintf((char *)presenceResBuf, sizeof(presenceResBuf), "%u:%u:%s",
171 resPtr->sequenceNum, maxAge, resourceType->resourcetypename);
175 snprintf((char *)presenceResBuf, sizeof(presenceResBuf), "%u:%u",
176 resPtr->sequenceNum, maxAge);
178 ehResponse.ehResult = OC_EH_OK;
179 ehResponse.payload = presenceResBuf;
// payloadSize counts the terminating NUL as well.
180 ehResponse.payloadSize = strlen((const char *)presenceResBuf) + 1;
181 ehResponse.persistentBufferFlag = 0;
182 ehResponse.requestHandle = (OCRequestHandle) request;
183 ehResponse.resourceHandle = (OCResourceHandle) resPtr;
// NOTE(review): unbounded strcpy -- confirm resUri always fits into ehResponse.resourceUri.
184 strcpy((char *)ehResponse.resourceUri, (const char *)resourceObserver->resUri);
185 result = OCDoResponse(&ehResponse);
// Move on to the next registered observer.
190 resourceObserver = resourceObserver->next;
// Reported with OC_STACK_NO_OBSERVERS; the condition selecting this path is not visible
// in this listing.
194 OC_LOG(INFO, TAG, PCF("Resource has no observers"));
195 result = OC_STACK_NO_OBSERVERS;
// Sends notificationJSONPayload to the observers named in obsIdList, skipping ids that
// are unknown or that belong to a different resource.
//
//   resource                - resource the observers must be registered on.
//   obsIdList / numberOfIds - observation ids to notify and their count.
//   notificationJSONPayload - payload text copied into each response.
//   maxAge                  - not referenced by any statement visible in this listing.
//   qos                     - requested QoS, passed through DetermineObserverQoS.
//
// Returns OC_STACK_INVALID_PARAM when resource, obsIdList or notificationJSONPayload is
// NULL, and OC_STACK_NO_OBSERVERS when nothing was sent.
// NOTE(review): short lines (loop head, braces, `else`, some returns) are absent from
// this listing; the comments below describe only the visible statements.
200 OCStackResult SendListObserverNotification (OCResource * resource,
201 OCObservationId *obsIdList, uint8_t numberOfIds,
202 const char *notificationJSONPayload, uint32_t maxAge,
203 OCQualityOfService qos)
205 if(!resource || !obsIdList || !notificationJSONPayload)
207 return OC_STACK_INVALID_PARAM;
210 uint8_t numIds = numberOfIds;
211 ResourceObserver *observation = NULL;
212 uint8_t numSentNotification = 0;
213 OCServerRequest * request = NULL;
214 OCStackResult result = OC_STACK_ERROR;
216 OC_LOG(INFO, TAG, PCF("Entering SendListObserverNotification"));
219 OC_LOG_V(INFO, TAG, "Need to notify observation id %d", *obsIdList);
// Resolve the id to its observer record.
220 observation = GetObserverUsingId (*obsIdList);
223 // Found observation - verify if it matches the resource handle
224 if (observation->resource == resource)
// NOTE(review): qos is overwritten here, so the value decided for one observer becomes
// the appQoS input for the next id in the list -- confirm this carry-over is intended.
226 qos = DetermineObserverQoS(OC_REST_GET, observation, qos);
// Create an OC_REST_GET server request carrying this observer's query, token, URI,
// address and connectivity type.
229 result = AddServerRequest(&request, 0, 0, 0, 1, OC_REST_GET,
230 0, resource->sequenceNum, qos, observation->query,
231 NULL, NULL, &observation->token, observation->tokenLength,
232 observation->resUri, 0,
233 &(observation->addressInfo), observation->connectivityType);
// NOTE(review): request is dereferenced before result is tested; confirm that a null
// check on request (not visible in this listing) guards this line.
237 request->observeResult = OC_STACK_OK;
238 if(result == OC_STACK_OK)
240 OCEntityHandlerResponse ehResponse = {};
241 ehResponse.ehResult = OC_EH_OK;
// The payload is copied into a heap buffer of MAX_RESPONSE_LENGTH bytes.
242 ehResponse.payload = (char *) OCMalloc(MAX_RESPONSE_LENGTH);
243 if(!ehResponse.payload)
// Allocation failed: discard the server request created above.
245 FindAndDeleteServerRequest(request);
// NOTE(review): strncpy writes no terminator when the source holds MAX_RESPONSE_LENGTH-1
// or more characters, and no explicit termination is visible before the strlen below;
// confirm the OCMalloc'd buffer is terminated in that case.
248 strncpy(ehResponse.payload, notificationJSONPayload, MAX_RESPONSE_LENGTH-1);
// payloadSize counts the terminating NUL as well.
249 ehResponse.payloadSize = strlen(ehResponse.payload) + 1;
250 ehResponse.persistentBufferFlag = 0;
251 ehResponse.requestHandle = (OCRequestHandle) request;
252 ehResponse.resourceHandle = (OCResourceHandle) resource;
253 result = OCDoResponse(&ehResponse);
254 if(result == OC_STACK_OK)
// NOTE(review): the payload buffer is released only inside the success branch visible
// here; confirm it is also released when OCDoResponse fails.
256 OCFree(ehResponse.payload);
257 FindAndDeleteServerRequest(request);
262 FindAndDeleteServerRequest(request);
266 numSentNotification++;
// Result selection: every id notified / none notified / only some notified.
272 if(numSentNotification == numberOfIds)
276 else if(numSentNotification == 0)
278 return OC_STACK_NO_OBSERVERS;
282 //TODO: we need to signal that not everyone in the
283 // list got an update; should we also indicate who did not receive one?
// Produces an observation id that no currently registered observer uses and stores it
// in *observationId.
// NOTE(review): short lines (braces, `do`, the success return, the exit label) are
// absent from this listing; the comments describe only the visible statements.
288 OCStackResult GenerateObserverId (OCObservationId *observationId)
290 ResourceObserver *resObs = NULL;
292 OC_LOG(INFO, TAG, PCF("Entering GenerateObserverId"));
// Jumps to the exit label when observationId is NULL.
293 VERIFY_NON_NULL (observationId);
// Draw a random byte and repeat while an observer with that id already exists.
// NOTE(review): no retry bound is visible; if every value OCGetRandomByte can return
// is already in use this loop would not terminate -- confirm.
297 *observationId = OCGetRandomByte();
298 // Check if observation Id already exists
299 resObs = GetObserverUsingId (*observationId);
300 } while (NULL != resObs);
302 OC_LOG_V(INFO, TAG, "Observation ID is %u", *observationId);
// Failure result. NOTE(review): presumably reached through the exit label targeted by
// VERIFY_NON_NULL; the label itself is not visible in this listing.
306 return OC_STACK_ERROR;
// Creates a ResourceObserver for resHandle, stores private copies of resUri, query and
// the token in it, and appends it to serverObsList.
//
// Returns OC_STACK_INVALID_PARAM for rejected arguments, OC_STACK_RESOURCE_ERROR when
// the resource lacks the OC_OBSERVABLE property, and OC_STACK_NO_MEMORY on the cleanup
// path at the end of the function.
// NOTE(review): the body also uses query, token and tokenLength, whose parameter lines
// are not visible in this listing, along with other short lines (braces, guards, the
// success return, the exit label).
309 OCStackResult AddObserver (const char *resUri,
311 OCObservationId obsId,
314 OCResource *resHandle,
315 OCQualityOfService qos,
316 const CAAddress_t *addressInfo,
317 CAConnectivityType_t connectivityType)
319 // Check if resource exists and is observable.
322 return OC_STACK_INVALID_PARAM;
324 if (!(resHandle->resourceProperties & OC_OBSERVABLE))
326 return OC_STACK_RESOURCE_ERROR;
328 ResourceObserver *obsNode = NULL;
// A URI and a non-NULL token are mandatory.
330 if(!resUri || !token || !*token)
332 return OC_STACK_INVALID_PARAM;
335 obsNode = (ResourceObserver *) OCCalloc(1, sizeof(ResourceObserver));
338 obsNode->observeId = obsId;
// Private copy of the URI, terminator included.
340 obsNode->resUri = (char *)OCMalloc(strlen(resUri)+1);
341 VERIFY_NON_NULL (obsNode->resUri);
342 memcpy (obsNode->resUri, resUri, strlen(resUri)+1);
// Private copy of the query, terminator included.
// NOTE(review): strlen(query) requires query to be non-NULL; confirm a guard that is
// not visible in this listing protects these lines.
347 obsNode->query = (char *)OCMalloc(strlen(query)+1);
348 VERIFY_NON_NULL (obsNode->query);
349 memcpy (obsNode->query, query, strlen(query)+1);
351 // If tokenLength is zero, the return value depends on the
352 // particular library implementation (it may or may not be a null pointer).
// Private copy of the tokenLength token bytes.
355 obsNode->token = (CAToken_t)OCMalloc(tokenLength);
356 VERIFY_NON_NULL (obsNode->token);
357 memcpy(obsNode->token, *token, tokenLength);
359 obsNode->tokenLength = tokenLength;
// NOTE(review): addressInfo is dereferenced here and is not part of the argument check
// above -- confirm callers never pass NULL.
360 obsNode->addressInfo = *addressInfo;
361 obsNode->connectivityType = connectivityType;
362 obsNode->resource = resHandle;
// Publish the new observer at the tail of the list.
363 LL_APPEND (serverObsList, obsNode);
// Cleanup: release the string copies and report the allocation failure.
370 OCFree(obsNode->resUri);
371 OCFree(obsNode->query);
374 return OC_STACK_NO_MEMORY;
// Searches serverObsList for the observer whose observeId equals the given id.
// Callers in this file compare the result against NULL to detect "no such observer".
// NOTE(review): the return statements are not visible in this listing.
377 ResourceObserver* GetObserverUsingId (const OCObservationId observeId)
379 ResourceObserver *out = NULL;
// Walk the list; out points at the node currently being examined.
383 LL_FOREACH (serverObsList, out)
385 if (out->observeId == observeId)
// Logged when the search produced no match.
391 OC_LOG(INFO, TAG, PCF("Observer node not found!!"));
// Searches serverObsList for an observer whose stored token matches the first
// tokenLength bytes of *token.
// NOTE(review): the return statements and any argument guard are not visible in this
// listing.
395 ResourceObserver* GetObserverUsingToken (const CAToken_t * token, uint8_t tokenLength)
397 ResourceObserver *out = NULL;
// Walk the list; out points at the node currently being examined.
401 LL_FOREACH (serverObsList, out)
403 OC_LOG(INFO, TAG,PCF("comparing tokens"));
// NOTE(review): this dumps tokenLength bytes starting at `token` (the address of the
// token pointer), whereas the comparison below uses `*token`; the log presumably should
// dump `*token` -- confirm.
404 OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength);
405 OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength);
// NOTE(review): only the caller's tokenLength is used; out->tokenLength is not compared,
// so confirm a stored token is never shorter than tokenLength.
406 if((memcmp(out->token, *token, tokenLength) == 0))
// Logged when the search produced no match.
412 OC_LOG(INFO, TAG, PCF("Observer node not found!!"));
// Removes the observer matching the given token from serverObsList and releases the
// strings and token it owns. Returns OC_STACK_INVALID_PARAM when token or *token is NULL.
// NOTE(review): the check on the lookup result, the release of the node itself and the
// final return are not visible in this listing -- confirm obsNode is freed after its
// members.
416 OCStackResult DeleteObserverUsingToken (CAToken_t * token, uint8_t tokenLength)
418 if(!token || !*token)
420 return OC_STACK_INVALID_PARAM;
423 ResourceObserver *obsNode = NULL;
// Locate the list node that carries this token.
425 obsNode = GetObserverUsingToken (token, tokenLength);
428 OC_LOG_V(INFO, TAG, PCF("deleting tokens"));
429 OC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength);
// Unlink the node first, then release the buffers it owns.
430 LL_DELETE (serverObsList, obsNode);
431 OCFree(obsNode->resUri);
432 OCFree(obsNode->query);
433 OCFree(obsNode->token);
436 // it is ok if we did not find the observer...
// Deletes every observer in serverObsList and resets the list head to NULL.
440 void DeleteObserverList()
442 ResourceObserver *out = NULL;
443 ResourceObserver *tmp = NULL;
// The _SAFE iteration keeps the next node in tmp, so nodes can be removed while walking.
444 LL_FOREACH_SAFE (serverObsList, out, tmp)
// Deletion goes through a token lookup, so the node removed is whichever list entry
// GetObserverUsingToken matches for out's token.
448 DeleteObserverUsingToken (&(out->token), out->tokenLength);
451 serverObsList = NULL;
455 * CA layer expects observe registration/de-reg/notifications to be passed as a header
456 * option, which breaks the protocol abstraction requirement between RI & CA, and
457 * has to be fixed in the future. The function below adds the header option for observe.
458 * It should be noted that the observe header option is assumed to be the first option
459 * in the list of user defined header options and hence it is inserted at the front
460 * of the header options list and number of options adjusted accordingly.
// Builds a new CAHeaderOption_t array of numOptions+1 entries: entry 0 is the CoAP
// observe option carrying observeFlag, entries 1..numOptions are copied from ocHdrOpt.
// The array is allocated with OCMalloc and handed back through *caHdrOpt.
// Returns OC_STACK_INVALID_PARAM for rejected arguments and OC_STACK_NO_MEMORY when the
// allocation fails.
// NOTE(review): the return type, the numOptions/observeFlag parameter lines, the
// argument guard and the success return are not visible in this listing. Presumably the
// caller is responsible for releasing *caHdrOpt -- confirm.
463 CreateObserveHeaderOption (CAHeaderOption_t **caHdrOpt,
464 OCHeaderOption *ocHdrOpt,
470 return OC_STACK_INVALID_PARAM;
473 CAHeaderOption_t *tmpHdrOpt = NULL;
// One extra slot in front for the observe option.
475 tmpHdrOpt = (CAHeaderOption_t *) OCMalloc ((numOptions+1)*sizeof(CAHeaderOption_t));
476 if (NULL == tmpHdrOpt)
478 return OC_STACK_NO_MEMORY;
480 tmpHdrOpt[0].protocolID = CA_COAP_ID;
481 tmpHdrOpt[0].optionID = COAP_OPTION_OBSERVE;
// NOTE(review): optionLength announces sizeof(uint32_t) bytes but only optionData[0] is
// written; the remaining bytes keep whatever OCMalloc returned (presumably
// uninitialised) -- confirm.
482 tmpHdrOpt[0].optionLength = sizeof(uint32_t);
483 tmpHdrOpt[0].optionData[0] = observeFlag;
// Copy the caller's options behind the observe option.
// NOTE(review): sizeof(CAHeaderOption_t) bytes are read from each OCHeaderOption
// element, which assumes both struct types share size and layout -- confirm.
484 for (uint8_t i = 0; i < numOptions; i++)
486 memcpy (&(tmpHdrOpt[i+1]), &(ocHdrOpt[i]), sizeof(CAHeaderOption_t));
489 *caHdrOpt = tmpHdrOpt;
494 * CA layer passes observe information to the RI layer as a header option, which
495 * breaks the protocol abstraction requirement between RI & CA, and has to be fixed
496 * in the future. The function below removes the observe header option and processes it.
497 * It should be noted that the observe header option is always assumed to be the first
498 * option in the list of user defined header options and hence it is deleted from the
499 * front of the header options list and the number of options is adjusted accordingly.
502 GetObserveHeaderOption (uint32_t * observationOption,
503 CAHeaderOption_t *options,
504 uint8_t * numOptions)
506 if(!observationOption)
508 return OC_STACK_INVALID_PARAM;
510 *observationOption = OC_OBSERVE_NO_OPTION;
512 if(!options || !numOptions)
514 return OC_STACK_INVALID_PARAM;
517 for(uint8_t i = 0; i < *numOptions; i++)
519 if(options[i].protocolID == CA_COAP_ID &&
520 options[i].optionID == COAP_OPTION_OBSERVE)
522 *observationOption = options[i].optionData[0];
523 for(uint8_t c = i; c < *numOptions-1; c++)
525 options[i].protocolID = options[i+1].protocolID;
526 options[i].optionID = options[i+1].optionID;
527 options[i].optionLength = options[i+1].optionLength;
528 memcpy(options[i].optionData, options[i+1].optionData, options[i+1].optionLength);