void NSDiscoveredProvider(NSProvider * provider)
{
- NSConsumerThread * thread = NSThreadInit(NSDiscoveredProviderFunc, (void *) provider);
+ NS_VERIFY_NOT_NULL_V(provider);
+
+ NSProvider * retProvider = (NSProvider *)NSCopyProvider((NSProvider_internal *)provider);
+ NS_VERIFY_NOT_NULL_V(retProvider);
+
+ NSConsumerThread * thread = NSThreadInit(NSDiscoveredProviderFunc, (void *) retProvider);
NS_VERIFY_NOT_NULL_V(thread);
}
void NSNotificationSync(NSSyncInfo * sync)
{
NS_VERIFY_NOT_NULL_V(sync);
- NSConsumerThread * thread = NSThreadInit(NSNotificationSyncFunc, (void *) sync);
+
+ NSSyncInfo * retSync = (NSSyncInfo *)OICMalloc(sizeof(NSSyncInfo));
+ NS_VERIFY_NOT_NULL_V(retSync);
+ memcpy(retSync, sync, sizeof(NSSyncInfo));
+
+ NSConsumerThread * thread = NSThreadInit(NSNotificationSyncFunc, (void *) retSync);
NS_VERIFY_NOT_NULL_V(thread);
}
void NSMessagePost(NSMessage * msg)
{
NS_VERIFY_NOT_NULL_V(msg);
- NSConsumerThread * thread = NSThreadInit(NSMessagePostFunc, (void *) msg);
+
+ NSMessage * retMsg = (NSMessage *)NSCopyMessage((NSMessage_consumer *)msg);
+ NS_VERIFY_NOT_NULL_V(retMsg);
+
+ NSConsumerThread * thread = NSThreadInit(NSMessagePostFunc, (void *) retMsg);
NS_VERIFY_NOT_NULL_V(thread);
}
OICStrcpy(newMsg->providerId, NS_DEVICE_ID_LENGTH, msg->providerId);
newMsg->i_addr = (OCDevAddr *)OICMalloc(sizeof(OCDevAddr));
- NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(newMsg, NULL, OICFree(newMsg));
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(newMsg, NULL, NSOICFree(newMsg));
memcpy(newMsg->i_addr, msg->i_addr, sizeof(OCDevAddr));
newMsg->messageId = msg->messageId;
newMsg->title = OICStrdup(msg->title);
newMsg->contentText = OICStrdup(msg->contentText);
newMsg->sourceName = OICStrdup(msg->sourceName);
+ newMsg->type = msg->type;
return newMsg;
}
void NSRemoveMessage(NSMessage_consumer * msg)
{
+ NS_VERIFY_NOT_NULL_V(msg);
+
msg->messageId = 0;
NSOICFree(msg->title);
NSOICFree(msg->contentText);
void NSRemoveConnections(NSProviderConnectionInfo * connections)
{
+ NS_VERIFY_NOT_NULL_V(connections);
+
NSProviderConnectionInfo * tmp = connections;
while(tmp)
connections->messageHandle = NULL;
connections->syncHandle = NULL;
connections->isCloudConnection = false;
+ connections->isSubscribing = false;
connections->next = NULL;
if (inAddr)
copyInfo->messageHandle = tmp->messageHandle;
copyInfo->syncHandle = tmp->syncHandle;
copyInfo->isCloudConnection = tmp->isCloudConnection;
+ copyInfo->isSubscribing = tmp->isSubscribing;
tmp = tmp->next;
copyInfo = copyInfo->next;
}
}
void NSRemoveProvider(NSProvider_internal * prov)
{
+ NS_VERIFY_NOT_NULL_V(prov);
+
NSOICFree(prov->messageUri);
NSOICFree(prov->syncUri);
NSRemoveConnections(prov->connection);
return ret;
}
+
+bool NSOCResultToSuccess(OCStackResult ret)
+{
+ switch (ret)
+ {
+ case OC_STACK_OK:
+ case OC_STACK_RESOURCE_CREATED:
+ case OC_STACK_RESOURCE_DELETED:
+ case OC_STACK_CONTINUE:
+ case OC_STACK_RESOURCE_CHANGED:
+ return true;
+ default:
+ return false;
+ }
+}
#include "NSStructs.h"
#include "ocstack.h"
-#define NS_QOS OC_LOW_QOS
+#define NS_QOS OC_HIGH_QOS
#define NS_RESOURCE_TYPE "oic.r.notification"
#define NS_RESOURCE_URI "/notification"
#define NS_INTERFACE_BASELINE "oic.if.baseline"
} \
}
-#define NS_VERIFY_STACK_OK_V(obj) \
+#define NS_VERIFY_STACK_SUCCESS_V(obj) \
{ \
- OCStackResult _ret = (obj); \
- if ( _ret != OC_STACK_OK) \
+ bool _ret = (obj); \
+ if ( _ret != true) \
{ \
NS_LOG_V(ERROR, "%s : %s is not OC_STACK_OK : %d", __func__, #obj, _ret); \
return; \
} \
}
-#define NS_VERIFY_STACK_OK(obj, retVal) \
+#define NS_VERIFY_STACK_SUCCESS(obj, retVal) \
{ \
- OCStackResult _ret = (obj); \
- if ( _ret != OC_STACK_OK) \
+ bool _ret = (obj); \
+ if ( _ret != true) \
{ \
NS_LOG_V(ERROR, "%s : %s is not OC_STACK_OK : %d", __func__, #obj, _ret); \
return (retVal); \
} \
}
-#define NS_VERIFY_STACK_OK_WITH_POST_CLEANING(obj, retVal, func) \
+#define NS_VERIFY_STACK_SUCCESS_WITH_POST_CLEANING(obj, retVal, func) \
{ \
- OCStackResult _ret = (obj); \
- if ( _ret != OC_STACK_OK) \
+ bool _ret = (obj); \
+ if ( _ret != true) \
{ \
NS_LOG_V(ERROR, "%s : %s is not OC_STACK_OK : %d", __func__, #obj, _ret); \
(func); \
OCDoHandle syncHandle;
bool isCloudConnection;
+ bool isSubscribing;
struct NSProviderConnectionInfo * next;
const char * queryUrl, OCPayload * payload,
void * callbackFunc, void * callbackData, OCConnectivityType type);
+bool NSOCResultToSuccess(OCStackResult ret);
+
#ifdef __cplusplus
}
#endif // __cplusplus
NSProviderConnectionInfo * connections = provider_internal->connection;
while(connections)
{
- if (connections->messageHandle)
+ if (connections->isSubscribing == true)
{
+ connections = connections->next;
continue;
}
}
}
+ NS_LOG_V(DEBUG, "subscribe to %s:%d", connections->addr->addr, connections->addr->port);
+
NS_LOG(DEBUG, "get subscribe message query");
char * query = NULL;
query = NSMakeRequestUriWithConsumerId(msgUri);
OCStackResult ret = NSInvokeRequest(&(connections->messageHandle),
OC_REST_OBSERVE, connections->addr, query, NULL,
NSConsumerMessageListener, NULL, type);
- NS_VERIFY_STACK_OK_WITH_POST_CLEANING(ret, NS_ERROR, NSOICFree(query));
+ NS_VERIFY_STACK_SUCCESS_WITH_POST_CLEANING(
+ NSOCResultToSuccess(ret), NS_ERROR, NSOICFree(query));
NSOICFree(query);
NSOICFree(msgUri);
ret = NSInvokeRequest(&(connections->syncHandle),
OC_REST_OBSERVE, connections->addr, query, NULL,
NSConsumerSyncInfoListener, NULL, type);
- NS_VERIFY_STACK_OK_WITH_POST_CLEANING(ret, NS_ERROR, NSOICFree(query));
+ NS_VERIFY_STACK_SUCCESS_WITH_POST_CLEANING(
+ NSOCResultToSuccess(ret), NS_ERROR, NSOICFree(query));
NSOICFree(query);
NSOICFree(syncUri);
+ connections->isSubscribing = true;
+
connections = connections->next;
}
(void) handle;
NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
- NS_VERIFY_STACK_OK(clientResponse->result, OC_STACK_KEEP_TRANSACTION);
+ NS_VERIFY_STACK_SUCCESS(
+ NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
return OC_STACK_KEEP_TRANSACTION;
}
(void) handle;
NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
- NS_VERIFY_STACK_OK(clientResponse->result, OC_STACK_KEEP_TRANSACTION);
+ NS_VERIFY_STACK_SUCCESS(
+ NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
NS_LOG(DEBUG, "get NSSyncInfo");
NSSyncInfo * newSync = NSGetSyncInfoc(clientResponse);
(void) handle;
NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
- NS_VERIFY_STACK_OK(clientResponse->result, OC_STACK_KEEP_TRANSACTION);
+ NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
NS_LOG(DEBUG, "build NSMessage");
NSMessage_consumer * newNoti = NSGetMessage(clientResponse);
NSProviderConnectionInfo * connections = provider->connection;
while(connections)
{
+ if (connections->isSubscribing == false)
+ {
+ NS_LOG_V(DEBUG, "unsubscribed to %s:%d",
+ connections->addr->addr, connections->addr->port);
+ connections = connections->next;
+ continue;
+ }
+ NS_LOG_V(DEBUG, "cancel subscribe to %s:%d",
+ connections->addr->addr, connections->addr->port);
OCCancel(connections->messageHandle, NS_QOS, NULL, 0);
OCCancel(connections->syncHandle, NS_QOS, NULL, 0);
+ connections->messageHandle = NULL;
+ connections->syncHandle = NULL;
+ connections->isSubscribing = false;
connections = connections->next;
}
}
{
NS_LOG(ERROR, "Unknown type message");
}
+ NSOICFree(task);
}
(void) handle;
NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
- NS_VERIFY_STACK_OK(clientResponse->result, OC_STACK_KEEP_TRANSACTION);
+ NS_VERIFY_STACK_SUCCESS(
+ NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
NS_LOG_V(DEBUG, "Presence income : %s:%d",
clientResponse->devAddr.addr, clientResponse->devAddr.port);
NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
- NS_VERIFY_STACK_OK(clientResponse->result, OC_STACK_KEEP_TRANSACTION);
+ NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
NS_LOG_V(DEBUG, "Discover income : %s:%d",
clientResponse->devAddr.addr, clientResponse->devAddr.port);
(void) handle;
NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
- NS_VERIFY_STACK_OK(clientResponse->result, OC_STACK_KEEP_TRANSACTION);
+ NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
NS_LOG_V(DEBUG, "GET response income : %s:%d",
clientResponse->devAddr.addr, clientResponse->devAddr.port);
{
NSStorageDestroy(cache);
}
+
+ NSSetMessageCacheList(NULL);
}
void NSDestroyProviderCacheList()
{
NSStorageDestroy(cache);
}
+
+ NSSetProviderCacheList(NULL);
}
NSMessage_consumer * NSMessageCacheFind(const char * messageId)
}
NSCacheElement * cacheElement = NSStorageRead(MessageCache, messageId);
+ NS_VERIFY_NOT_NULL(cacheElement, NULL);
- return (NSMessage_consumer *) cacheElement->data;
+ return NSCopyMessage((NSMessage_consumer *) cacheElement->data);
}
NSProvider_internal * NSProviderCacheFind(const char * providerId)
NSCacheElement * cacheElement = NSStorageRead(ProviderCache, providerId);
NS_VERIFY_NOT_NULL(cacheElement, NULL);
- return (NSProvider_internal *) cacheElement->data;
+ return NSCopyProvider((NSProvider_internal *) cacheElement->data);
}
NS_LOG(DEBUG, "try to write to storage");
NSResult ret = NSStorageWrite(MessageCache, obj);
NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(ret == NS_OK ? (void *) 1 : NULL,
- NS_ERROR, NSRemoveMessage(msg));
-
- NSOICFree(obj);
+ NS_ERROR, NSOICFree(obj));
return NS_OK;
}
NS_LOG(DEBUG, "try to write to storage");
NSResult ret = NSStorageWrite(ProviderCache, obj);
NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(ret == NS_OK ? (void *) 1 : NULL,
- NS_ERROR, NSRemoveProvider(provider));
-
- NSOICFree(obj);
+ NS_ERROR, NSOICFree(obj));
return NS_OK;
}
NS_VERIFY_NOT_NULL_V(provider);
bool isAdded = true;
+ bool isSubscribing = false;
NSProvider_internal * providerCacheData = NSProviderCacheFind(provider->providerId);
- //NS_VERIFY_NOT_NULL_V(providerCacheData == NULL ? (void *)1 : NULL);
+
if (providerCacheData == NULL)
{
isAdded = false;
}
else
{
+ providerCacheData->accessPolicy = provider->accessPolicy;
NSProviderConnectionInfo * infos = providerCacheData->connection;
OCTransportAdapter newAdapter = provider->connection->addr->adapter;
- while(infos)
+ while (infos)
{
- if (infos->addr->adapter == newAdapter)
+ isSubscribing |= infos->isSubscribing;
+ if (infos->addr->adapter == newAdapter && infos->isSubscribing == true)
{
- NS_LOG(DEBUG, "This provider already discovered.");
+ NS_LOG_V(DEBUG, "This provider already discovered : %s:%d",
+ infos->addr->addr, infos->addr->port);
+ NS_LOG_V(DEBUG, "Subscription : %d", infos->isSubscribing);
return;
}
infos = infos->next;
NS_LOG(DEBUG, "provider's connection is updated.");
}
-
- if (provider->accessPolicy == NS_ACCESS_DENY && isAdded == false)
+ if (provider->accessPolicy == NS_ACCESS_DENY && isSubscribing == false)
{
NS_LOG(DEBUG, "accepter is NS_ACCEPTER_CONSUMER, Callback to user");
NSDiscoveredProvider((NSProvider *) provider);
else
{
NS_LOG(DEBUG, "accepter is NS_ACCEPTER_PROVIDER, request subscribe");
- NSTask * task = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, (void *) provider);
+ NSProvider_internal * subProvider = NSCopyProvider(provider);
+ NSTask * task = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, (void *) subProvider);
NS_VERIFY_NOT_NULL_V(task);
NSConsumerPushEvent(task);
}
+
+ NSRemoveProvider(providerCacheData);
+}
+
+void NSConsumerHandleProviderDeleted(NSProvider_internal * provider)
+{
+ // TODO delete provider infomation on storage list.
+ (void) provider;
}
void NSConsumerHandleRecvSubscriptionConfirmed(NSMessage_consumer * msg)
{
NS_VERIFY_NOT_NULL_V(msg);
+ NS_LOG_V(DEBUG, "confirmed by : %s", msg->providerId);
NSProvider_internal * provider = NSProviderCacheFind(msg->providerId);
NS_VERIFY_NOT_NULL_V(provider);
{
NS_VERIFY_NOT_NULL_V(sync);
- NSProvider_internal * provider = NSProviderCacheFind(sync->providerId);
- NS_VERIFY_NOT_NULL_V(provider);
-
char msgId[NS_DEVICE_ID_LENGTH] = { 0, };
snprintf(msgId, NS_DEVICE_ID_LENGTH, "%lld", sync->messageId);
NSResult ret = NSMessageCacheUpdate(msg, sync->state);
NS_VERIFY_NOT_NULL_V(ret == NS_OK ? (void *) 1 : NULL);
+ NSRemoveMessage(msg);
NSNotificationSync(sync);
}
NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(syncTask, NSOICFree(syncInfo));
NSConsumerPushEvent(syncTask);
-
- NSOICFree(sync);
}
void NSConsumerInternalTaskProcessing(NSTask * task)
{
NS_LOG(DEBUG, "Receive Subscribe confirm from provider.");
NSConsumerHandleRecvSubscriptionConfirmed((NSMessage_consumer *)task->taskData);
+ NSRemoveMessage((NSMessage_consumer *)task->taskData);
break;
}
case TASK_CONSUMER_RECV_MESSAGE:
{
NS_LOG(DEBUG, "Receive New Notification");
NSConsumerHandleRecvMessage((NSMessage_consumer *)task->taskData);
-
+ NSRemoveMessage((NSMessage_consumer *)task->taskData);
break;
}
case TASK_CONSUMER_PROVIDER_DISCOVERED:
{
NS_LOG(DEBUG, "Receive New Provider is discovered.");
NSConsumerHandleProviderDiscovered((NSProvider_internal *)task->taskData);
+ NSRemoveProvider((NSProvider_internal *)task->taskData);
break;
}
case TASK_RECV_SYNCINFO:
{
NS_LOG(DEBUG, "Receive SyncInfo.");
NSConsumerHandleRecvSyncInfo((NSSyncInfo *)task->taskData);
+ NSOICFree(task->taskData);
break;
}
case TASK_MAKE_SYNCINFO:
{
NS_LOG(DEBUG, "Make SyncInfo, get Provider's Addr");
NSConsumerHandleMakeSyncInfo((NSSyncInfo *)task->taskData);
+ NSOICFree(task->taskData);
+ break;
+ }
+ case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
+ {
+ NSConsumerHandleProviderDeleted((NSProvider_internal *)task->taskData);
+ NSRemoveProvider((NSProvider_internal *)task->taskData);
break;
}
default :
return ;
}
}
+ NSOICFree(task);
}
OCStackResult stackResult = NSInvokeRequest(getPresenceHandle(), OC_REST_PRESENCE, NULL,
NS_PRESENCE_SUBSCRIBE_QUERY, NULL, NSConsumerPresenceListener,
NULL, CT_DEFAULT);
- NS_VERIFY_STACK_OK(stackResult, NS_ERROR);
+ NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(stackResult), NS_ERROR);
NS_LOG(DEBUG, "Request to discover provider");
stackResult = NSInvokeRequest(NULL, OC_REST_DISCOVER, NULL,
NS_DISCOVER_QUERY, NULL, NSProviderDiscoverListener,
NULL, CT_DEFAULT);
- NS_VERIFY_STACK_OK(stackResult, NS_ERROR);
+ NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(stackResult), NS_ERROR);
return NS_OK;
}
void NSDestroyQueue(NSConsumerQueue * queue)
{
+ NS_VERIFY_NOT_NULL_V(queue);
+
NSConsumerQueueObject * node = NSPopQueue(queue);
while(node)
{
node = (NSConsumerQueueObject *)node->next;
- OICFree(node->data);
- OICFree(node);
+ NSOICFree(node->data);
+ NSOICFree(node);
}
- OICFree(queue);
+ NSOICFree(queue);
}
bool NSPushQueue(NSConsumerQueue * queue, NSConsumerQueueObject * object)
NSConsumerListenerTermiate();
NSThreadStop(*(NSGetMsgHandleThreadHandle()));
NSDestroyQueue(*(NSGetMsgHandleQueue()));
+ NSSetMsgHandleQueue(NULL);
}
void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
if (!queue)
{
NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
- OICFree(data);
- OICFree(obj);
+ NSOICFree(data);
+ NSOICFree(obj);
}
else
{
if (it)
{
NSMessage_consumer * msgObj = (NSMessage_consumer *) it->data;
+ if(msgObj->type == newMsgObj->type)
+ {
+ NS_LOG (DEBUG, "Already receive message");
+ pthread_mutex_unlock(mutex);
+ return NS_ERROR;
+ }
+
+
it->data = (void *) NSCopyMessage(newMsgObj);
if (!it->data)
{
}
infos->next = NSCopyProviderConnections(newProvObj->connection);
- NSRemoveProvider(newProvObj);
pthread_mutex_unlock(mutex);
return NS_OK;