From 7549a6f3e09aa5f8a55188f33ce951074260e682 Mon Sep 17 00:00:00 2001 From: Vijay Date: Wed, 17 Sep 2014 13:48:56 -0700 Subject: [PATCH] Support for notifying specific observers using observation Id 1. Initial code changes 2. Combine GET & observe registration calls to entity handler into a single call; support for CON messages in NotifySpecificObservers; updates sample server app 3. Added the support for immediate observe cancelation in OCCancel 4. Fixed the new interface to OCCancel in C++ layer 5. A temp solution for the case when observation sequence number wraps around. 6. Merging with Yamin's changes 7. Changed OCNotifyObservers to OCNotifyAllObservers 8. Build issue 9. Another Merge to support calling the entity handler when deregister an observer 10. Rebase & CPP Changes from Sudarshan and Sashi to support Notify specific observers. 11. Command line argument options for simpleserver (so it can choose between all observers or specific observers) 12. CPP bugfix. 13. Fixed review comments for CPP. 14. Changed COAP_DEFAULT_RESPONSE_TIMEOUT to 2 and all the sample applications to sleep accordingly. 15. Fixed review comment for new line. 16. Fixed review commend for line exceed. 17. Response to comments received on patch 11 18. Header file comments updated Change-Id: I03aa7ab2a580899441b70bd2a76cca05575b192a --- csdk/libcoap-4.1.1/pdu.h | 5 +- csdk/occoap/include/occoaphelper.h | 4 +- csdk/occoap/src/occoap.c | 60 +++--- csdk/occoap/src/occoaphelper.c | 22 +- csdk/stack/include/internal/occlientcb.h | 2 +- csdk/stack/include/internal/ocobserve.h | 28 ++- csdk/stack/include/internal/ocstackinternal.h | 4 +- csdk/stack/include/ocstack.h | 77 ++++++- .../SimpleClientServer/ocserver/ocserver.cpp | 2 +- .../samples/linux/SimpleClientServer/makefile | 4 +- .../samples/linux/SimpleClientServer/occlient.cpp | 38 +++- .../linux/SimpleClientServer/occlientcoll.cpp | 4 +- .../samples/linux/SimpleClientServer/ocserver.cpp | 222 ++++++++++++++++++--- .../linux/SimpleClientServer/ocservercoll.cpp | 4 +- csdk/stack/src/occlientcb.c | 11 +- csdk/stack/src/ocobserve.c | 181 +++++++++++++---- csdk/stack/src/ocresource.c | 18 +- csdk/stack/src/ocstack.c | 202 +++++++++++++++---- examples/ocicuc/light_resource.cpp | 2 +- examples/roomserver.cpp | 6 +- examples/simpleclient.cpp | 2 +- examples/simpleserver.cpp | 79 +++++++- include/OCApi.h | 25 ++- include/OCPlatform.h | 24 ++- include/OCResourceRequest.h | 36 +++- src/InProcClientWrapper.cpp | 4 +- src/InProcServerWrapper.cpp | 90 +++++---- src/OCPlatform.cpp | 36 +++- 28 files changed, 932 insertions(+), 260 deletions(-) diff --git a/csdk/libcoap-4.1.1/pdu.h b/csdk/libcoap-4.1.1/pdu.h index b6039a6..6e9b282 100644 --- a/csdk/libcoap-4.1.1/pdu.h +++ b/csdk/libcoap-4.1.1/pdu.h @@ -25,8 +25,9 @@ extern "C" { // This value is based on the DEFAULT_LEISURE (5 seconds) defined in RFC 7252 #define MAX_MULTICAST_DELAY_SEC (5) -#define COAP_DEFAULT_RESPONSE_TIMEOUT 3 /* response timeout in seconds */ -#define COAP_DEFAULT_MAX_RETRANSMIT 3 /* max number of retransmissions */ +/* response timeout in seconds, this can not be less that the delays between calls to OCProcess*/ +#define COAP_DEFAULT_RESPONSE_TIMEOUT 2 +#define COAP_DEFAULT_MAX_RETRANSMIT 4 /* max number of retransmissions */ #define COAP_DEFAULT_PORT 5683 /* CoAP default UDP port */ #define COAP_DEFAULT_MAX_AGE 60 /* default maximum object lifetime in seconds */ #ifndef COAP_MAX_PDU_SIZE diff --git a/csdk/occoap/include/occoaphelper.h b/csdk/occoap/include/occoaphelper.h index e69f916..8d79986 100644 --- a/csdk/occoap/include/occoaphelper.h +++ b/csdk/occoap/include/occoaphelper.h @@ -82,7 +82,7 @@ OCStackResult RetrieveOCCoAPToken(const coap_pdu_t * pdu, OCCoAPToken * * rcvdTokenLoc); // Internal function to create OCObserveReq at the server -OCStackResult FormOCObserveReq(OCObserveReq ** observeReqLoc, uint8_t obsOption, +OCStackResult FormOCObserveReq(OCObserveReq ** observeReqLoc, uint32_t obsOption, OCDevAddr * remote, OCCoAPToken * rcvdToken); // Internal function to create OCResponse struct at the client from a received coap pdu @@ -99,7 +99,7 @@ void HandleSendQueue(coap_context_t * gCoAPCtx); // Internal function to form the standard response option list OCStackResult FormOptionList(coap_list_t * * optListLoc, uint8_t * addMediaType, - uint32_t * addMaxAge, uint8_t observeOptionLength, uint8_t * observeOptionPtr, + uint32_t * addMaxAge, uint8_t observeOptionLength, uint32_t * observeOptionPtr, uint16_t * addPortNumber, uint8_t uriLength, unsigned char * uri, uint8_t queryLength, unsigned char * query); diff --git a/csdk/occoap/src/occoap.c b/csdk/occoap/src/occoap.c index af1fdbe..fd531d9 100644 --- a/csdk/occoap/src/occoap.c +++ b/csdk/occoap/src/occoap.c @@ -76,7 +76,7 @@ static void HandleCoAPAckRst(struct coap_context_t * ctx, uint8_t msgType, // silence warnings (void) ctx; - OCStackResult result = OC_STACK_ERROR; + OCStackResult result = OC_STACK_OK; OCCoAPToken * sentToken = NULL; uint8_t * observeOption = NULL; coap_pdu_t * sentPdu = sentQueue->pdu; @@ -91,18 +91,26 @@ static void HandleCoAPAckRst(struct coap_context_t * ctx, uint8_t msgType, if(msgType == COAP_MESSAGE_RST){ // now the observer should be deleted - result = OCObserverStatus(sentToken, OC_OBSERVER_NOT_INTERESTED); - if(result == OC_STACK_OK){ - OC_LOG_V(DEBUG, TAG, "Received RST, removing all queues associated with Token %d bytes",sentToken->tokenLength); - OC_LOG_BUFFER(INFO, TAG, sentToken->token, sentToken->tokenLength); - coap_cancel_all_messages(ctx, &sentQueue->remote, sentToken->token, - sentToken->tokenLength); + if(myStackMode != OC_CLIENT) + { + result = OCObserverStatus(sentToken, OC_OBSERVER_NOT_INTERESTED); + if(result == OC_STACK_OK){ + OC_LOG_V(DEBUG, TAG, + "Received RST, removing all queues associated with Token %d bytes", + sentToken->tokenLength); + OC_LOG_BUFFER(INFO, TAG, sentToken->token, sentToken->tokenLength); + coap_cancel_all_messages(ctx, &sentQueue->remote, sentToken->token, + sentToken->tokenLength); + } } }else if(observeOption && msgType == COAP_MESSAGE_ACK){ OC_LOG_V(DEBUG, TAG, "Received ACK, for Token %d bytes",sentToken->tokenLength); OC_LOG_BUFFER(INFO, TAG, sentToken->token, sentToken->tokenLength); // now the observer is still interested - OCObserverStatus(sentToken, OC_OBSERVER_STILL_INTERESTED); + if(myStackMode != OC_CLIENT) + { + OCObserverStatus(sentToken, OC_OBSERVER_STILL_INTERESTED); + } } exit: @@ -139,7 +147,7 @@ static void HandleCoAPRequests(struct coap_context_t *ctx, unsigned char bufRes[MAX_RESPONSE_LENGTH] = { 0 }; uint8_t * rcvObserveOption = NULL; unsigned char * bufReqPayload = NULL; - uint8_t observeOption = OC_RESOURCE_NO_OBSERVE; + uint32_t observeOption = OC_RESOURCE_NO_OBSERVE; coap_pdu_t * recvPdu = rcvdRequest->pdu; @@ -147,7 +155,7 @@ static void HandleCoAPRequests(struct coap_context_t *ctx, result = ParseCoAPPdu(recvPdu, rcvdUri, rcvdQuery, &rcvObserveOption, NULL, &bufReqPayload); VERIFY_SUCCESS(result, OC_STACK_OK); if(rcvObserveOption){ - observeOption = (uint8_t)(*rcvObserveOption); + observeOption = (uint32_t)(*rcvObserveOption); } // fill OCCoAPToken structure @@ -187,9 +195,8 @@ static void HandleCoAPRequests(struct coap_context_t *ctx, } #endif - TODO("we should return the same registration option; however, this will confuse the receiver \ - whether it is a sequence number or registration option!------------"); - OC_LOG_V(INFO, TAG, "Response from ocstack: %s", request->entityHandlerRequest->resJSONPayload); + OC_LOG_V(INFO, TAG, "Response from ocstack: %s", + request->entityHandlerRequest->resJSONPayload); if(rcvdObsReq) { @@ -197,8 +204,9 @@ static void HandleCoAPRequests(struct coap_context_t *ctx, { case OC_STACK_OK: observeOption = rcvdObsReq->option; - result = FormOptionList(&optList, &mediaType, &maxAge, 0, NULL, NULL, - 0, NULL, 0, NULL); + result = FormOptionList(&optList, &mediaType, &maxAge, + sizeof(observeOption), &observeOption, + NULL, 0, NULL, 0, NULL); break; case OC_STACK_OBSERVER_NOT_ADDED: case OC_STACK_OBSERVER_NOT_REMOVED: @@ -257,7 +265,7 @@ static void HandleCoAPResponses(struct coap_context_t *ctx, unsigned char * bufRes = NULL; uint8_t * rcvObserveOption = NULL; uint8_t * rcvMaxAgeOption = NULL; - uint32_t sequenceNumber = 0; + uint32_t sequenceNumber = OC_RESOURCE_NO_OBSERVE; uint32_t maxAge = 0; OCStackResult result = OC_STACK_ERROR; coap_pdu_t *sendPdu = NULL; @@ -293,7 +301,7 @@ static void HandleCoAPResponses(struct coap_context_t *ctx, OC_LOG_V(DEBUG, TAG, "The maxAge/TTL of this response %u", maxAge); OC_LOG_V(DEBUG, TAG, "The response received is %s", bufRes); - if(sequenceNumber != 0) + if(sequenceNumber >= OC_OFFSET_SEQUENCE_NUMBER) { isObserveNotification = 1; OC_LOG(INFO, TAG, PCF("Received an observe notification")); @@ -355,8 +363,11 @@ static void HandleCoAPResponses(struct coap_context_t *ctx, result = SendCoAPPdu(gCoAPCtx, (coap_address_t*) &rcvdResponse->remote, sendPdu, 0); } + //TODO: check the standard for methods to detect wrap around condition if(cbNode->method == OC_REST_OBSERVE && - clientResponse->sequenceNumber <= cbNode->sequenceNumber) + (clientResponse->sequenceNumber <= cbNode->sequenceNumber || + (clientResponse->sequenceNumber > cbNode->sequenceNumber && + clientResponse->sequenceNumber == MAX_SEQUENCE_NUMBER))) { OC_LOG_V(DEBUG, TAG, "Observe notification came out of order. \ Ignoring Incoming:%d Against Current:%d.", @@ -555,7 +566,7 @@ OCStackResult OCDoCoAPResource(OCMethod method, OCQualityOfService qos, OCCoAPTo coap_list_t *optList = NULL; uint8_t coapMsgType; uint8_t coapMethod; - uint8_t observeOption; + uint32_t observeOption; OC_LOG(INFO, TAG, PCF("Entering OCDoCoAPResource")); @@ -600,11 +611,12 @@ OCStackResult OCDoCoAPResource(OCMethod method, OCQualityOfService qos, OCCoAPTo break; case OC_REST_OBSERVE_ALL: case OC_REST_OBSERVE: + case OC_REST_CANCEL_OBSERVE: coapMethod = COAP_REQUEST_GET; - observeOption = OC_RESOURCE_OBSERVE_REGISTER; + observeOption = (method == OC_REST_CANCEL_OBSERVE)? + OC_RESOURCE_OBSERVE_DEREGISTER:OC_RESOURCE_OBSERVE_REGISTER; coap_insert(&optList, CreateNewOptionNode(COAP_OPTION_OBSERVE, - sizeof(observeOption), &observeOption), OrderOptions); - + sizeof(observeOption), (uint8_t *)&observeOption), OrderOptions); break; default: coapMethod = OC_REST_NOMETHOD; @@ -654,8 +666,8 @@ OCStackResult OCSendCoAPNotification (unsigned char * uri, OCDevAddr *dstAddr, O else { #endif - result = FormOptionList(&optList, &mediaType, &maxAge, 4, - (uint8_t *)(&seqNum), NULL, strlen((char *)uri), uri, 0, NULL); + result = FormOptionList(&optList, &mediaType, &maxAge, sizeof(seqNum), + &seqNum, NULL, strlen((char *)uri), uri, 0, NULL); #ifdef WITH_PRESENCE } #endif diff --git a/csdk/occoap/src/occoaphelper.c b/csdk/occoap/src/occoaphelper.c index cb12f78..a173fe5 100644 --- a/csdk/occoap/src/occoaphelper.c +++ b/csdk/occoap/src/occoaphelper.c @@ -271,7 +271,7 @@ OCStackResult FormOCRequest(OCRequest * * requestLoc, OCQualityOfService qos, } // Form the OCObserveReq struct -OCStackResult FormOCObserveReq(OCObserveReq ** observeReqLoc, uint8_t observeOption, +OCStackResult FormOCObserveReq(OCObserveReq ** observeReqLoc, uint32_t observeOption, OCDevAddr * remote, OCCoAPToken * rcvdToken) { OCObserveReq * observeReq; @@ -324,6 +324,8 @@ OCStackResult FormOCEntityHandlerRequest(OCEntityHandlerRequest * * entityHandle entityHandlerRequest->resJSONPayload = resBuf; entityHandlerRequest->resJSONPayloadLen = MAX_RESPONSE_LENGTH; + entityHandlerRequest->obsInfo = NULL; + *entityHandlerRequestLoc = entityHandlerRequest; return OC_STACK_OK; } @@ -384,7 +386,7 @@ OCStackResult FormOCClientResponse(OCClientResponse * * clientResponseLoc, } OCStackResult FormOptionList(coap_list_t * * optListLoc, uint8_t * addMediaType, - uint32_t * addMaxAge, uint8_t observeOptionLength, uint8_t * observeOptionPtr, + uint32_t * addMaxAge, uint8_t observeOptionLength, uint32_t * observeOptionPtr, uint16_t * addPortNumber, uint8_t uriLength, unsigned char * uri, uint8_t queryLength, unsigned char * query) { @@ -486,6 +488,13 @@ SendCoAPPdu(coap_context_t * gCoAPCtx, coap_address_t* dst, coap_pdu_t * pdu, tid = coap_send(gCoAPCtx, dst, pdu, sendFlag); OC_LOG_V(INFO, TAG, "TID %d", tid); + if(tid != COAP_INVALID_TID) + { + OC_LOG(INFO, TAG, PCF("Sending a pdu with Token:")); + OC_LOG_BUFFER(INFO,TAG, pdu->hdr->token, pdu->hdr->token_length); + res = OC_STACK_OK; + } + if ((pdu->hdr->type != COAP_MESSAGE_CON && !delayFlag) || tid == COAP_INVALID_TID) { OC_LOG(INFO, TAG, PCF("Deleting PDU")); @@ -496,13 +505,6 @@ SendCoAPPdu(coap_context_t * gCoAPCtx, coap_address_t* dst, coap_pdu_t * pdu, OC_LOG(INFO, TAG, PCF("Keeping PDU, we will handle the retry/delay of this pdu")); } - if(tid != COAP_INVALID_TID) - { - OC_LOG(INFO, TAG, PCF("Sending a pdu with Token:")); - OC_LOG_BUFFER(INFO,TAG, pdu->hdr->token, pdu->hdr->token_length); - res = OC_STACK_OK; - } - return res; } @@ -657,7 +659,7 @@ OCStackResult HandleFailedCommunication(coap_context_t * ctx, coap_queue_t * que HandleStackResponses(response); observation: - observer = GetObserver(token); + observer = GetObserverUsingToken (token); if(!observer) { goto exit; diff --git a/csdk/stack/include/internal/occlientcb.h b/csdk/stack/include/internal/occlientcb.h index 105e8b4..ff72c56 100644 --- a/csdk/stack/include/internal/occlientcb.h +++ b/csdk/stack/include/internal/occlientcb.h @@ -114,7 +114,7 @@ void DeleteClientCB(ClientCB *cbNode); * @retval address of the node if found, otherwise NULL */ //------------------------------------------------------------------------ -ClientCB* GetClientCB(OCCoAPToken * token, OCDoHandle * handle, unsigned char * requestUri); +ClientCB* GetClientCB(OCCoAPToken * token, OCDoHandle handle, unsigned char * requestUri); //-- DeleteClientCBList -------------------------------------------------- /** @ingroup ocstack diff --git a/csdk/stack/include/internal/ocobserve.h b/csdk/stack/include/internal/ocobserve.h index 0b0f516..6998cd4 100644 --- a/csdk/stack/include/internal/ocobserve.h +++ b/csdk/stack/include/internal/ocobserve.h @@ -36,10 +36,10 @@ /* This information is stored for each registerd observer */ typedef struct ResourceObserver { + // Observation Identifier for request + OCObservationId observeId; // URI of observed resource unsigned char *resUri; - //Quality of service of the request - OCQualityOfService qos; // Query unsigned char *query; // CoAP token for the observe request @@ -48,6 +48,8 @@ typedef struct ResourceObserver { OCResource *resource; // IP address & port of client registered for observe OCDevAddr *addr; + // Quality of service of the request + OCQualityOfService qos; // number of times the server failed to reach the observer uint8_t failedCommCount; // number of times the server sent NON notifications @@ -66,14 +68,20 @@ OCStackResult SendObserverNotification (OCMethod method, OCResource *resPtr, uin void DeleteObserverList(); -OCStackResult AddObserver ( const char *resUri, - const char *query, - OCCoAPToken * token, - OCDevAddr *addr, - OCResource *resHandle, - OCQualityOfService qos); -OCStackResult DeleteObserver (OCCoAPToken * token); +OCStackResult GenerateObserverId (OCObservationId *observationId); + +OCStackResult AddObserver (const char *resUri, + const char *query, + OCObservationId obsId, + OCCoAPToken *token, + OCDevAddr *addr, + OCResource *resHandle, + OCQualityOfService qos); + +OCStackResult DeleteObserverUsingToken (OCCoAPToken * token); + +ResourceObserver* GetObserverUsingToken (const OCCoAPToken * token); -ResourceObserver* GetObserver (const OCCoAPToken * token); +ResourceObserver* GetObserverUsingId (const OCObservationId observeId); #endif //OC_OBSERVE_H diff --git a/csdk/stack/include/internal/ocstackinternal.h b/csdk/stack/include/internal/ocstackinternal.h index 8e8ef06..b9fc8a3 100644 --- a/csdk/stack/include/internal/ocstackinternal.h +++ b/csdk/stack/include/internal/ocstackinternal.h @@ -43,7 +43,7 @@ extern "C" { // Defines //----------------------------------------------------------------------------- #define OC_COAP_SCHEME "coap://" - +#define OC_OFFSET_SEQUENCE_NUMBER (4) // the first outgoing sequence number will be 5 //----------------------------------------------------------------------------- // Virtual Resource Presence Attributes @@ -137,7 +137,7 @@ typedef struct rsrc_t { typedef struct { // Observe option field - uint8_t option; + uint32_t option; // IP address & port of client registered for observe OCDevAddr *subAddr; // CoAP token for the observe request diff --git a/csdk/stack/include/ocstack.h b/csdk/stack/include/ocstack.h index c0b2605..9d931b4 100644 --- a/csdk/stack/include/ocstack.h +++ b/csdk/stack/include/ocstack.h @@ -31,7 +31,8 @@ extern "C" { // Defines //----------------------------------------------------------------------------- -//May want to refactor this in upcoming sprints. Don't want to expose to application layer that lower level stack is using CoAP. +//TODO: May want to refactor this in upcoming sprints. +//Don't want to expose to application layer that lower level stack is using CoAP. #define OC_WELL_KNOWN_QUERY "coap://224.0.1.187:5683/oc/core" #define OC_EXPLICIT_DEVICE_DISCOVERY_URI "coap://224.0.1.187:5683/oc/core?rt=core.led" #define OC_MULTICAST_PREFIX "coap://224.0.1.187:5683" @@ -69,10 +70,15 @@ typedef enum { OC_REST_PUT = (1 << 1), // Write OC_REST_POST = (1 << 2), // Update OC_REST_DELETE = (1 << 3), // Delete - OC_REST_OBSERVE = (1 << 4), // Register observe request for most up date notifications ONLY. - OC_REST_OBSERVE_ALL = (1 << 5), // Register observe request for all notifications, including stale notifications. + // Register observe request for most up date notifications ONLY. + OC_REST_OBSERVE = (1 << 4), + // Register observe request for all notifications, including stale notifications. + OC_REST_OBSERVE_ALL = (1 << 5), + // Deregister observation, intended for internal use + OC_REST_CANCEL_OBSERVE = (1 << 6), #ifdef WITH_PRESENCE - OC_REST_PRESENCE = (1 << 6) // Subscribe for all presence notifications of a particular resource. + // Subscribe for all presence notifications of a particular resource. + OC_REST_PRESENCE = (1 << 7) #endif } OCMethod; @@ -145,6 +151,30 @@ typedef void * OCDoHandle; typedef void * OCResourceHandle; /** + * Unique identifier for each observation request. Used when observations are + * registered or deregistering. Used by entity handler to signal specific + * observers to be notified of resource changes. + * There can be maximum of 256 observations per server. + */ +typedef uint8_t OCObservationId; + +/** + * Action associated with observation + */ +typedef enum { + OC_OBSERVE_REGISTER = 0, + OC_OBSERVE_DEREGISTER = 1, + OC_OBSERVE_NO_OPTION = 2 +} OCObserveAction; + +typedef struct { + // Action associated with observation request + OCObserveAction action; + // Identifier for observation being registered/deregistered + OCObservationId obsId; +} OCObservationInfo; + +/** * Incoming requests handled by the server. Requests are passed in as a parameter to the @ref OCEntityHandler callback API. * @brief The @ref OCEntityHandler callback API must be implemented in the application in order to receive these requests. */ @@ -161,6 +191,9 @@ typedef struct { unsigned char * resJSONPayload; // Length of maximum allowed response uint16_t resJSONPayloadLen; + // Information associated with observation - valid only when OCEntityHandler + // flag includes OC_OBSERVE_FLAG + OCObservationInfo *obsInfo; }OCEntityHandlerRequest; /** @@ -293,12 +326,12 @@ OCStackResult OCDoResource(OCDoHandle *handle, OCMethod method, const char *req * Cancel a request associated with a specific @ref OCDoResource invocation. * * @param handle - Used to identify a specific OCDoResource invocation. - * + * @param qos - valid only when handle is for an observe method. * @return * OC_STACK_OK - No errors; Success * OC_STACK_INVALID_PARAM - The handle provided is invalid. */ -OCStackResult OCCancel(OCDoHandle handle); +OCStackResult OCCancel(OCDoHandle handle, OCQualityOfService qos); #ifdef WITH_PRESENCE /** @@ -557,17 +590,41 @@ OCResourceHandle OCGetResourceHandleFromCollection(OCResourceHandle collectionHa OCEntityHandler OCGetResourceHandler(OCResourceHandle handle); /** - * Notify observers that an observed value has changed. + * Notify all registered observers that the resource representation has + * changed. If observation includes a query the client is notified only + * if the query is valid after the resource representation has changed. * - * **NOTE: This API has NOT been finalized!!!** + * @param handle - handle of resource + * + * @return + * OC_STACK_OK - no errors + * OC_STACK_NO_RESOURCE - invalid resource handle + * OC_STACK_NO_OBSERVERS - no more observers intrested in resource + */ +OCStackResult OCNotifyAllObservers(OCResourceHandle handle); + +/** + * Notify specific observers with updated value of representation. + * Before this API is invoked by entity handler it has finished processing + * queries for the associated observers. * * @param handle - handle of resource + * @param obsIdList - list of observation ids that need to be notified + * @param numberOfIds - number of observation ids included in obsIdList + * @param notificationJSONPayload - JSON encoded payload to send in notification + * NOTE: The memory for obsIdList and notificationJSONPayload is managed by the + * entity invoking the API. The maximum size of the notification is 1015 bytes + * for non-Arduino platforms. For Arduino the maximum size is 247 bytes. * * @return * OC_STACK_OK - no errors - * OC_STACK_ERROR - stack not initialized + * OC_STACK_NO_RESOURCE - invalid resource handle */ -OCStackResult OCNotifyObservers(OCResourceHandle handle); +OCStackResult +OCNotifyListOfObservers (OCResourceHandle handle, + OCObservationId *obsIdList, + uint8_t numberOfIds, + unsigned char *notificationJSONPayload); #ifdef __cplusplus } diff --git a/csdk/stack/samples/arduino/SimpleClientServer/ocserver/ocserver.cpp b/csdk/stack/samples/arduino/SimpleClientServer/ocserver/ocserver.cpp index cd8c609..184bf02 100644 --- a/csdk/stack/samples/arduino/SimpleClientServer/ocserver/ocserver.cpp +++ b/csdk/stack/samples/arduino/SimpleClientServer/ocserver/ocserver.cpp @@ -216,7 +216,7 @@ void *ChangeLEDRepresentation (void *param) if (gLEDUnderObservation) { OC_LOG_V(INFO, TAG, " =====> Notifying stack of new power level %d\n", LED.power); - result = OCNotifyObservers (LED.handle); + result = OCNotifyAllObservers (LED.handle); if (OC_STACK_NO_OBSERVERS == result) { gLEDUnderObservation = 0; diff --git a/csdk/stack/samples/linux/SimpleClientServer/makefile b/csdk/stack/samples/linux/SimpleClientServer/makefile index e58dd21..000c6f9 100644 --- a/csdk/stack/samples/linux/SimpleClientServer/makefile +++ b/csdk/stack/samples/linux/SimpleClientServer/makefile @@ -18,8 +18,8 @@ # // # //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= # -# override with `make BUILD=release` -# default to debug build +# override with `make BUILD=debug` +# default to release build BUILD := release PLATFORM := linux CC := g++ diff --git a/csdk/stack/samples/linux/SimpleClientServer/occlient.cpp b/csdk/stack/samples/linux/SimpleClientServer/occlient.cpp index dd755c8..9459ed3 100644 --- a/csdk/stack/samples/linux/SimpleClientServer/occlient.cpp +++ b/csdk/stack/samples/linux/SimpleClientServer/occlient.cpp @@ -53,6 +53,7 @@ typedef enum { #ifdef WITH_PRESENCE TEST_OBS_PRESENCE, #endif + TEST_OBS_REQ_NON_CANCEL_IMM, MAX_TESTS } CLIENT_TEST; @@ -71,10 +72,10 @@ OCDoHandle gObserveDoHandle; OCDoHandle gPresenceHandle; #endif // After this crosses a threshold client deregisters for further notifications -int gNumObserveNotifies = 1; +int gNumObserveNotifies = 0; #ifdef WITH_PRESENCE -int gNumPresenceNotifies = 1; +int gNumPresenceNotifies = 0; #endif int gQuitFlag = 0; @@ -111,6 +112,7 @@ static void PrintUsage() #ifdef WITH_PRESENCE OC_LOG(INFO, TAG, "-t 8 : Discover Resources and Initiate Nonconfirmable presence"); #endif + OC_LOG(INFO, TAG, "-t 9 : Discover Resources and Initiate Nonconfirmable Observe Requests then cancel immediately"); } OCStackResult InvokeOCDoResource(std::ostringstream &query, @@ -132,7 +134,7 @@ OCStackResult InvokeOCDoResource(std::ostringstream &query, { OC_LOG_V(ERROR, TAG, "OCDoResource returns error %d with method %d", ret, method); } - else if (method == OC_REST_OBSERVE) + else if (method == OC_REST_OBSERVE || method == OC_REST_OBSERVE_ALL) { gObserveDoHandle = handle; } @@ -188,12 +190,29 @@ OCStackApplicationResult obsReqCB(void* ctx, OCDoHandle handle, OCClientResponse OC_LOG_V(INFO, TAG, "Callback Context for OBSERVE notification recvd successfully %d", gNumObserveNotifies); OC_LOG_V(INFO, TAG, "JSON = %s =============> Obs Response", clientResponse->resJSONPayload); gNumObserveNotifies++; - if (gNumObserveNotifies == 3) + if (gNumObserveNotifies == 5) { - printf ("************************** CANCEL OBSERVE\n"); - if (OCCancel (gObserveDoHandle) != OC_STACK_OK){ - OC_LOG(ERROR, TAG, "Observe cancel error"); + printf ("************************** CANCEL OBSERVE with "); + if(TEST_CASE == TEST_OBS_REQ_NON || TEST_CASE == TEST_OBS_REQ_CON){ + printf ("RESET\n"); + if (OCCancel (gObserveDoHandle, OC_NON_CONFIRMABLE) != OC_STACK_OK){ + OC_LOG(ERROR, TAG, "Observe cancel error"); + } + return OC_STACK_DELETE_TRANSACTION; + }else if(TEST_CASE == TEST_OBS_REQ_NON_CANCEL_IMM){ + printf ("Deregister\n"); + if (OCCancel (gObserveDoHandle, OC_CONFIRMABLE) != OC_STACK_OK){ + OC_LOG(ERROR, TAG, "Observe cancel error"); + } } + } + if(clientResponse->sequenceNumber == OC_OBSERVE_REGISTER){ + OC_LOG(INFO, TAG, "This also serves as a registration confirmation"); + }else if(clientResponse->sequenceNumber == OC_OBSERVE_DEREGISTER){ + OC_LOG(INFO, TAG, "This also serves as a deregistration confirmation"); + return OC_STACK_DELETE_TRANSACTION; + }else if(clientResponse->sequenceNumber == OC_OBSERVE_NO_OPTION){ + OC_LOG(INFO, TAG, "This also tells you that registration/deregistration failed"); return OC_STACK_DELETE_TRANSACTION; } } @@ -216,7 +235,7 @@ OCStackApplicationResult presenceCB(void* ctx, OCDoHandle handle, OCClientRespon if (gNumPresenceNotifies == 15) { printf ("************************** CANCEL PRESENCE\n"); - if (OCCancel (gPresenceHandle) != OC_STACK_OK){ + if (OCCancel (gPresenceHandle, OC_NON_CONFIRMABLE) != OC_STACK_OK){ OC_LOG(ERROR, TAG, "Presence cancel error"); } return OC_STACK_DELETE_TRANSACTION; @@ -260,6 +279,7 @@ OCStackApplicationResult discoveryReqCB(void* ctx, OCDoHandle handle, InitPutRequest(); break; case TEST_OBS_REQ_NON: + case TEST_OBS_REQ_NON_CANCEL_IMM: InitObserveRequest(OC_NON_CONFIRMABLE); break; case TEST_GET_UNAVAILABLE_RES_REQ_NON: @@ -413,7 +433,7 @@ int main(int argc, char* argv[]) { return 0; } - sleep(3); + sleep(2); } OC_LOG(INFO, TAG, "Exiting occlient main loop..."); diff --git a/csdk/stack/samples/linux/SimpleClientServer/occlientcoll.cpp b/csdk/stack/samples/linux/SimpleClientServer/occlientcoll.cpp index c691df4..4898a8c 100644 --- a/csdk/stack/samples/linux/SimpleClientServer/occlientcoll.cpp +++ b/csdk/stack/samples/linux/SimpleClientServer/occlientcoll.cpp @@ -100,7 +100,7 @@ OCStackApplicationResult getReqCB(void* ctx, OCDoHandle handle, OCClientResponse gNumObserveNotifies++; if (gNumObserveNotifies == 3) { - if (OCCancel (gObserveDoHandle) != OC_STACK_OK){ + if (OCCancel (gObserveDoHandle, OC_NON_CONFIRMABLE) != OC_STACK_OK){ OC_LOG(ERROR, TAG, "Observe cancel error"); } } @@ -296,7 +296,7 @@ int main(int argc, char* argv[]) { return 0; } - sleep(3); + sleep(2); } OC_LOG(INFO, TAG, "Exiting occlient main loop..."); diff --git a/csdk/stack/samples/linux/SimpleClientServer/ocserver.cpp b/csdk/stack/samples/linux/SimpleClientServer/ocserver.cpp index 873e5ad..46e745d 100644 --- a/csdk/stack/samples/linux/SimpleClientServer/ocserver.cpp +++ b/csdk/stack/samples/linux/SimpleClientServer/ocserver.cpp @@ -32,6 +32,8 @@ const char *getResult(OCStackResult result); #define TAG PCF("ocserver") +static int gObserveNotifyType = 3; + int gQuitFlag = 0; int gLEDUnderObservation = 0; void createLEDResource(); @@ -43,6 +45,14 @@ typedef struct LEDRESOURCE{ static LEDResource LED; +typedef struct { + OCObservationId observationId; + bool valid; +} Observers; + +#define SAMPLE_MAX_NUM_OBSERVATIONS 8 +Observers interestedObservers[SAMPLE_MAX_NUM_OBSERVATIONS]; + #ifdef WITH_PRESENCE static int stopPresenceCount = 10; #endif @@ -52,34 +62,121 @@ const char responsePayloadGet[] = "{\"href\":\"/a/led\",\"rep\":{\"state\":\"on\ const char responsePayloadPut[] = "{\"href\":\"/a/led\",\"rep\":{\"state\":\"off\",\"power\":0}}"; static uint16_t OC_WELL_KNOWN_PORT = 5683; -OCEntityHandlerResult OCEntityHandlerCb(OCEntityHandlerFlag flag, OCEntityHandlerRequest * entityHandlerRequest ) { - const char* typeOfMessage; +void ProcessGetRequest (OCEntityHandlerRequest *ehRequest) +{ + if (ehRequest->resJSONPayloadLen > strlen ((char *)responsePayloadGet)) + { + strncpy((char *)ehRequest->resJSONPayload, responsePayloadGet, + strlen((char *)responsePayloadGet)); + } + else + { + OC_LOG_V (INFO, TAG, "Response buffer: %d bytes is too small", + ehRequest->resJSONPayloadLen); + } +} - switch (flag) { - case OC_INIT_FLAG: - typeOfMessage = "OC_INIT_FLAG"; - break; - case OC_REQUEST_FLAG: - typeOfMessage = "OC_REQUEST_FLAG"; - break; - case OC_OBSERVE_FLAG: - typeOfMessage = "OC_OBSERVE_FLAG"; +void ProcessPutRequest (OCEntityHandlerRequest *ehRequest) +{ + if (ehRequest->resJSONPayloadLen > strlen ((char *)responsePayloadPut)) + { + strncpy((char *)ehRequest->resJSONPayload, responsePayloadPut, + strlen((char *)responsePayloadPut)); + } + else + { + OC_LOG_V (INFO, TAG, "Response buffer: %d bytes is too small", + ehRequest->resJSONPayloadLen); + } +} + +void ProcessObserveRegister (OCEntityHandlerRequest *ehRequest) +{ + OC_LOG_V (INFO, TAG, "Received observation registration request with observation Id %d", + ehRequest->obsInfo->obsId); + for (uint8_t i = 0; i < SAMPLE_MAX_NUM_OBSERVATIONS; i++) + { + if (interestedObservers[i].valid == false) + { + interestedObservers[i].observationId = ehRequest->obsInfo->obsId; + interestedObservers[i].valid = true; + gLEDUnderObservation = 1; break; - default: - typeOfMessage = "UNKNOWN"; - } - OC_LOG_V(INFO, TAG, "Receiving message type: %s", typeOfMessage); - if(entityHandlerRequest && flag == OC_REQUEST_FLAG){ //[CL] - if(OC_REST_GET == entityHandlerRequest->method) - strncpy((char *)entityHandlerRequest->resJSONPayload, responsePayloadGet, entityHandlerRequest->resJSONPayloadLen); - if(OC_REST_PUT == entityHandlerRequest->method) { - OC_LOG_V(INFO, TAG, "PUT JSON payload from client: %s", entityHandlerRequest->reqJSONPayload); - strncpy((char *)entityHandlerRequest->resJSONPayload, responsePayloadPut, entityHandlerRequest->resJSONPayloadLen); - } + } + } +} + +void ProcessObserveDeregister (OCEntityHandlerRequest *ehRequest) +{ + bool clientStillObserving = false; + + OC_LOG_V (INFO, TAG, "Received observation deregistration request for observation Id %d", + ehRequest->obsInfo->obsId); + for (uint8_t i = 0; i < SAMPLE_MAX_NUM_OBSERVATIONS; i++) + { + if (interestedObservers[i].observationId == ehRequest->obsInfo->obsId) + { + interestedObservers[i].valid = false; + } + if (interestedObservers[i].valid == true) + { + // Even if there is one single client observing we continue notifying entity handler + clientStillObserving = true; + } + } + if (clientStillObserving == false) + gLEDUnderObservation = 0; +} - } else if (entityHandlerRequest && flag == OC_OBSERVE_FLAG) { - //TODO : Should the OC_OBSERVE_FLAG behave just like a GET according to the standard? - gLEDUnderObservation = 1; +OCEntityHandlerResult +OCEntityHandlerCb (OCEntityHandlerFlag flag, + OCEntityHandlerRequest *entityHandlerRequest) +{ + const char* typeOfMessage; + + OC_LOG_V (INFO, TAG, "Inside entity handler - flags: 0x%x", flag); + if (flag & OC_INIT_FLAG) + { + OC_LOG (INFO, TAG, "Flag includes OC_INIT_FLAG"); + } + if (flag & OC_REQUEST_FLAG) + { + OC_LOG (INFO, TAG, "Flag includes OC_REQUEST_FLAG"); + if (entityHandlerRequest) + { + if (OC_REST_GET == entityHandlerRequest->method) + { + OC_LOG (INFO, TAG, "Received OC_REST_GET from client"); + ProcessGetRequest (entityHandlerRequest); + } + else if (OC_REST_PUT == entityHandlerRequest->method) + { + OC_LOG (INFO, TAG, "Received OC_REST_PUT from client"); + ProcessPutRequest (entityHandlerRequest); + } + else + { + OC_LOG_V (INFO, TAG, "Received unsupported method %d from client", + entityHandlerRequest->method); + } + } + } + if (flag & OC_OBSERVE_FLAG) + { + OC_LOG(INFO, TAG, "Flag includes OC_OBSERVE_FLAG"); + if (entityHandlerRequest) + { + if (OC_OBSERVE_REGISTER == entityHandlerRequest->obsInfo->action) + { + OC_LOG (INFO, TAG, "Received OC_OBSERVE_REGISTER from client"); + ProcessObserveRegister (entityHandlerRequest); + } + else if (OC_OBSERVE_DEREGISTER == entityHandlerRequest->obsInfo->action) + { + OC_LOG (INFO, TAG, "Received OC_OBSERVE_DEREGISTER from client"); + ProcessObserveDeregister (entityHandlerRequest); + } + } } return OC_EH_OK; @@ -97,6 +194,10 @@ void *ChangeLEDRepresentation (void *param) (void)param; OCStackResult result = OC_STACK_ERROR; + uint8_t j = 0; + uint8_t numNotifies = (SAMPLE_MAX_NUM_OBSERVATIONS)/2; + OCObservationId obsNotify[numNotifies]; + while (1) { sleep(10); @@ -104,10 +205,35 @@ void *ChangeLEDRepresentation (void *param) if (gLEDUnderObservation) { OC_LOG_V(INFO, TAG, " =====> Notifying stack of new power level %d\n", LED.power); - result = OCNotifyObservers (LED.handle); - if (OC_STACK_NO_OBSERVERS == result) + if (gObserveNotifyType == 1) + { + // Notify list of observers. Alternate observers on the list will be notified. + j = 0; + for (uint8_t i = 0; i < SAMPLE_MAX_NUM_OBSERVATIONS; (i=i+2)) + { + if (interestedObservers[i].valid == true) + { + obsNotify[j] = interestedObservers[i].observationId; + j++; + } + } + result = OCNotifyListOfObservers (LED.handle, obsNotify, j, + (unsigned char *)responsePayloadGet); + } + else if (gObserveNotifyType == 0) + { + // Notifying all observers + result = OCNotifyAllObservers (LED.handle); + if (OC_STACK_NO_OBSERVERS == result) + { + OC_LOG (INFO, TAG, + "=======> No more observers exist, stop sending observations"); + gLEDUnderObservation = 0; + } + } + else { - gLEDUnderObservation = 0; + OC_LOG (ERROR, TAG, "Incorrect notification type selected"); } } #ifdef WITH_PRESENCE @@ -122,14 +248,42 @@ void *ChangeLEDRepresentation (void *param) return NULL; } -int main() { - OC_LOG(DEBUG, TAG, "OCServer is starting..."); +static void PrintUsage() +{ + OC_LOG(INFO, TAG, "Usage : ocserver -o <0|1>"); + OC_LOG(INFO, TAG, "-o 0 : Notify all observers"); + OC_LOG(INFO, TAG, "-o 1 : Notify list of observers"); +} + +int main(int argc, char* argv[]) +{ uint8_t addr[20] = {0}; uint8_t* paddr = NULL; uint16_t port = OC_WELL_KNOWN_PORT; uint8_t ifname[] = "eth0"; pthread_t threadId; + int opt; + + while ((opt = getopt(argc, argv, "o:")) != -1) + { + switch(opt) + { + case 'o': + gObserveNotifyType = atoi(optarg); + break; + default: + PrintUsage(); + return -1; + } + } + + if ((gObserveNotifyType != 0) && (gObserveNotifyType != 1)) + { + PrintUsage(); + return -1; + } + OC_LOG(DEBUG, TAG, "OCServer is starting..."); /*Get Ip address on defined interface and initialize coap on it with random port number * this port number will be used as a source port in all coap communications*/ if ( OCGetInterfaceAddress(ifname, sizeof(ifname), AF_INET, addr, @@ -154,6 +308,12 @@ int main() { */ createLEDResource(); + // Initialize observations data structure for the resource + for (uint8_t i = 0; i < SAMPLE_MAX_NUM_OBSERVATIONS; i++) + { + interestedObservers[i].valid = false; + } + /* * Create a thread for changing the representation of the LED */ @@ -168,7 +328,7 @@ int main() { return 0; } - sleep(3); + sleep(2); } OC_LOG(INFO, TAG, "Exiting ocserver main loop..."); diff --git a/csdk/stack/samples/linux/SimpleClientServer/ocservercoll.cpp b/csdk/stack/samples/linux/SimpleClientServer/ocservercoll.cpp index 06b55b6..a905f4d 100644 --- a/csdk/stack/samples/linux/SimpleClientServer/ocservercoll.cpp +++ b/csdk/stack/samples/linux/SimpleClientServer/ocservercoll.cpp @@ -171,7 +171,7 @@ void *ChangeLEDRepresentation (void *param) if (gLEDUnderObservation) { OC_LOG_V(INFO, TAG, " =====> Notifying stack of new power level %d\n", LED.power); - result = OCNotifyObservers (LED.handle); + result = OCNotifyAllObservers (LED.handle); if (OC_STACK_NO_OBSERVERS == result) { gLEDUnderObservation = 0; @@ -222,7 +222,7 @@ int main() { OC_LOG(ERROR, TAG, "OCStack process error"); return 0; } - sleep(3); + sleep(2); } OC_LOG(INFO, TAG, "Exiting ocserver main loop..."); diff --git a/csdk/stack/src/occlientcb.c b/csdk/stack/src/occlientcb.c index b737223..092d464 100644 --- a/csdk/stack/src/occlientcb.c +++ b/csdk/stack/src/occlientcb.c @@ -26,7 +26,7 @@ #include /// Module Name -#define MOD_NAME PCF("occlientcb") +#define TAG PCF("occlientcb") struct ClientCB *cbList = NULL; @@ -58,6 +58,8 @@ OCStackResult AddClientCB(ClientCB** clientCB, OCCallbackData* cbData, void DeleteClientCB(ClientCB * cbNode) { if(cbNode) { LL_DELETE(cbList, cbNode); + OC_LOG(INFO, TAG, PCF("deleting tokens")); + OC_LOG_BUFFER(INFO, TAG, cbNode->token->token, cbNode->token->tokenLength); OCFree(cbNode->token); OCFree(cbNode->handle); OCFree(cbNode->requestUri); @@ -77,10 +79,13 @@ void DeleteClientCB(ClientCB * cbNode) { } } -ClientCB* GetClientCB(OCCoAPToken * token, OCDoHandle * handle, unsigned char * requestUri) { +ClientCB* GetClientCB(OCCoAPToken * token, OCDoHandle handle, unsigned char * requestUri) { ClientCB* out = NULL; if(token) { LL_FOREACH(cbList, out) { + OC_LOG(INFO, TAG, PCF("comparing tokens")); + OC_LOG_BUFFER(INFO, TAG, token->token, token->tokenLength); + OC_LOG_BUFFER(INFO, TAG, out->token->token, out->token->tokenLength); if((out->token->tokenLength == token->tokenLength) && (memcmp(out->token->token, token->token, token->tokenLength) == 0) ) { return out; @@ -101,7 +106,7 @@ ClientCB* GetClientCB(OCCoAPToken * token, OCDoHandle * handle, unsigned char * } } } - OC_LOG(INFO, MOD_NAME, PCF("Callback Not found !!")); + OC_LOG(INFO, TAG, PCF("Callback Not found !!")); return NULL; } diff --git a/csdk/stack/src/ocobserve.c b/csdk/stack/src/ocobserve.c index 5acb553..b426e8a 100644 --- a/csdk/stack/src/ocobserve.c +++ b/csdk/stack/src/ocobserve.c @@ -18,6 +18,7 @@ // //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +#include #include "ocstack.h" #include "ocstackconfig.h" #include "ocstackinternal.h" @@ -26,7 +27,7 @@ #include "occoap.h" #include "utlist.h" #include "debug.h" -#include +#include "ocrandom.h" // Module Name #define MOD_NAME PCF("ocobserve") @@ -42,27 +43,39 @@ OCStackResult OCObserverStatus(OCCoAPToken * token, uint8_t status) { OCStackResult result = OC_STACK_ERROR; ResourceObserver * observer = NULL; + OCEntityHandlerRequest * ehRequest; + OCObservationInfo observationInfo; + unsigned char bufRes[MAX_RESPONSE_LENGTH] = {0}; switch(status) { case OC_OBSERVER_NOT_INTERESTED: OC_LOG(DEBUG, TAG, PCF("observer is not interested in our notifications anymore")); + observer = GetObserverUsingToken (token); + if(observer) + { + FormOCEntityHandlerRequest(&ehRequest, OC_REST_CANCEL_OBSERVE, bufRes, NULL, NULL); + ehRequest->obsInfo = &observationInfo; + ehRequest->obsInfo->action = OC_OBSERVE_DEREGISTER; + ehRequest->obsInfo->obsId = observer->observeId; + observer->resource->entityHandler(OC_OBSERVE_FLAG, ehRequest); + } //observer is dead, or it is not observing anymore - result = DeleteObserver (token); + result = DeleteObserverUsingToken (token); if(result != OC_STACK_OK) { result = OC_STACK_OBSERVER_NOT_REMOVED; } else { - OC_LOG(DEBUG, TAG, PCF("removing an observer")); + OC_LOG(DEBUG, TAG, PCF("Removed observer successfully")); } break; case OC_OBSERVER_STILL_INTERESTED: //observer is still interested OC_LOG(DEBUG, TAG, PCF("observer is interested in our \ notifications, reset the failedCount")); - observer = GetObserver(token); + observer = GetObserverUsingToken(token); if(observer) { observer->forceCON = 0; @@ -77,12 +90,18 @@ OCStackResult OCObserverStatus(OCCoAPToken * token, uint8_t status) case OC_OBSERVER_FAILED_COMM: //observer is not reachable OC_LOG(DEBUG, TAG, PCF("observer is not reachable")); - observer = GetObserver(token); + observer = GetObserverUsingToken(token); if(observer) { if(observer->failedCommCount >= MAX_OBSERVER_FAILED_COMM) { - result = DeleteObserver (token); + FormOCEntityHandlerRequest(&ehRequest, OC_REST_CANCEL_OBSERVE, bufRes, NULL, NULL); + ehRequest->obsInfo = &observationInfo; + ehRequest->obsInfo->action = OC_OBSERVE_DEREGISTER; + ehRequest->obsInfo->obsId = observer->observeId; + observer->resource->entityHandler(OC_OBSERVE_FLAG, ehRequest); + + result = DeleteObserverUsingToken (token); if(result != OC_STACK_OK) { result = OC_STACK_OBSERVER_NOT_REMOVED; @@ -114,53 +133,100 @@ OCStackResult ProcessObserveRequest (OCResource *resource, OCRequest *request) OCEntityHandlerResult ehRet = OC_EH_ERROR; OCEntityHandlerRequest *ehReq = request->entityHandlerRequest; OCObserveReq *obs = request->observe; + OCObservationInfo observationInfo; + OCObservationId obsId; + ResourceObserver *resObs = NULL; OC_LOG(INFO, TAG, PCF("Entering ProcessObserveRequest")); - // Register new observation request->entityHandlerRequest->resource = (OCResourceHandle)resource; - ehRet = resource->entityHandler(OC_OBSERVE_FLAG, request->entityHandlerRequest); - if(ehRet == OC_EH_OK) + request->entityHandlerRequest->obsInfo = &observationInfo; + + if (obs->option == OC_RESOURCE_OBSERVE_REGISTER) { - if (obs->option == OC_RESOURCE_OBSERVE_REGISTER) + // Request to register new observation + observationInfo.action = OC_OBSERVE_REGISTER; + // Generate observation Id for the request + while (1) { - // Add subscriber to the server observation list - // TODO: we need to check if the obsrever is already there using its OCDevAdd.... - stackRet = AddObserver ((const char*)(request->resourceUrl), (const char *)(ehReq->query), - obs->token, obs->subAddr, resource, request->qos); - if(stackRet != OC_STACK_OK) - { - obs->result = OC_STACK_OBSERVER_NOT_ADDED; - } - else + if (OC_STACK_OK != GenerateObserverId (&obsId)) + return OC_STACK_ERROR; + + // Check if observation Id already exists + resObs = GetObserverUsingId (obsId); + if (NULL == resObs) { - OC_LOG(DEBUG, TAG, PCF("adding an observer")); + OC_LOG_V(INFO, TAG, "Observation ID is %d", obsId); + break; } } - else if (obs->option == OC_RESOURCE_OBSERVE_DEREGISTER) + + observationInfo.obsId = obsId; + // Register the observation request with entity handler + ehRet = resource->entityHandler ((OC_REQUEST_FLAG | OC_OBSERVE_FLAG), + request->entityHandlerRequest); + if (ehRet == OC_EH_OK) { - // Deregister observation - stackRet = DeleteObserver (obs->token); + // Add subscriber to the server observation list + stackRet = AddObserver ((const char*)(request->resourceUrl), + (const char *)(ehReq->query), + obsId, obs->token, obs->subAddr, + resource, request->qos); if(stackRet != OC_STACK_OK) { - obs->result = OC_STACK_OBSERVER_NOT_REMOVED; + obs->result = OC_STACK_OBSERVER_NOT_ADDED; + stackRet = OC_STACK_OBSERVER_NOT_ADDED; + // If the observation was not added in the stack notify the entity handler + observationInfo.action = OC_OBSERVE_DEREGISTER; + // If the entity handler is unable to deregister, stack cannot do anything, + // hence the return value from entity handler is not being checked + resource->entityHandler (OC_OBSERVE_FLAG, request->entityHandlerRequest); } else { - OC_LOG(DEBUG, TAG, PCF("removing an observer")); + OC_LOG(DEBUG, TAG, PCF("Added observer successfully")); } } else { - // Invalid option - OC_LOG(ERROR, TAG, PCF("Invalid CoAP observe option")); - obs->result = OC_STACK_INVALID_OBSERVE_PARAM; + stackRet = OC_STACK_OBSERVER_NOT_ADDED; + } + } + else if (obs->option == OC_RESOURCE_OBSERVE_DEREGISTER) + { + // Request to deregister observation + observationInfo.action = OC_OBSERVE_DEREGISTER; + + // Get observation Id using token + resObs = GetObserverUsingToken (obs->token); + if (NULL == resObs) + { + // Stack does not contain this observation request + // Either token is incorrect or observation list is corrupted + return OC_STACK_ERROR; + } + observationInfo.action = OC_OBSERVE_DEREGISTER; + observationInfo.obsId = resObs->observeId; + // Deregister the observation with entity handler. Ignoring return value + // from entity handler and deleting the observation from stack + resource->entityHandler ((OC_REQUEST_FLAG | OC_OBSERVE_FLAG), + request->entityHandlerRequest); + stackRet = DeleteObserverUsingToken (obs->token); + if(stackRet != OC_STACK_OK) + { + obs->result = OC_STACK_OBSERVER_NOT_REMOVED; + stackRet = OC_STACK_OBSERVER_NOT_REMOVED; + } + else + { + OC_LOG(DEBUG, TAG, PCF("Removed observer successfully")); } - stackRet = OC_STACK_OK; } else { - stackRet = OC_STACK_ERROR; + // Invalid observe option + OC_LOG(ERROR, TAG, PCF("Invalid CoAP observe option")); + obs->result = OC_STACK_INVALID_OBSERVE_PARAM; } return stackRet; } @@ -264,11 +330,24 @@ OCStackResult SendObserverNotification (OCMethod method, OCResource *resPtr, uin return stackRet; } -OCStackResult AddObserver (const char *resUri, - const char *query, - OCCoAPToken * token, - OCDevAddr *addr, - OCResource *resHandle, +OCStackResult GenerateObserverId (OCObservationId *observationId) +{ + OC_LOG(INFO, TAG, PCF("Entering GenerateObserverId")); + + VERIFY_NON_NULL (observationId); + *observationId = OCGetRandomByte(); + return OC_STACK_OK; + +exit: + return OC_STACK_ERROR; +} + +OCStackResult AddObserver (const char *resUri, + const char *query, + OCObservationId obsId, + OCCoAPToken *token, + OCDevAddr *addr, + OCResource *resHandle, OCQualityOfService qos) { ResourceObserver *obsNode = NULL; @@ -277,6 +356,7 @@ OCStackResult AddObserver (const char *resUri, obsNode = (ResourceObserver *) OCMalloc(sizeof(ResourceObserver)); if (obsNode) { + obsNode->observeId = obsId; obsNode->resUri = (unsigned char *)OCMalloc(strlen(resUri)+1); VERIFY_NON_NULL (obsNode->resUri); memcpy (obsNode->resUri, resUri, strlen(resUri)+1); @@ -313,7 +393,25 @@ exit: return OC_STACK_NO_MEMORY; } -ResourceObserver* GetObserver (const OCCoAPToken * token) +ResourceObserver* GetObserverUsingId (const OCObservationId observeId) +{ + ResourceObserver *out = NULL; + + if (observeId) + { + LL_FOREACH (serverObsList, out) + { + if (out->observeId == observeId) + { + return out; + } + } + } + OC_LOG(INFO, TAG, PCF("Observer node not found!!")); + return NULL; +} + +ResourceObserver* GetObserverUsingToken (const OCCoAPToken * token) { ResourceObserver *out = NULL; @@ -321,6 +419,9 @@ ResourceObserver* GetObserver (const OCCoAPToken * token) { LL_FOREACH (serverObsList, out) { + OC_LOG(INFO, TAG,PCF("comparing tokens")); + OC_LOG_BUFFER(INFO, TAG, token->token, token->tokenLength); + OC_LOG_BUFFER(INFO, TAG, out->token->token, out->token->tokenLength); if((out->token->tokenLength == token->tokenLength) && (memcmp(out->token->token, token->token, token->tokenLength) == 0)) { @@ -332,13 +433,15 @@ ResourceObserver* GetObserver (const OCCoAPToken * token) return NULL; } -OCStackResult DeleteObserver (OCCoAPToken * token) +OCStackResult DeleteObserverUsingToken (OCCoAPToken * token) { ResourceObserver *obsNode = NULL; - obsNode = GetObserver (token); + obsNode = GetObserverUsingToken (token); if (obsNode) { + OC_LOG_V(INFO, TAG, PCF("deleting tokens")); + OC_LOG_BUFFER(INFO, TAG, obsNode->token->token, obsNode->token->tokenLength); LL_DELETE (serverObsList, obsNode); OCFree(obsNode->resUri); OCFree(obsNode->query); @@ -356,7 +459,7 @@ void DeleteObserverList() ResourceObserver *tmp = NULL; LL_FOREACH_SAFE (serverObsList, out, tmp) { - DeleteObserver (out->token); + DeleteObserverUsingToken (out->token); } serverObsList = NULL; } diff --git a/csdk/stack/src/ocresource.c b/csdk/stack/src/ocresource.c index 34c2dd4..f9dbcdc 100644 --- a/csdk/stack/src/ocresource.c +++ b/csdk/stack/src/ocresource.c @@ -370,7 +370,7 @@ HandleVirtualResource (OCRequest *request, OCResource* resource) else { if(resource->resourceProperties & OC_ACTIVE){ - OCNotifyObservers((OCResourceHandle) resource); + OCNotifyAllObservers((OCResourceHandle) resource); } result = OC_STACK_PRESENCE_DO_NOT_HANDLE; } @@ -398,16 +398,24 @@ HandleResourceWithEntityHandler (OCRequest *request, OC_LOG(INFO, TAG, PCF("Entering HandleResourceWithEntityHandler")); ehRequest->resource = (OCResourceHandle)resource; - // status code from entity handler is ignored unless observe call - resource->entityHandler(OC_REQUEST_FLAG, ehRequest); - if (request->observe != NULL) + // status code from entity handler is ignored unless observe call + if (request->observe == NULL) + { + resource->entityHandler(OC_REQUEST_FLAG, ehRequest); + } + else { + // If an observation register/deregister is included handle separately if (!collectionResource) + { result = ProcessObserveRequest (resource, request); + } else - // Observation on collection resources not supported in M1 + { + // Observation on collection resources not currently supported result = OC_STACK_ERROR; + } } if (result == OC_STACK_OK) diff --git a/csdk/stack/src/ocstack.c b/csdk/stack/src/ocstack.c index a1d430c..c8cb91e 100644 --- a/csdk/stack/src/ocstack.c +++ b/csdk/stack/src/ocstack.c @@ -351,16 +351,10 @@ OCStackResult OCDoResource(OCDoHandle *handle, OCMethod method, const char *requ break; case OC_REST_OBSERVE: case OC_REST_OBSERVE_ALL: + case OC_REST_CANCEL_OBSERVE: break; #ifdef WITH_PRESENCE case OC_REST_PRESENCE: - requestUri = (unsigned char *) OCMalloc(strlen(requiredUri) + 1); - if(requestUri){ - memcpy(requestUri, requiredUri, strlen(requiredUri) + 1); - }else{ - result = OC_STACK_NO_MEMORY; - goto exit; - } break; #endif default: @@ -368,11 +362,27 @@ OCStackResult OCDoResource(OCDoHandle *handle, OCMethod method, const char *requ goto exit; } + if(strlen(requiredUri) > MAX_URI_LENGTH) + { + result = OC_STACK_INVALID_PARAM; + goto exit; + } + + requestUri = (unsigned char *) OCMalloc(strlen(requiredUri) + 1); + if(requestUri) + { + memcpy(requestUri, requiredUri, strlen(requiredUri) + 1); + } + else + { + result = OC_STACK_NO_MEMORY; + goto exit; + } + *handle = GenerateInvocationHandle(); if(!*handle) { result = OC_STACK_NO_MEMORY; - OCFree(*handle); goto exit; } @@ -384,6 +394,7 @@ OCStackResult OCDoResource(OCDoHandle *handle, OCMethod method, const char *requ OCFree(token); goto exit; } + if((result = AddClientCB(&clientCB, cbData, token, *handle, method, requestUri)) != OC_STACK_OK) { result = OC_STACK_NO_MEMORY; @@ -411,43 +422,59 @@ exit: * OC_STACK_OK - No errors; Success * OC_STACK_INVALID_PARAM - The handle provided is invalid. */ -OCStackResult OCCancel(OCDoHandle handle) { +OCStackResult OCCancel(OCDoHandle handle, OCQualityOfService qos) { /* - * This ftn can be implemented one of two ways: + * This ftn is implemented one of two ways in the case of observation: * - * 1. When observe is unobserved..Remove the callback associated on client side. - * When the next notification comes in from server, reply with RESET message to server. + * 1. qos == OC_NON_CONFIRMABLE. When observe is unobserved.. + * Remove the callback associated on client side. + * When the next notification comes in from server, + * reply with RESET message to server. + * Keep in mind that the server will react to RESET only + * if the last notification was sent ans CON * - * 2. When OCCancel is called, and it is associated with an observe request + * 2. qos == OC_CONFIRMABLE. When OCCancel is called, + * and it is associated with an observe request * (i.e. ClientCB->method == OC_REST_OBSERVE || OC_REST_OBSERVE_ALL), - * Send Observe request to server with observe flag = OC_RESOURCE_OBSERVE_DEREGISTER. + * Send CON Observe request to server with + * observe flag = OC_RESOURCE_OBSERVE_DEREGISTER. * Remove the callback associated on client side. - * - * Number 1 is implemented here. */ + OCStackResult ret = OC_STACK_OK; + if(!handle) { return OC_STACK_INVALID_PARAM; } OC_LOG(INFO, TAG, PCF("Entering OCCancel")); - ClientCB *clientCB = GetClientCB(NULL, &handle, NULL); + ClientCB *clientCB = GetClientCB(NULL, handle, NULL); if(clientCB) { switch (clientCB->method) { case OC_REST_OBSERVE: case OC_REST_OBSERVE_ALL: + if(qos == OC_CONFIRMABLE) + { + ret = OCDoCoAPResource(OC_REST_CANCEL_OBSERVE, qos, + clientCB->token, (const char *) clientCB->requestUri, NULL); + } + else + { + FindAndDeleteClientCB(clientCB); + } + break; #ifdef WITH_PRESENCE case OC_REST_PRESENCE: - #endif FindAndDeleteClientCB(clientCB); break; + #endif default: return OC_STACK_INVALID_METHOD; } } - return OC_STACK_OK; + return ret; } #ifdef WITH_PRESENCE OCStackResult OCProcessPresence() @@ -580,7 +607,7 @@ OCStackResult OCStartPresence(const uint32_t ttl) OCCoAPToken * token = OCGenerateCoAPToken(); OCBuildIPv4Address(224, 0, 1, 187, 5683, &multiCastAddr); //add the presence observer - AddObserver(OC_PRESENCE_URI, NULL, token, &multiCastAddr, + AddObserver(OC_PRESENCE_URI, NULL, 0, token, &multiCastAddr, (OCResource *)presenceResource.handle, OC_NON_CONFIRMABLE); } @@ -588,7 +615,7 @@ OCStackResult OCStartPresence(const uint32_t ttl) // a different random 32-bit integer number is used ((OCResource *)presenceResource.handle)->sequenceNum = OCGetRandom(); - return OCNotifyObservers(presenceResource.handle); + return OCNotifyAllObservers(presenceResource.handle); } /** @@ -609,7 +636,7 @@ OCStackResult OCStopPresence() result = OCChangeResourceProperty( &(((OCResource *) presenceResource.handle)->resourceProperties), OC_ACTIVE, 0); - result = OCNotifyObservers(presenceResource.handle); + result = OCNotifyAllObservers(presenceResource.handle); return result; } #endif @@ -679,6 +706,7 @@ OCStackResult OCCreateResource(OCResourceHandle *handle, goto exit; } memset(pointer, 0, sizeof(OCResource)); + pointer->sequenceNum = OC_OFFSET_SEQUENCE_NUMBER; insertResource(pointer); @@ -729,7 +757,7 @@ OCStackResult OCCreateResource(OCResourceHandle *handle, if(presenceResource.handle) { ((OCResource *)presenceResource.handle)->sequenceNum = OCGetRandom(); - OCNotifyObservers(presenceResource.handle); + OCNotifyAllObservers(presenceResource.handle); } #endif exit: @@ -790,7 +818,7 @@ OCStackResult OCBindResource( if(presenceResource.handle) { ((OCResource *)presenceResource.handle)->sequenceNum = OCGetRandom(); - OCNotifyObservers(presenceResource.handle); + OCNotifyAllObservers(presenceResource.handle); } #endif @@ -848,7 +876,7 @@ OCStackResult OCUnBindResource( if(presenceResource.handle) { ((OCResource *)presenceResource.handle)->sequenceNum = OCGetRandom(); - OCNotifyObservers(presenceResource.handle); + OCNotifyAllObservers(presenceResource.handle); } #endif @@ -915,7 +943,7 @@ OCStackResult OCBindResourceTypeToResource(OCResourceHandle handle, if(presenceResource.handle) { ((OCResource *)presenceResource.handle)->sequenceNum = OCGetRandom(); - OCNotifyObservers(presenceResource.handle); + OCNotifyAllObservers(presenceResource.handle); } #endif @@ -984,7 +1012,7 @@ OCStackResult OCBindResourceInterfaceToResource(OCResourceHandle handle, if(presenceResource.handle) { ((OCResource *)presenceResource.handle)->sequenceNum = OCGetRandom(); - OCNotifyObservers(presenceResource.handle); + OCNotifyAllObservers(presenceResource.handle); } #endif @@ -1068,7 +1096,7 @@ OCStackResult OCDeleteResource(OCResourceHandle handle) { if(presenceResource.handle) { ((OCResource *)presenceResource.handle)->sequenceNum = OCGetRandom(); - OCNotifyObservers(presenceResource.handle); + OCNotifyAllObservers(presenceResource.handle); } #endif @@ -1286,7 +1314,7 @@ OCStackResult OCBindResourceHandler(OCResourceHandle handle, if(presenceResource.handle) { ((OCResource *)presenceResource.handle)->sequenceNum = OCGetRandom(); - OCNotifyObservers(presenceResource.handle); + OCNotifyAllObservers(presenceResource.handle); } #endif @@ -1324,7 +1352,7 @@ void incrementSequenceNumber(OCResource * resPtr) resPtr->sequenceNum += 1; if (resPtr->sequenceNum == MAX_SEQUENCE_NUMBER) { - resPtr->sequenceNum = 1; + resPtr->sequenceNum = OC_OFFSET_SEQUENCE_NUMBER+1; } return; } @@ -1332,20 +1360,20 @@ void incrementSequenceNumber(OCResource * resPtr) /** * Notify observers that an observed value has changed. * - * * @param handle - handle of resource * * @return * OC_STACK_OK - no errors - * OC_STACK_ERROR - stack not initialized + * OC_STACK_NO_RESOURCE - invalid resource handle + * OC_STACK_NO_OBSERVERS - no more observers intrested in resource */ -OCStackResult OCNotifyObservers(OCResourceHandle handle) { +OCStackResult OCNotifyAllObservers(OCResourceHandle handle) { OCResource *resPtr = NULL; OCStackResult result; OCMethod method = OC_REST_NOMETHOD; uint32_t maxAge = 0; - OC_LOG(INFO, TAG, PCF("Entering OCNotifyObservers")); + OC_LOG(INFO, TAG, PCF("Entering OCNotifyAllObservers")); VERIFY_NON_NULL(handle, ERROR, OC_STACK_ERROR); @@ -1384,6 +1412,112 @@ OCStackResult OCNotifyObservers(OCResourceHandle handle) { } } +OCStackResult +OCNotifyListOfObservers (OCResourceHandle handle, + OCObservationId *obsIdList, + uint8_t numberOfIds, + unsigned char *notificationJSONPayload) +{ + OC_LOG(INFO, TAG, PCF("Entering OCNotifyListOfObservers")); + + VERIFY_NON_NULL(handle, ERROR, OC_STACK_ERROR); + VERIFY_NON_NULL(obsIdList, ERROR, OC_STACK_ERROR); + VERIFY_NON_NULL(notificationJSONPayload, ERROR, OC_STACK_ERROR); + + uint8_t numIds = numberOfIds; + ResourceObserver *observation; + OCResource *resPtr = NULL; + uint32_t maxAge = 0; + unsigned char bufNotify[MAX_RESPONSE_LENGTH] = {0}; + unsigned char *currPtr; + OCQualityOfService qos = OC_NON_CONFIRMABLE; + uint8_t numSentNotification = 0; + + // Verify the notification payload length does not exceed the maximim + // the stack can handle + if ((strlen((char *)notificationJSONPayload) + + OC_JSON_PREFIX_LEN + OC_JSON_SUFFIX_LEN) > MAX_RESPONSE_LENGTH) + { + OC_LOG(INFO, TAG, PCF("Observe notification message length too long")); + return OC_STACK_ERROR; + } + + // Verify that the resource exists + resPtr = findResource ((OCResource *) handle); + if (NULL == resPtr || myStackMode == OC_CLIENT) + { + return OC_STACK_NO_RESOURCE; + } + else + { + incrementSequenceNumber(resPtr); + //TODO: we should allow the serve to define thisl + maxAge = 0x2FFFF; + } + + while (numIds) + { + OC_LOG_V(INFO, TAG, "Need to notify observation id %d", *obsIdList); + observation = NULL; + observation = GetObserverUsingId (*obsIdList); + if (observation) + { + // Found observation - verify if it matches the resource handle + if (observation->resource == resPtr) + { + strcpy((char*)bufNotify, OC_JSON_PREFIX); + currPtr = bufNotify + OC_JSON_PREFIX_LEN; + memcpy (currPtr, notificationJSONPayload, strlen((char *)notificationJSONPayload)); + currPtr += strlen((char *)notificationJSONPayload); + strcpy((char*)currPtr, OC_JSON_SUFFIX); + + // send notifications based on the qos of the request + qos = observation->qos; + if(qos == OC_NON_CONFIRMABLE) + { + OC_LOG_V(INFO, TAG, "Current NON count for this observer is %d", + observation->NONCount); + if(observation->forceCON \ + || observation->NONCount >= MAX_OBSERVER_NON_COUNT) + { + observation->NONCount = 0; + // at some point we have to to send CON to check on the + // availability of observer + OC_LOG(INFO, TAG, PCF("This time we are sending the \ + notification as CON")); + qos = OC_CONFIRMABLE; + } + else + { + observation->NONCount++; + } + } + OCSendCoAPNotification (observation->resUri, observation->addr, + OC_STACK_OK, qos, + observation->token, + bufNotify, resPtr->sequenceNum, maxAge); + numSentNotification++; + } + } + obsIdList++; + numIds--; + } + if(numSentNotification == numberOfIds) + { + return OC_STACK_OK; + } + else if(numSentNotification == 0) + { + return OC_STACK_NO_OBSERVERS; + } + else + { + //TODO: we need to signal that not every one in the + // list got an update, should we also indicate who did not receive on? + return OC_STACK_OK; + } +} + //----------------------------------------------------------------------------- // Private internal function definitions //----------------------------------------------------------------------------- diff --git a/examples/ocicuc/light_resource.cpp b/examples/ocicuc/light_resource.cpp index 3402d5c..d58ff09 100644 --- a/examples/ocicuc/light_resource.cpp +++ b/examples/ocicuc/light_resource.cpp @@ -69,7 +69,7 @@ void LightResource::observe_function() m_power += 10; - const auto result = OCPlatform::notifyObservers(getHandle()); + const auto result = OCPlatform::notifyAllObservers(getHandle()); // Stop notifications when there are no more observers: if(OC_STACK_NO_OBSERVERS == result) diff --git a/examples/roomserver.cpp b/examples/roomserver.cpp index 89da8de..035e6cc 100644 --- a/examples/roomserver.cpp +++ b/examples/roomserver.cpp @@ -240,7 +240,7 @@ void entityHandlerRoom(std::shared_ptr request, std::shared_p { // Get the request type and request flag std::string requestType = request->getRequestType(); - RequestHandlerFlag requestFlag = request->getRequestHandlerFlag(); + int requestFlag = request->getRequestHandlerFlag(); if(requestFlag == RequestHandlerFlag::InitFlag) { @@ -327,7 +327,7 @@ void entityHandlerLight(std::shared_ptr request, std::shared_ { // Get the request type and request flag std::string requestType = request->getRequestType(); - RequestHandlerFlag requestFlag = request->getRequestHandlerFlag(); + int requestFlag = request->getRequestHandlerFlag(); if(requestFlag == RequestHandlerFlag::InitFlag) { @@ -397,7 +397,7 @@ void entityHandlerFan(std::shared_ptr request, std::shared_pt { // Get the request type and request flag std::string requestType = request->getRequestType(); - RequestHandlerFlag requestFlag = request->getRequestHandlerFlag(); + int requestFlag = request->getRequestHandlerFlag(); if(requestFlag == RequestHandlerFlag::InitFlag) { diff --git a/examples/simpleclient.cpp b/examples/simpleclient.cpp index 830de5a..886ea5c 100644 --- a/examples/simpleclient.cpp +++ b/examples/simpleclient.cpp @@ -139,7 +139,7 @@ void putLightRepresentation(std::shared_ptr resource) } } -// callback handler on GET request +// Callback handler on GET request void onGet(const OCRepresentation& rep, const int eCode) { if(eCode == SUCCESS_RESPONSE) diff --git a/examples/simpleserver.cpp b/examples/simpleserver.cpp index d95ce63..297c7e6 100644 --- a/examples/simpleserver.cpp +++ b/examples/simpleserver.cpp @@ -35,6 +35,11 @@ using namespace std; int gObservation = 0; +// Specifies where to notify all observers or list of observers +// 0 - notifies all observers +// 1 - notifies list of observers +int isListOfObservers = 0; + // Forward declaring the entityHandler void entityHandler(std::shared_ptr request, std::shared_ptr response); @@ -51,6 +56,7 @@ public: std::string m_lightUri; OCResourceHandle m_resourceHandle; OCRepresentation m_lightRep; + ObservationIds m_interestedObservers; public: /// Constructor @@ -177,7 +183,23 @@ void * ChangeLightRepresentation (void *param) cout << "\nPower updated to : " << myLight.m_power << endl; cout << "Notifying observers with resource handle: " << myLight.getHandle() << endl; - OCStackResult result = OCPlatform::notifyObservers(myLight.getHandle()); + OCStackResult result = OC_STACK_OK; + + if(isListOfObservers) + { + std::shared_ptr resourceResponse(new OCResourceResponse()); + + resourceResponse->setErrorCode(200); + resourceResponse->setResourceRepresentation(myLight.get(), DEFAULT_INTERFACE); + + result = OCPlatform::notifyListOfObservers( myLight.getHandle(), + myLight.m_interestedObservers, + resourceResponse); + } + else + { + result = OCPlatform::notifyAllObservers(myLight.getHandle()); + } if(OC_STACK_NO_OBSERVERS == result) { @@ -190,7 +212,6 @@ void * ChangeLightRepresentation (void *param) return NULL; } - // This is just a sample implementation of entity handler. // Entity handler can be implemented in several ways by the manufacturer void entityHandler(std::shared_ptr request, std::shared_ptr response) @@ -201,15 +222,15 @@ void entityHandler(std::shared_ptr request, std::shared_ptrgetRequestType(); - RequestHandlerFlag requestFlag = request->getRequestHandlerFlag(); + int requestFlag = request->getRequestHandlerFlag(); - if(requestFlag == RequestHandlerFlag::InitFlag) + if(requestFlag & RequestHandlerFlag::InitFlag) { cout << "\t\trequestFlag : Init\n"; // entity handler to perform resource initialization operations } - else if(requestFlag == RequestHandlerFlag::RequestFlag) + if(requestFlag & RequestHandlerFlag::RequestFlag) { cout << "\t\trequestFlag : Request\n"; @@ -255,8 +276,22 @@ void entityHandler(std::shared_ptr request, std::shared_ptrgetObservationInfo(); + if(ObserveAction::ObserveRegister == observationInfo.action) + { + myLight.m_interestedObservers.push_back(observationInfo.obsId); + } + else if(ObserveAction::ObserveUnregister == observationInfo.action) + { + myLight.m_interestedObservers.erase(std::remove( + myLight.m_interestedObservers.begin(), + myLight.m_interestedObservers.end(), + observationInfo.obsId), + myLight.m_interestedObservers.end()); + } + pthread_t threadId; cout << "\t\trequestFlag : Observer\n"; @@ -271,7 +306,6 @@ void entityHandler(std::shared_ptr request, std::shared_ptr request, std::shared_ptr\n"; + std::cout << " ObserveType : 0 - Observe All\n"; + std::cout << " ObserveType : 1 - Observe List of observers\n\n"; +} + + +int main(int argc, char* argv[1]) { + PrintUsage(); + + if (argc == 1) + { + isListOfObservers = 0; + } + else if (argc == 2) + { + int value = atoi(argv[1]); + if (value == 1) + isListOfObservers = 1; + else + isListOfObservers = 0; + } + else + { + return -1; + } + + // Create PlatformConfig object PlatformConfig cfg { OC::ServiceType::InProc, diff --git a/include/OCApi.h b/include/OCApi.h index b78430d..5b15f49 100644 --- a/include/OCApi.h +++ b/include/OCApi.h @@ -95,11 +95,11 @@ namespace OC {} }; - enum class RequestHandlerFlag + enum RequestHandlerFlag { - InitFlag, - RequestFlag, - ObserverFlag + InitFlag = 1 << 0, + RequestFlag = 1 << 1, + ObserverFlag = 1 << 2 }; enum class ObserveType @@ -228,6 +228,23 @@ namespace OC // Typedef for query parameter map typedef std::map QueryParamsMap; + // Typedef for list of observation IDs + typedef std::vector ObservationIds; + + enum class ObserveAction + { + ObserveRegister, + ObserveUnregister + }; + + typedef struct + { + // Action associated with observation request + ObserveAction action; + // Identifier for observation being registered/unregistered + OCObservationId obsId; + } ObservationInfo; + // const strings for different interfaces // Default interface diff --git a/include/OCPlatform.h b/include/OCPlatform.h index aa46b5b..4c2c97a 100644 --- a/include/OCPlatform.h +++ b/include/OCPlatform.h @@ -64,7 +64,7 @@ namespace OC virtual ~OCPlatform(void); /** - * API for notifying core that resource's attributes have changed. + * API for notifying base that resource's attributes have changed. * * @param OCResourceHandle resource handle of the resource * @@ -73,7 +73,27 @@ namespace OC * NOTE: OCResourceHandle is defined in ocstack.h. * NOTE: OCStackResult is defined in ocstack.h. */ - static OCStackResult notifyObservers(OCResourceHandle resourceHandle); + static OCStackResult notifyAllObservers(OCResourceHandle resourceHandle); + + /** + * API for notifying only specific clients that resource's attributes have changed. + * + * @param OCResourceHandle resource handle of the resource + * @param observationIds std vector of observationIds. These set of ids are ones which + * which will be notified upon resource change. + * @param responsePtr OCResourceResponse pointer used by app to fill the response for this + * resource change. + * + * @return OCStackResult return value of this API. Returns OC_STACK_OK if success. + * + * NOTE: This API is for server side only. + * NOTE: OCResourceHandle is defined in ocstack.h. + * NOTE: OCStackResult is defined in ocstack.h. + */ + static OCStackResult notifyListOfObservers( + OCResourceHandle resourceHandle, + ObservationIds& observationIds, + const std::shared_ptr responsePtr); /** * API for Service and Resource Discovery. diff --git a/include/OCResourceRequest.h b/include/OCResourceRequest.h index c927522..d922e24 100644 --- a/include/OCResourceRequest.h +++ b/include/OCResourceRequest.h @@ -62,15 +62,18 @@ namespace OC const QueryParamsMap& getQueryParameters() const {return m_queryParameters;} /** - * Retrieves the request handler flag type. This can be either INIT flag or REQUEST flag or OBSERVE flag. + * Retrieves the request handler flag type. This can be either INIT flag or + * REQUEST flag or OBSERVE flag. * NOTE: - * INIT indicates that the vendor's entity handler should go and perform initialization operations - * REQUEST indicates that it is a request of certain type (GET/PUT/POST/DELETE) and entity handler needs to perform - * corresponding operations - * OBSERVE indicates that the request is of type Observe and entity handler needs to perform corresponding operations - * @return std::string type of request flag + * INIT indicates that the vendor's entity handler should go and perform + * initialization operations + * REQUEST indicates that it is a request of certain type (GET/PUT/POST/DELETE) + * and entity handler needs to perform corresponding operations + * OBSERVE indicates that the request is of type Observe and entity handler + * needs to perform corresponding operations + * @return int type of request flag */ - RequestHandlerFlag getRequestHandlerFlag() const {return m_requestHandlerFlag;} + int getRequestHandlerFlag() const {return m_requestHandlerFlag;} /** * Provides the entire resource attribute representation @@ -78,11 +81,18 @@ namespace OC */ const OCRepresentation& getResourceRepresentation() const {return m_representation;} + /** + * Provides the entire resource representation + * @return OCRepresentation reference which provides entire resource information + */ + const ObservationInfo& getObservationInfo() const {return m_observationInfo;} + private: std::string m_requestType; QueryParamsMap m_queryParameters; - RequestHandlerFlag m_requestHandlerFlag; + int m_requestHandlerFlag; OCRepresentation m_representation; + ObservationInfo m_observationInfo; public: // TODO: This is not a public API for app developers. @@ -138,10 +148,18 @@ namespace OC // TODO: This is not a public API for app developers. // This function will not be exposed in future - void setRequestHandlerFlag(RequestHandlerFlag requestHandlerFlag) + void setRequestHandlerFlag(int requestHandlerFlag) { m_requestHandlerFlag = requestHandlerFlag; } + + // TODO: This is not a public API for app developers. + // This function will not be exposed in future + void setObservationInfo(const ObservationInfo& observationInfo) + { + m_observationInfo = observationInfo; + } + }; } // namespace OC diff --git a/src/InProcClientWrapper.cpp b/src/InProcClientWrapper.cpp index c405fd0..1839962 100644 --- a/src/InProcClientWrapper.cpp +++ b/src/InProcClientWrapper.cpp @@ -602,7 +602,7 @@ namespace OC if(cLock) { std::lock_guard lock(*cLock); - result = OCCancel(handle); + result = OCCancel(handle, OC_NON_CONFIRMABLE); } else { @@ -666,7 +666,7 @@ namespace OC if(cLock) { std::lock_guard lock(*cLock); - result = OCCancel(handle); + result = OCCancel(handle, OC_NON_CONFIRMABLE); } else { diff --git a/src/InProcServerWrapper.cpp b/src/InProcServerWrapper.cpp index 58f633b..ec66e55 100644 --- a/src/InProcServerWrapper.cpp +++ b/src/InProcServerWrapper.cpp @@ -46,52 +46,64 @@ void defaultEntityHandler(const OC::OCResourceRequest::Ptr request, const OC::OC OCEntityHandlerResult EntityHandler(OCEntityHandlerFlag flag, OCEntityHandlerRequest * entityHandlerRequest ) { - // TODO @SASHI we need to have a better way of logging (with various levels of logging) + // TODO we need to have a better way of logging (with various levels of logging) cout << "\nIn C entity handler: " << endl; - // TODO @SASHI dow we need shared pointer? + // TODO do we need shared pointer? auto pRequest = std::make_shared(); auto pResponse = std::make_shared(); - // TODO @ SASHI Utility to convert from C to C++ (every). - switch (flag) { - case OC_INIT_FLAG: - // TODO @SASHI We can fill the common data (resource Handle, etc.. ) - // init time. - pRequest->setRequestHandlerFlag(OC::RequestHandlerFlag::InitFlag); - break; - case OC_REQUEST_FLAG: - pRequest->setRequestHandlerFlag(OC::RequestHandlerFlag::RequestFlag); - - if(entityHandlerRequest) + // TODO Utility to convert from C to C++ (every). + + if(flag & OC_INIT_FLAG) + { + // TODO We can fill the common data (resource Handle, etc.. ) + // init time. + pRequest->setRequestHandlerFlag(OC::RequestHandlerFlag::InitFlag); + } + if(flag & OC_REQUEST_FLAG) + { + pRequest->setRequestHandlerFlag(OC::RequestHandlerFlag::RequestFlag); + + if(entityHandlerRequest) + { + if(entityHandlerRequest->query) { - if(entityHandlerRequest->query) - { - std::string querystr(reinterpret_cast(entityHandlerRequest->query)); + std::string querystr(reinterpret_cast(entityHandlerRequest->query)); - OC::Utilities::QueryParamsKeyVal qp = OC::Utilities::getQueryParams(querystr); + OC::Utilities::QueryParamsKeyVal qp = OC::Utilities::getQueryParams(querystr); - if(qp.size() > 0) - pRequest->setQueryParams(qp); - } - if(OC_REST_GET == entityHandlerRequest->method) + if(qp.size() > 0) { - // TODO @SASHI Why strings : "GET"?? - pRequest->setRequestType("GET"); + pRequest->setQueryParams(qp); } + } + if(OC_REST_GET == entityHandlerRequest->method) + { + // TODO Why strings : "GET"?? + pRequest->setRequestType("GET"); + } - if(OC_REST_PUT == entityHandlerRequest->method) - { - pRequest->setRequestType("PUT"); - pRequest->setPayload(std::string(reinterpret_cast(entityHandlerRequest->reqJSONPayload))); - } + if(OC_REST_PUT == entityHandlerRequest->method) + { + pRequest->setRequestType("PUT"); + pRequest->setPayload(std::string(reinterpret_cast + (entityHandlerRequest->reqJSONPayload))); } - break; - case OC_OBSERVE_FLAG: - pRequest->setRequestHandlerFlag(OC::RequestHandlerFlag::ObserverFlag); - break; + } + } + if(flag & OC_OBSERVE_FLAG) + { + pRequest->setRequestHandlerFlag( + OC::RequestHandlerFlag::RequestFlag | OC::RequestHandlerFlag::ObserverFlag); + if(entityHandlerRequest->obsInfo) + { + OC::ObservationInfo observationInfo; + observationInfo.action = (OC::ObserveAction) entityHandlerRequest->obsInfo->action; + observationInfo.obsId = entityHandlerRequest->obsInfo->obsId; + pRequest->setObservationInfo(observationInfo); + } } - // Finding the corresponding CPP Application entityHandler for a given resource auto entityHandlerEntry = entityHandlerMap.find(entityHandlerRequest->resource); @@ -114,10 +126,9 @@ OCEntityHandlerResult EntityHandler(OCEntityHandlerFlag flag, OCEntityHandlerReq return OC_EH_ERROR; } - - if(flag == OC_REQUEST_FLAG) + if(flag & OC_REQUEST_FLAG) { - // TODO @SASHI we could use const reference + // TODO we could use const reference std::string payLoad = pResponse->getPayload(); if(OC_REST_GET == entityHandlerRequest->method) @@ -196,7 +207,7 @@ namespace OC if(result != OC_STACK_OK) { cout << "Something wrong in OCProcess" << endl; - // TODO: SASHI + // TODO } std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -252,11 +263,12 @@ namespace OC { cout << "\tSomething wrong in creating the resource" << endl; resourceHandle = (OCResourceHandle) 0; - // TODO: SASHI + // TODO } else { - cout << "\tResource creation is successful with resource handle: " << resourceHandle << endl; + cout << "\tResource creation is successful with resource handle: " + << resourceHandle << endl; entityHandlerMap[resourceHandle] = eHandler; } } diff --git a/src/OCPlatform.cpp b/src/OCPlatform.cpp index c312a01..5af916a 100644 --- a/src/OCPlatform.cpp +++ b/src/OCPlatform.cpp @@ -48,9 +48,41 @@ namespace OC std::cout << "platform destructor called" << std::endl; } - OCStackResult OCPlatform::notifyObservers(OCResourceHandle resourceHandle) + OCStackResult OCPlatform::notifyAllObservers(OCResourceHandle resourceHandle) { - return OCNotifyObservers(resourceHandle); + return OCNotifyAllObservers(resourceHandle); + } + + OCStackResult OCPlatform::notifyListOfObservers( + OCResourceHandle resourceHandle, + ObservationIds& observationIds, + const std::shared_ptr pResponse) + { + OCStackResult result = OC_STACK_ERROR; + + if(pResponse) + { + try + { + std::string payload = pResponse->getPayload(); + unsigned char *pBuffer = new unsigned char[payload.length()+1]; + strncpy((char*)pBuffer, payload.c_str(), payload.length() + 1); + + // TODO Logging + printf("\tGoing from stack for List of Observers: Payload: %s\n", (char*)pBuffer); + + result = OCNotifyListOfObservers(resourceHandle, &observationIds[0], + observationIds.size(), pBuffer); + + delete(pBuffer); + } + catch(std::exception e) // TODO : define our own exception + { + throw e; + } + + return result; + } } void OCPlatform::init(const PlatformConfig& config) -- 2.7.4