X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=resource%2Fcsdk%2Fstack%2Fsrc%2Focobserve.c;h=4fa079bf40582bb1c050eb3148d8b570e5f12f37;hb=e536bc9edf0ad1fea100e07755462dc0914304eb;hp=2051f7388f7d887539b284865092fd0355c41cbd;hpb=1f38dc188968757d7eec20816b7964b052fe5a32;p=platform%2Fupstream%2Fiotivity.git diff --git a/resource/csdk/stack/src/ocobserve.c b/resource/csdk/stack/src/ocobserve.c index 2051f73..4fa079b 100644 --- a/resource/csdk/stack/src/ocobserve.c +++ b/resource/csdk/stack/src/ocobserve.c @@ -29,11 +29,12 @@ #include "oic_string.h" #include "ocpayload.h" #include "ocserverrequest.h" +#include "octhread.h" #include "logger.h" -#include "utlist.h" -#include "pdu.h" - +#include +#include +#include // Module Name #define MOD_NAME "ocobserve" @@ -43,6 +44,85 @@ #define VERIFY_NON_NULL(arg) { if (!arg) {OIC_LOG(FATAL, TAG, #arg " is NULL"); goto exit;} } static struct ResourceObserver * g_serverObsList = NULL; +static oc_mutex g_serverObsListMutex = NULL; + +static ResourceObserver* GetObserverUsingIdAsOwner (const OCObservationId observeId); + +static ResourceObserver* CloneObserverNode (ResourceObserver* observer) +{ + ResourceObserver* dupObsNode = NULL; + if (observer) + { + dupObsNode = (ResourceObserver *) OICCalloc(1, sizeof(ResourceObserver)); + VERIFY_NON_NULL(dupObsNode); + memcpy(dupObsNode, observer, sizeof(ResourceObserver)); + + if (observer->resUri) + { + dupObsNode->resUri = OICStrdup(observer->resUri); + VERIFY_NON_NULL(dupObsNode->resUri); + } + + if (observer->query) + { + dupObsNode->query = OICStrdup(observer->query); + VERIFY_NON_NULL(dupObsNode->query); + } + + if (observer->token) + { + dupObsNode->token = (CAToken_t)OICMalloc(observer->tokenLength); + VERIFY_NON_NULL(dupObsNode->token); + memcpy(dupObsNode->token, observer->token, observer->tokenLength); + } + + dupObsNode->next = NULL; + } + + return dupObsNode; + +exit: + FreeObserver(dupObsNode); + return NULL; +} + +static void FreeObserverList (ResourceObserver* list) +{ + ResourceObserver* head = list; + ResourceObserver* del = NULL; + while (NULL != head) + { + del = head; + head = head->next; + + OICFree(del->resUri); + OICFree(del->query); + OICFree(del->token); + OICFree(del); + } +} + +static ResourceObserver* CloneObserverList (ResourceObserver* obsList) +{ + ResourceObserver* dupList = NULL; + ResourceObserver* out = NULL; + + LL_FOREACH(obsList, out) + { + ResourceObserver *obsNode = CloneObserverNode(out); + if (NULL == obsNode) + { + FreeObserverList(dupList); + dupList = NULL; + break; + } + + LL_APPEND(dupList, obsNode); + } + + return dupList; +} + /** * Determine observe QOS based on the QOS of the request. * The qos passed as a parameter overrides what the client requested. @@ -96,6 +176,69 @@ static OCQualityOfService DetermineObserverQoS(OCMethod method, return decidedQoS; } +/** + * Create a get request and pass to entityhandler to notify specific observer. + * + * @param observer Observer that need to be notified. + * @param qos Quality of service of resource. + * + * @return ::OC_STACK_OK on success, some other value upon failure. + */ +static OCStackResult SendObserveNotification(ResourceObserver *observer, + OCQualityOfService qos) +{ + OCStackResult result = OC_STACK_ERROR; + OCServerRequest * request = NULL; + OCEntityHandlerRequest ehRequest = {0}; + OCEntityHandlerResult ehResult = OC_EH_ERROR; + + result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET, + 0, observer->resource->sequenceNum, qos, + observer->query, NULL, NULL, + observer->token, observer->tokenLength, + observer->resUri, 0, observer->acceptFormat, + &observer->devAddr); + + if (request) + { + request->observeResult = OC_STACK_OK; + if (result == OC_STACK_OK) + { + result = FormOCEntityHandlerRequest( + &ehRequest, + (OCRequestHandle) request->requestId, + request->method, + &request->devAddr, + (OCResourceHandle) observer->resource, + request->query, + PAYLOAD_TYPE_REPRESENTATION, + request->payload, + request->payloadSize, + request->numRcvdVendorSpecificHeaderOptions, + request->rcvdVendorSpecificHeaderOptions, + OC_OBSERVE_NO_OPTION, + 0, + request->coapID); + if (result == OC_STACK_OK) + { + ehResult = observer->resource->entityHandler(OC_REQUEST_FLAG, &ehRequest, + observer->resource->entityHandlerCallbackParam); + + // Clear server request on error case + if (!OCResultToSuccess(EntityHandlerCodeToOCStackCode(ehResult))) + { + FindAndDeleteServerRequest(request); + } + // Reset Observer TTL. + observer->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND); + } + OCPayloadDestroy(ehRequest.payload); + } + } + + return result; +} + #ifdef WITH_PRESENCE OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, uint32_t maxAge, OCPresenceTrigger trigger, OCResourceType *resourceType, OCQualityOfService qos) @@ -111,14 +254,14 @@ OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, } OCStackResult result = OC_STACK_ERROR; - ResourceObserver * resourceObserver = g_serverObsList; + ResourceObserver * resourceObserver = NULL; uint8_t numObs = 0; OCServerRequest * request = NULL; - OCEntityHandlerRequest ehRequest = {0}; - OCEntityHandlerResult ehResult = OC_EH_ERROR; bool observeErrorFlag = false; // Find clients that are observing this resource + oc_mutex_lock(g_serverObsListMutex); + resourceObserver = g_serverObsList; while (resourceObserver) { if (resourceObserver->resource == resPtr) @@ -129,46 +272,7 @@ OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, { #endif qos = DetermineObserverQoS(method, resourceObserver, qos); - - result = AddServerRequest(&request, 0, 0, 1, OC_REST_GET, - 0, resPtr->sequenceNum, qos, resourceObserver->query, - NULL, NULL, - resourceObserver->token, resourceObserver->tokenLength, - resourceObserver->resUri, 0, resourceObserver->acceptFormat, - &resourceObserver->devAddr); - - if (request) - { - request->observeResult = OC_STACK_OK; - if (result == OC_STACK_OK) - { - result = FormOCEntityHandlerRequest( - &ehRequest, - (OCRequestHandle) request, - request->method, - &request->devAddr, - (OCResourceHandle) resPtr, - request->query, - PAYLOAD_TYPE_REPRESENTATION, - request->payload, - request->payloadSize, - request->numRcvdVendorSpecificHeaderOptions, - request->rcvdVendorSpecificHeaderOptions, - OC_OBSERVE_NO_OPTION, - 0, - request->coapID); - if (result == OC_STACK_OK) - { - ehResult = resPtr->entityHandler(OC_REQUEST_FLAG, &ehRequest, - resPtr->entityHandlerCallbackParam); - if (ehResult == OC_EH_ERROR) - { - FindAndDeleteServerRequest(request); - } - } - OCPayloadDestroy(ehRequest.payload); - } - } + result = SendObserveNotification(resourceObserver, qos); #ifdef WITH_PRESENCE } else @@ -192,6 +296,7 @@ OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, if (!presenceResBuf) { + oc_mutex_unlock(g_serverObsListMutex); return OC_STACK_NO_MEMORY; } @@ -200,11 +305,16 @@ OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, ehResponse.ehResult = OC_EH_OK; ehResponse.payload = (OCPayload*)presenceResBuf; ehResponse.persistentBufferFlag = 0; - ehResponse.requestHandle = (OCRequestHandle) request; + ehResponse.requestHandle = (OCRequestHandle) request->requestId; ehResponse.resourceHandle = (OCResourceHandle) resPtr; OICStrcpy(ehResponse.resourceUri, sizeof(ehResponse.resourceUri), resourceObserver->resUri); result = OCDoResponse(&ehResponse); + if (result != OC_STACK_OK) + { + OIC_LOG(ERROR, TAG, "Failed to send presence notification!"); + FindAndDeleteServerRequest(request); + } } OCPresencePayloadDestroy(presenceResBuf); @@ -221,6 +331,8 @@ OCStackResult SendAllObserverNotification (OCMethod method, OCResource *resPtr, resourceObserver = resourceObserver->next; } + oc_mutex_unlock(g_serverObsListMutex); + if (numObs == 0) { OIC_LOG(INFO, TAG, "Resource has no observers"); @@ -256,7 +368,8 @@ OCStackResult SendListObserverNotification (OCResource * resource, OIC_LOG(INFO, TAG, "Entering SendListObserverNotification"); while(numIds) { - observer = GetObserverUsingId (*obsIdList); + oc_mutex_lock(g_serverObsListMutex); + observer = GetObserverUsingIdAsOwner (*obsIdList); if (observer) { // Found observer - verify if it matches the resource handle @@ -282,11 +395,12 @@ OCStackResult SendListObserverNotification (OCResource * resource, if (!ehResponse.payload) { FindAndDeleteServerRequest(request); + oc_mutex_unlock(g_serverObsListMutex); continue; } memcpy(ehResponse.payload, payload, sizeof(*payload)); ehResponse.persistentBufferFlag = 0; - ehResponse.requestHandle = (OCRequestHandle) request; + ehResponse.requestHandle = (OCRequestHandle) request->requestId; ehResponse.resourceHandle = (OCResourceHandle) resource; result = OCDoResponse(&ehResponse); if (result == OC_STACK_OK) @@ -295,20 +409,25 @@ OCStackResult SendListObserverNotification (OCResource * resource, // Increment only if OCDoResponse is successful numSentNotification++; - - OICFree(ehResponse.payload); - FindAndDeleteServerRequest(request); } else { OIC_LOG_V(INFO, TAG, "Error notifying observer id %d.", *obsIdList); + FindAndDeleteServerRequest(request); } + + // Reset Observer TTL. + observer->TTL = + GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND); + + OICFree(ehResponse.payload); } else { FindAndDeleteServerRequest(request); } } + // Since we are in a loop, set an error flag to indicate // at least one error occurred. if (result != OC_STACK_OK) @@ -317,6 +436,8 @@ OCStackResult SendListObserverNotification (OCResource * resource, } } } + + oc_mutex_unlock(g_serverObsListMutex); obsIdList++; numIds--; } @@ -338,17 +459,20 @@ OCStackResult SendListObserverNotification (OCResource * resource, OCStackResult GenerateObserverId (OCObservationId *observationId) { - ResourceObserver *resObs = NULL; + bool found = false; OIC_LOG(INFO, TAG, "Entering GenerateObserverId"); VERIFY_NON_NULL (observationId); do { - *observationId = OCGetRandomByte(); + do + { + *observationId = OCGetRandomByte(); + } while (0 == *observationId); //Make sure *observationId is not 0 // Check if observation Id already exists - resObs = GetObserverUsingId (*observationId); - } while (NULL != resObs); + found = IsObserverAvailable (*observationId); + } while (found); OIC_LOG_V(INFO, TAG, "GeneratedObservation ID is %u", *observationId); @@ -410,7 +534,20 @@ OCStackResult AddObserver (const char *resUri, obsNode->devAddr = *devAddr; obsNode->resource = resHandle; +#ifdef WITH_PRESENCE + if ((strcmp(resUri, OC_RSRVD_PRESENCE_URI) == 0)) + { + obsNode->TTL = 0; + } + else +#endif + { + obsNode->TTL = GetTicks(MAX_OBSERVER_TTL_SECONDS * MILLISECONDS_PER_SECOND); + } + + oc_mutex_lock(g_serverObsListMutex); LL_APPEND (g_serverObsList, obsNode); + oc_mutex_unlock(g_serverObsListMutex); return OC_STACK_OK; } @@ -425,41 +562,144 @@ exit: return OC_STACK_NO_MEMORY; } +/* + * This function checks if the node is past its time to live and + * deletes it if timed-out. Calling this function with a presence callback + * with ttl set to 0 will not delete anything as presence nodes have + * their own mechanisms for timeouts. A null argument will cause the function to + * silently return. + */ +static void CheckTimedOutObserver(ResourceObserver* observer) +{ + if (!observer || observer->TTL == 0) + { + return; + } + + coap_tick_t now; + coap_ticks(&now); + + if (observer->TTL < now) + { + // Send confirmable notification message to observer. + OIC_LOG(INFO, TAG, "Sending High-QoS notification to observer"); + SendObserveNotification(observer, OC_HIGH_QOS); + } +} + ResourceObserver* GetObserverUsingId (const OCObservationId observeId) { ResourceObserver *out = NULL; if (observeId) { + oc_mutex_lock(g_serverObsListMutex); + LL_FOREACH (g_serverObsList, out) + { + if (out->observeId == observeId) + { + oc_mutex_unlock(g_serverObsListMutex); + return CloneObserverNode(out); + } + CheckTimedOutObserver(out); + } + oc_mutex_unlock(g_serverObsListMutex); + } + OIC_LOG(INFO, TAG, "Observer node not found!!"); + return NULL; +} + +static ResourceObserver* GetObserverUsingIdAsOwner (const OCObservationId observeId) +{ + ResourceObserver *out = NULL; + + if (observeId) + { LL_FOREACH (g_serverObsList, out) { if (out->observeId == observeId) { return out; } + CheckTimedOutObserver(out); } } OIC_LOG(INFO, TAG, "Observer node not found!!"); return NULL; } -ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength) +bool IsObserverAvailable (const OCObservationId observeId) { ResourceObserver *out = NULL; + if (observeId) + { + oc_mutex_lock(g_serverObsListMutex); + LL_FOREACH (g_serverObsList, out) + { + if (out->observeId == observeId) + { + oc_mutex_unlock(g_serverObsListMutex); + return true; + } + } + oc_mutex_unlock(g_serverObsListMutex); + } + + return false; +} + +ResourceObserver* GetObserverUsingToken (const CAToken_t token, uint8_t tokenLength) +{ + if (token) + { + OIC_LOG(INFO, TAG, "Looking for token"); + OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength); + + ResourceObserver *out = NULL; + oc_mutex_lock(g_serverObsListMutex); + LL_FOREACH (g_serverObsList, out) + { + /* de-annotate below line if want to see all token in cbList */ + //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength); + if ((memcmp(out->token, token, tokenLength) == 0)) + { + OIC_LOG(INFO, TAG, "Found in observer list"); + ResourceObserver *observer = CloneObserverNode(out); + oc_mutex_unlock(g_serverObsListMutex); + return observer; + } + CheckTimedOutObserver(out); + } + oc_mutex_unlock(g_serverObsListMutex); + } + else + { + OIC_LOG(ERROR, TAG, "Passed in NULL token"); + } + + OIC_LOG(INFO, TAG, "Observer node not found!!"); + return NULL; +} + +static ResourceObserver* GetObserverUsingTokenAsOwner (const CAToken_t token, uint8_t tokenLength) +{ if (token) { OIC_LOG(INFO, TAG, "Looking for token"); OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)token, tokenLength); - OIC_LOG(INFO, TAG, "\tFound token:"); + ResourceObserver *out = NULL; LL_FOREACH (g_serverObsList, out) { - OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength); + /* de-annotate below line if want to see all token in cbList */ + //OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)out->token, tokenLength); if ((memcmp(out->token, token, tokenLength) == 0)) { + OIC_LOG(INFO, TAG, "Found in observer list"); return out; } + CheckTimedOutObserver(out); } } else @@ -478,33 +718,72 @@ OCStackResult DeleteObserverUsingToken (CAToken_t token, uint8_t tokenLength) return OC_STACK_INVALID_PARAM; } - ResourceObserver *obsNode = GetObserverUsingToken (token, tokenLength); + oc_mutex_lock(g_serverObsListMutex); + ResourceObserver *obsNode = GetObserverUsingTokenAsOwner (token, tokenLength); if (obsNode) { OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", obsNode->observeId); OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)obsNode->token, tokenLength); + LL_DELETE (g_serverObsList, obsNode); - OICFree(obsNode->resUri); - OICFree(obsNode->query); - OICFree(obsNode->token); - OICFree(obsNode); + FreeObserver(obsNode); } + oc_mutex_unlock(g_serverObsListMutex); + // it is ok if we did not find the observer... return OC_STACK_OK; } -void DeleteObserverList() +OCStackResult DeleteObserverUsingDevAddr(const OCDevAddr *devAddr) { + if (!devAddr) + { + return OC_STACK_INVALID_PARAM; + } + + oc_mutex_lock(g_serverObsListMutex); + ResourceObserver* obsDupList = CloneObserverList(g_serverObsList); + oc_mutex_unlock(g_serverObsListMutex); + ResourceObserver *out = NULL; ResourceObserver *tmp = NULL; - LL_FOREACH_SAFE (g_serverObsList, out, tmp) + LL_FOREACH_SAFE(obsDupList, out, tmp) { if (out) { - DeleteObserverUsingToken ((out->token), out->tokenLength); + if ((strcmp(out->devAddr.addr, devAddr->addr) == 0) + && out->devAddr.port == devAddr->port) + { + OIC_LOG_V(INFO, TAG, "deleting observer id %u with %s:%u", + out->observeId, out->devAddr.addr, out->devAddr.port); + OCStackFeedBack(out->token, out->tokenLength, OC_OBSERVER_NOT_INTERESTED); + } } } + + FreeObserverList(obsDupList); + return OC_STACK_OK; +} + +void DeleteObserverList() +{ + oc_mutex_lock(g_serverObsListMutex); + + ResourceObserver* head = g_serverObsList; + ResourceObserver* del = NULL; + while (NULL != head) + { + del = head; + head = head->next; + + OIC_LOG_V(INFO, TAG, "deleting observer id %u with token", del->observeId); + OIC_LOG_BUFFER(INFO, TAG, (const uint8_t *)del->token, del->tokenLength); + + FreeObserver(del); + } + g_serverObsList = NULL; + oc_mutex_unlock(g_serverObsListMutex); } /* @@ -593,3 +872,41 @@ GetObserveHeaderOption (uint32_t * observationOption, return OC_STACK_OK; } +OCStackResult InitializeObseverList() +{ + OIC_LOG(DEBUG, TAG, "InitializeObseverList IN"); + + if (NULL == g_serverObsListMutex) + { + g_serverObsListMutex = oc_mutex_new(); + } + + OIC_LOG(DEBUG, TAG, "InitializeObseverList OUT"); + return OC_STACK_OK; +} + +void TerminateObserverList() +{ + OIC_LOG(DEBUG, TAG, "TerminateObserverList IN"); + + DeleteObserverList(); + + if (NULL != g_serverObsListMutex) + { + oc_mutex_free(g_serverObsListMutex); + g_serverObsListMutex = NULL; + } + + OIC_LOG(DEBUG, TAG, "TerminateObserverList OUT"); +} + +void FreeObserver (ResourceObserver* obsNode) +{ + if (NULL != obsNode) + { + OICFree(obsNode->resUri); + OICFree(obsNode->query); + OICFree(obsNode->token); + OICFree(obsNode); + } +}