IoTDevicesMap owned;
PresenceHook presence_hook;
std::mutex owned_mutex;
+ std::shared_ptr<OCResource> notificationResource;
};
IoTivity* IoTivity::instance = nullptr;
inline void guardErrorCode(OCStackResult resultCode, std::string message)
{
+ if (resultCode == OC_STACK_NO_RESOURCE)
+ throw IoTInternalError(message + " no resource error. OCStackResult=" + std::to_string(resultCode), EC_IOTIVITY_ERROR);
if (resultCode != OC_STACK_OK)
- throw IoTInternalError(message + " error:" + std::to_string(resultCode), EC_IOTIVITY_ERROR);
+ throw IoTInternalError(message + " error. OCStackResult=" + std::to_string(resultCode), EC_IOTIVITY_ERROR);
}
void IoTivity::guardUnauthorized()
}
-void IoTivity::subscribeNotifications(NM_NotificationCb callback, void* user_data)
+void IoTivity::subscribeNotifications(NM_NotificationCb callback, void* userData)
{
guardUnauthorized();
bool findResourceTimeout = true;
- std::shared_ptr<OCResource> curResource;
-
- auto observCb = [callback, user_data](const HeaderOptions,
- const OCRepresentation & rep,
- const int& eCode,
- const int& sequenceNumber)
+ auto observCb = [callback, userData](const HeaderOptions,
+ const OCRepresentation & rep,
+ const int& eCode,
+ const int& sequenceNumber)
{
+ if (eCode != OC_STACK_OK)
+ {
+ LOG_E(TAG, "Notification observ callback failed");
+ exit(-1);
+ }
+
+
+ CallbackState callbackState = NORMAL_CALLBACK;
+ if (sequenceNumber == 0)
+ callbackState = FIRST_CALLBACK;
+ if (sequenceNumber == MAX_SEQUENCE_NUMBER + 1)
+ callbackState = LAST_CALLBACK;
+
string messageString = rep.getValueToString("message");
string messageTitle = rep.getValueToString("title");
NM_NotificationData data
rep.getValue<int>("code"),
messageTitle.c_str(),
messageString.c_str(),
- rep.getValue<int>("time")
+ rep.getValue<int>("time"),
+ callbackState
};
- callback(data, user_data);
+ callback(data, userData);
};
+ OCStackResult errorCbCode = OC_STACK_ERROR;
+
auto foundCb = [&](std::shared_ptr<OCResource> resource)
{
lock_guard<std::mutex> lock(curResourceLock);
- findResourceTimeout = false;
- std::string resourceURI = resource->uri();
- std::string resourceDuid = resource->sid();
- std::stringstream searchedUri;
- searchedUri << "/oic/route/" << resourceDuid << NOTIFICATION_URI;
-
- //cout << searchedUri.str() << endl << flush;
- if(searchedUri.str() == resourceURI && !curResource)
+ if(!params->notificationResource)
{
- std::string resourceHost = resource->host();
-
- curResource = resource;
- cout << "\tFound: " << resourceURI << " "
- << resourceHost << " " << resourceDuid << endl << flush;
+ findResourceTimeout = false;
+ errorCbCode = OC_STACK_OK;
+ params->notificationResource = resource;
resource->observe(ObserveType::ObserveAll, QueryParamsMap(), observCb);
condVar.notify_all();
}
};
- auto errorCb = [&](const std::string & resourceUri, const int ecode)
+ auto errorCb = [&](const std::string&, const int eCode)
{
findResourceTimeout = false;
- cout << "Error" << resourceUri << " " << ecode << endl << flush;
+ errorCbCode = (OCStackResult) eCode;
condVar.notify_all();
};
condVar.wait_for(lock, std::chrono::seconds(DEFAULT_TIMEOUT));
guardTimeout(findResourceTimeout, NOTIF_FIND);
+ guardErrorCode(errorCbCode, NOTIF_REQUEST);
}
void IoTivity::unsubscribeNotifications()
{
- guardUnauthorized();
- throw IoTInternalError("unsubscribeNotifications() not implemented", EC_NOT_IMPLEMENTED_YET);
+ if (params->notificationResource)
+ {
+ params->notificationResource->cancelObserve();
+ params->notificationResource = nullptr;
+ }
}
void IoTivity::setPersistentStoragePath(std::string newPath)
struct UserData
{
- bool callbackFired;
+ bool firstCallbackFired;
+ bool lastCallbackFired;
condition_variable notificationCV;
};
static void notificationCb(NM_NotificationData data, void* inUserData)
{
UserData* userData = (UserData*)inUserData;
- userData->callbackFired = true;
+ if (data.callbackState == FIRST_CALLBACK)
+ userData->firstCallbackFired = true;
+ if (data.callbackState == LAST_CALLBACK)
+ userData->lastCallbackFired = true;
userData->notificationCV.notify_all();
}
std::mutex notificationMtx;
std::unique_lock<std::mutex> notificationLock(notificationMtx);
- UserData userDataCheck{false};
+ UserData userDataCheck{false, false};
try
{
iot->subscribeNotifications(notificationCb, &userDataCheck);
+ userDataCheck.notificationCV.wait_for(notificationLock, std::chrono::seconds(2));
}
- catch(std::exception e)
+ catch (NMexception& e)
+ {
+ ADD_FAILURE() << e.what() << " " << e.errorCode();
+ }
+ catch (std::exception& e)
{
ADD_FAILURE() << e.what();
}
- userDataCheck.notificationCV.wait_for(notificationLock, std::chrono::seconds(3));
- ASSERT_TRUE(userDataCheck.callbackFired);
+ iot->unsubscribeNotifications();
+ userDataCheck.notificationCV.wait_for(notificationLock, std::chrono::seconds(2));
+
+ ASSERT_TRUE(userDataCheck.firstCallbackFired);
+ ASSERT_TRUE(userDataCheck.lastCallbackFired);
}
}
static std::condition_variable notificationCV;
-static bool callbackFired = false;
+static bool firstCallbackFired = false;
+static bool lastCallbackFired = false;
static char* callbackUserDataCheck = NULL;
static void notificationCb(NM_NotificationData data, void* user_data)
<< data.title << " "
<< data.message << " "
<< data.time << " "
+ << data.callbackState << " "
<< endl << flush;
- callbackFired = true;
+
+ if (data.callbackState == FIRST_CALLBACK)
+ firstCallbackFired = true;
+ if (data.callbackState == LAST_CALLBACK)
+ lastCallbackFired = true;
+
callbackUserDataCheck = (char*) user_data;
notificationCV.notify_all();
}
/* 2. Call subscribe method and wait no more than 3 sec for notification callback */
ASSERT_EQ(EC_OK, NM_subscribeNotifications(ctx, notificationCb, userDataCheck));
notificationCV.wait_for(notificationLock, std::chrono::seconds(3));
+
+ NM_unsubscribeNotifications(ctx);
+ notificationCV.wait_for(notificationLock, std::chrono::seconds(3));
+
NM_signOut(ctx);
/* 3. Check callback was called */
- ASSERT_TRUE(callbackFired);
+ ASSERT_TRUE(firstCallbackFired);
+ ASSERT_TRUE(lastCallbackFired);
/* 4. Check user data was transferd */
ASSERT_EQ(userDataCheck, callbackUserDataCheck);