1 //******************************************************************
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "NSProviderTopic.h"
22 #include "oic_string.h"
23 #include "oic_malloc.h"
// Forward declaration: NSSendTopicUpdation() is called from NSRegisterTopic() and
// NSUnregisterTopic() below, ahead of its definition later in this file.
26 NSResult NSSendTopicUpdation();
// Creates the two topic caches used by the rest of this file and tags each with
// its cache type:
//   - consumerTopicList   -> NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME
//   - registeredTopicList -> NS_PROVIDER_CACHE_REGISTER_TOPIC
// NS_VERIFY_NOT_NULL receives NS_FAIL as its second argument for both
// allocations; presumably that is the value returned when the storage could not
// be created (confirm against the macro definition).
28 NSResult NSInitTopicList()
30 NS_LOG(DEBUG, "NSInitTopicList - IN");
// Storage for the topics selected by individual consumers.
32 consumerTopicList = NSProviderStorageCreate();
33 NS_VERIFY_NOT_NULL(consumerTopicList, NS_FAIL);
34 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
// Storage for the topics registered on the provider side.
// NOTE(review): no release of consumerTopicList is visible if this second
// allocation fails; verify whether the caller cleans up.
36 registeredTopicList = NSProviderStorageCreate();
37 NS_VERIFY_NOT_NULL(registeredTopicList, NS_FAIL);
38 registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;
40 NS_LOG(DEBUG, "NSInitTopicList - OUT");
// Takes the first element of an NSTopicLL list and returns a size_t.
// NOTE(review): judging by the name this presumably counts the elements
// reachable from firstElement; confirm against the complete body.
44 size_t NSProviderGetTopicListSize(NSTopicLL * firstElement)
// Cursor that starts at the element passed in.
53 NSTopicLL * iter = firstElement;
// Adds a topic to registeredTopicList and then calls NSSendTopicUpdation().
//
// topicName: name of the topic to register. The pointer itself is stored in the
//            new cache entry (no copy is made here), so the entry aliases the
//            caller's buffer.
//
// A new entry starts in state NS_TOPIC_UNSUBSCRIBED.
// NOTE(review): the log messages in this function say "NSWriteTopicsToStorage()",
// which does not match the function name.
64 NSResult NSRegisterTopic(const char * topicName)
66 NS_LOG(DEBUG, "NSWriteTopicsToStorage()");
68 NSCacheTopicData * data = (NSCacheTopicData *) OICMalloc(sizeof(NSCacheTopicData));
69 NS_VERIFY_NOT_NULL(data, NS_FAIL);
// The const qualifier is cast away and the caller's pointer is kept as-is.
70 data->topicName = (char *) topicName;
71 data->state = NS_TOPIC_UNSUBSCRIBED;
73 NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
// NOTE(review): this releases the buffer that was passed in as topicName (see the
// assignment above); presumably this runs only when the element allocation
// failed. Confirm that callers expect ownership of topicName to transfer here.
76 NSOICFree(data->topicName);
81 element->data = (void *) data;
// Store the new element in the registered-topic cache.
84 if (NSProviderStorageWrite(registeredTopicList, element) != NS_OK)
86 NS_LOG(DEBUG, "fail to write cache");
// Trigger a topic-update notification after the write.
90 NSSendTopicUpdation();
91 NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");
// Removes a topic from registeredTopicList, removes matching entries from
// consumerTopicList, and then calls NSSendTopicUpdation().
//
// topicName: name of the topic to remove; an error is logged for a NULL name.
//
// `result` starts as NS_OK and is overwritten with the outcome of the delete on
// registeredTopicList.
// NOTE(review): the entry log says "NSDeleteTopics()", which does not match the
// function name.
95 NSResult NSUnregisterTopic(const char * topicName)
97 NS_LOG(DEBUG, "NSDeleteTopics()");
98 NSResult result = NS_OK;
102 NS_LOG(ERROR, "topicName is NULL");
106 result = NSProviderStorageDelete(registeredTopicList, topicName);
// Keep deleting from the consumer cache until the delete reports NS_FAIL, i.e.
// until no further entry for this key can be removed.
108 while (NSProviderStorageDelete(consumerTopicList, topicName) != NS_FAIL)
114 NSSendTopicUpdation();
// Sends a topic-update notification to the observers collected from
// consumerSubList.
//
// Steps visible here:
//   1. Create an OCRepPayload and obtain the message resource handle through
//      NSPutMessageResource(NULL, &rHandle).
//   2. Fill the payload with the message collection URI, a message id of NS_TOPIC
//      and this provider's id.
//   3. Gather observation ids from consumerSubList entries whose isWhite flag is
//      set (non-zero messageObId, plus remote_messageObId when WITH_CLOUD is
//      defined).
//   4. Call OCNotifyListOfObservers() with OC_HIGH_QOS and destroy the payload
//      afterwards (it is destroyed on the failure path as well).
120 NSResult NSSendTopicUpdation()
122 NS_LOG(DEBUG, "NSSendTopicUpdation - IN");
124 OCRepPayload* payload = OCRepPayloadCreate();
128 NS_LOG(ERROR, "fail to create playload");
132 OCResourceHandle rHandle = NULL;
133 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
135 NS_LOG(ERROR, "Fail to put message resource");
139 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
140 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
141 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
// Fixed-capacity buffer for the observation ids to notify.
// NOTE(review): no bound check on obCount against the 255 slots is visible in
// this function; verify that the subscriber list cannot exceed the capacity
// (each entry can contribute two ids when WITH_CLOUD is defined).
143 OCObservationId obArray[255] =
// Walk the subscriber cache from its head.
147 NSCacheElement * it = consumerSubList->head;
151 NSCacheSubData * subData = (NSCacheSubData *) it->data;
// Only subscribers flagged isWhite are notified.
153 if (subData->isWhite)
// An observation id of 0 is skipped.
155 if (subData->messageObId != 0)
157 obArray[obCount++] = subData->messageObId;
160 #if (defined WITH_CLOUD)
161 if (subData->remote_messageObId != 0)
163 obArray[obCount++] = subData->remote_messageObId;
173 NS_LOG(ERROR, "observer count is zero");
177 if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS) != OC_STACK_OK)
179 NS_LOG(ERROR, "fail to send topic updation");
180 OCRepPayloadDestroy(payload);
// The payload is released after the notify call as well.
184 OCRepPayloadDestroy(payload);
186 NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");
// Sends a topic-update notification to a single consumer.
//
// consumerId: key used to look the consumer up in consumerSubList.
//
// The payload carries the same three properties as in NSSendTopicUpdation()
// (message collection URI, message id NS_TOPIC, provider id). The notification
// goes to exactly one observation id: the consumer's messageObId.
// NOTE(review): only messageObId is used here; remote_messageObId is not
// referenced in this function.
190 NSResult NSSendTopicUpdationToConsumer(char *consumerId)
192 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");
194 OCRepPayload* payload = OCRepPayloadCreate();
198 NS_LOG(ERROR, "fail to create playload");
202 OCResourceHandle rHandle = NULL;
203 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
205 NS_LOG(ERROR, "Fail to put message resource");
209 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
210 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
211 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
// Look up the subscriber record for this consumer.
213 NSCacheElement * element = NSProviderStorageRead(consumerSubList, consumerId);
217 NS_LOG(ERROR, "element is NULL");
221 NSCacheSubData * subData = (NSCacheSubData*) element->data;
// The address of the single messageObId field is passed as a one-element list.
223 if (OCNotifyListOfObservers(rHandle, (OCObservationId*) &subData->messageObId, 1, payload,
224 OC_HIGH_QOS) != OC_STACK_OK)
226 NS_LOG(ERROR, "fail to send topic updation");
227 OCRepPayloadDestroy(payload);
// The payload is released after the notify call as well.
231 OCRepPayloadDestroy(payload);
233 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");
// Builds and sends the response to a request for the topic list.
//
// entityHandlerRequest: incoming request; its query string, requestHandle and
//                       resource handle are read here.
//
// Outline of the visible steps:
//   1. Parse the consumer id out of a copy of the query.
//   2. Obtain the topic list, either all registered topics
//      (NSProviderGetTopicsCacheData) or the list for one consumer
//      (NSProviderGetConsumerTopicsCacheData).
//   3. Build a payload with the topic collection URI, consumer id, provider id
//      and an object array holding one {topic name, topic selection} payload
//      per topic. An empty array with dimensions {0, 0, 0} is set in the other
//      branch.
//   4. If the query's interface is NS_INTERFACE_BASELINE, add interface and
//      type strings to the payload.
//   5. Send the response with OCDoResponse() and destroy the payload.
237 NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)
239 NS_LOG(DEBUG, "NSSendTopicList - IN");
// Work on a duplicate of the query string when extracting values from it.
241 char * copyReq = OICStrdup(entityHandlerRequest->query);
242 char * id = NSGetValueFromQuery(copyReq, NS_QUERY_CONSUMER_ID);
243 NSTopicLL * topics = NULL;
247 NS_LOG(DEBUG, "Send registered topic list");
248 topics = NSProviderGetTopicsCacheData(registeredTopicList);
252 NS_LOG(DEBUG, "Send subscribed topic list to consumer");
253 topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);
256 topics = NSProviderGetTopicsCacheData(registeredTopicList);
260 // make response for the Get Request
// Only the vendor-specific header options and resourceUri are cleared here; the
// remaining response fields used by this function are assigned further down.
261 OCEntityHandlerResponse response;
262 response.numSendVendorSpecificHeaderOptions = 0;
263 memset(response.sendVendorSpecificHeaderOptions, 0,
264 sizeof response.sendVendorSpecificHeaderOptions);
265 memset(response.resourceUri, 0, sizeof response.resourceUri);
267 OCRepPayload* payload = OCRepPayloadCreate();
270 NS_LOG(ERROR, "payload is NULL");
275 OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);
278 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);
280 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
// NOTE(review): this message reads "topicList is NULL" but is immediately
// followed by code that sizes and walks the list; check that the text matches
// the branch it is logged in.
285 NS_LOG(DEBUG, "topicList is NULL");
286 size_t dimensionSize = (size_t) NSProviderGetTopicListSize(topics);
287 NS_LOG_V(DEBUG, "dimensionSize = %d", (int)dimensionSize);
// One OCRepPayload pointer per topic.
// NOTE(review): the NS_VERIFY_NOT_NULL exits below presumably return NS_ERROR
// without releasing `payload`, `topics` or the entries already created; confirm
// against the macro definition.
294 OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(
295 sizeof(OCRepPayload *) * dimensionSize);
296 NS_VERIFY_NOT_NULL(payloadTopicArray, NS_ERROR);
// Only the first dimension is used for the array.
298 size_t dimensions[3] = { dimensionSize, 0, 0 };
300 for (int i = 0; i < (int) dimensionSize; i++)
302 NS_LOG_V(DEBUG, "topicName = %s", topics->topicName);
303 NS_LOG_V(DEBUG, "topicState = %d",(int) topics->state);
305 payloadTopicArray[i] = OCRepPayloadCreate();
306 NS_VERIFY_NOT_NULL(payloadTopicArray[i], NS_ERROR);
307 OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME,
309 OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,
310 (int) topics->state);
// Remember the successor before releasing this node's name.
312 NSTopicLL * next = topics->next;
313 NSOICFree(topics->topicName);
// "AsOwner": the array is handed to the payload with this call.
318 OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
319 payloadTopicArray, dimensions);
// Alternative branch: set an empty topic array.
323 size_t dimensions[3] = { 0, 0, 0 };
325 OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
326 (OCRepPayload **) NULL, dimensions);
// A fresh duplicate of the query is made to extract the interface value.
// NOTE(review): copyReq is reassigned here; verify that the earlier duplicate
// has been released before this point and that this one is released as well.
329 copyReq = OICStrdup(entityHandlerRequest->query);
330 char * reqInterface = NSGetValueFromQuery(copyReq, NS_QUERY_INTERFACE);
332 if (reqInterface && strcmp(reqInterface, NS_INTERFACE_BASELINE) == 0)
334 OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_BASELINE);
335 OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_READ);
336 OCResourcePayloadAddStringLL(&payload->types, NS_ROOT_TYPE);
340 response.requestHandle = entityHandlerRequest->requestHandle;
341 response.resourceHandle = entityHandlerRequest->resource;
342 response.persistentBufferFlag = 0;
343 response.ehResult = OC_EH_OK;
344 response.payload = (OCPayload *) payload;
346 if (OCDoResponse(&response) != OC_STACK_OK)
348 NS_LOG(ERROR, "Fail to response topic list");
349 OCRepPayloadDestroy(payload);
// The payload is released after the response has been sent as well.
353 OCRepPayloadDestroy(payload);
354 NS_LOG(DEBUG, "NSSendTopicList - OUT");
// Handles a consumer's posted topic selection.
//
// entityHandlerRequest: incoming request; its payload is read as an OCRepPayload
//                       carrying a consumer id and a topic list.
//
// Outline of the visible steps:
//   1. Read the consumer id from the payload.
//   2. Remove the consumer's existing entries from consumerTopicList. The cache
//      type is switched to NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID for the deletes
//      and then set back to NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME.
//   3. Read the topic array from the payload and, for each entry whose
//      selection equals NS_TOPIC_SUBSCRIBED, write a new NSCacheTopicSubData
//      element into consumerTopicList.
//   4. Call NSSendTopicUpdationToConsumer() for this consumer and release the
//      topic payload array and the consumer id string.
358 NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)
360 NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");
362 char * consumerId = NULL;
363 OCRepPayload * payload = (OCRepPayload *) entityHandlerRequest->payload;
// consumerId is released with NSOICFree() further down.
364 OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);
368 NS_LOG(DEBUG, "Invalid consumer ID");
372 NS_LOG_V(INFO_PRIVATE, "TOPIC consumer ID = %s", consumerId);
// Temporarily change the cache type so the deletes below are keyed by consumer id.
374 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID;
// Repeat until the delete reports NS_FAIL, i.e. nothing more can be removed.
376 while (NSProviderStorageDelete(consumerTopicList, consumerId) != NS_FAIL)
// Restore the cache type that NSInitTopicList() assigns.
380 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
381 OCRepPayload ** topicListPayload = NULL;
382 OCRepPayloadValue * payloadValue = NULL;
383 payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);
// NOTE(review): payloadValue is dereferenced here with no NULL check visible;
// confirm that a request without a topic list cannot reach this line.
384 size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);
385 size_t dimensions[3] = { dimensionSize, 0, 0 };
386 OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, &topicListPayload, dimensions);
388 for (int i = 0; i < (int) dimensionSize; i++)
390 char * topicName = NULL;
391 int64_t topicState = 0;
393 OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);
394 if (OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState))
396 NS_LOG_V(DEBUG, "Topic Name(state): %s(%d)", topicName, (int)topicState);
// Only topics the consumer marked as subscribed are stored.
399 if (NS_TOPIC_SUBSCRIBED == (NSTopicState) topicState)
401 NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *) OICMalloc(
402 sizeof(NSCacheTopicSubData));
// NOTE(review): this early exit presumably returns NS_FAIL without reaching the
// cleanup of consumerId, topicName and topicListPayload seen below; confirm
// against the macro definition.
403 NS_VERIFY_NOT_NULL(topicSubData, NS_FAIL);
// The consumer id is copied; the topicName pointer is stored directly.
405 OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);
406 topicSubData->topicName = topicName;
408 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
// Cleanup sequence: release the partially built entry, the consumer id and
// every payload in the topic array (presumably taken when the element
// allocation above failed).
412 NSOICFree(topicSubData->topicName);
413 NSOICFree(topicSubData);
414 NSOICFree(consumerId);
417 for (size_t k = 0; k < dimensionSize; k++)
419 OCRepPayloadDestroy(topicListPayload[k]);
421 OICFree(topicListPayload);
426 newObj->data = (NSCacheData *) topicSubData;
429 if (NS_OK != NSProviderStorageWrite(consumerTopicList, newObj))
431 NS_LOG(ERROR, "Fail to write cache");
// Notify this consumer about the topic change.
435 NSSendTopicUpdationToConsumer(consumerId);
// Release the topic payload array obtained from the request payload.
438 for (size_t k = 0; k < dimensionSize; k++)
440 OCRepPayloadDestroy(topicListPayload[k]);
442 OICFree(topicListPayload);
444 NSOICFree(consumerId);
445 NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");
// Worker loop for the topic scheduler.
//
// While NSIsRunning[TOPIC_SCHEDULER] is set, the loop waits on the scheduler's
// semaphore, takes the scheduler mutex, pops the task at the head of
// NSHeadMsg[TOPIC_SCHEDULER] (if any) and dispatches on its taskType. The
// scheduler mutex is released at the end of each pass.
//
// For the task types whose taskData is an NSTopicSyncResult or NSTopicSync, the
// handler locks the mutex carried in that structure, stores its outcome in the
// structure, signals the carried condition variable and unlocks again.
//
// ptr: not referenced in the lines visible here.
449 void * NSTopicSchedule(void * ptr)
453 NS_LOG(DEBUG, "Create NSTopicSchedule");
456 while (NSIsRunning[TOPIC_SCHEDULER])
458 sem_wait(&NSSemaphore[TOPIC_SCHEDULER]);
459 pthread_mutex_lock(&NSMutex[TOPIC_SCHEDULER]);
461 if (NSHeadMsg[TOPIC_SCHEDULER] != NULL)
// Detach the head task from the queue.
463 NSTask *node = NSHeadMsg[TOPIC_SCHEDULER];
464 NSHeadMsg[TOPIC_SCHEDULER] = node->nextTask;
466 switch (node->taskType)
// Answer a topic-list request, then release the request object.
468 case TASK_SEND_TOPICS:
469 NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");
470 NSSendTopicList((OCEntityHandlerRequest*) node->taskData);
471 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
// Store a consumer's subscription to a topic, provided the topic is present in
// registeredTopicList.
473 case TASK_SUBSCRIBE_TOPIC:
475 NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");
476 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
477 pthread_mutex_lock(topicSyncResult->mutex);
478 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
479 NSCacheTopicSubData * subData =
480 (NSCacheTopicSubData *) topicSyncResult->topicData;
// Release-and-signal sequence; presumably taken when the element allocation
// above failed.
483 NSOICFree(subData->topicName);
485 pthread_cond_signal(topicSyncResult->condition);
486 pthread_mutex_unlock(topicSyncResult->mutex);
// A non-NULL read result means the topic is registered.
490 if (NSProviderStorageRead(registeredTopicList, subData->topicName))
// The task's topicData itself becomes the cache element's data.
492 newObj->data = topicSyncResult->topicData;
495 if (NSProviderStorageWrite(consumerTopicList, newObj) == NS_OK)
497 NSSendTopicUpdationToConsumer(subData->id);
498 topicSyncResult->result = NS_OK;
// NOTE(review): presumably the path where the topic was not stored; confirm
// which branch this free belongs to.
503 NSOICFree(subData->topicName);
508 pthread_cond_signal(topicSyncResult->condition);
509 pthread_mutex_unlock(topicSyncResult->mutex);
// Remove a consumer's subscription to a topic.
512 case TASK_UNSUBSCRIBE_TOPIC:
514 NS_LOG(DEBUG, "CASE TASK_UNSUBSCRIBE_TOPIC : ");
515 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
516 pthread_mutex_lock(topicSyncResult->mutex);
517 NSCacheTopicSubData * topicSubData =
518 (NSCacheTopicSubData *) topicSyncResult->topicData;
520 if (NSProviderDeleteConsumerTopic(consumerTopicList, topicSubData) == NS_OK)
522 NSSendTopicUpdationToConsumer(topicSubData->id);
523 topicSyncResult->result = NS_OK;
// The lookup record handed in with the task is released here.
526 NSOICFree(topicSubData->topicName);
527 NSOICFree(topicSubData);
528 pthread_cond_signal(topicSyncResult->condition);
529 pthread_mutex_unlock(topicSyncResult->mutex);
// Register a topic; topicData is treated as the topic name string.
// NOTE(review): the log text says TASK_ADD_TOPIC while the case label is
// TASK_REGISTER_TOPIC.
533 case TASK_REGISTER_TOPIC:
535 NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");
536 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
538 pthread_mutex_lock(topicSyncResult->mutex);
539 topicSyncResult->result = NSRegisterTopic(
540 (const char *) topicSyncResult->topicData);
541 pthread_cond_signal(topicSyncResult->condition);
542 pthread_mutex_unlock(topicSyncResult->mutex);
// Unregister a topic; topicData is treated as the topic name string.
// NOTE(review): the log text says CASE_TASK_DELETE_TOPIC while the case label is
// TASK_UNREGISTER_TOPIC.
545 case TASK_UNREGISTER_TOPIC:
547 NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");
548 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
549 pthread_mutex_lock(topicSyncResult->mutex);
550 topicSyncResult->result = NSUnregisterTopic(
551 (const char *) topicSyncResult->topicData);
552 pthread_cond_signal(topicSyncResult->condition);
553 pthread_mutex_unlock(topicSyncResult->mutex);
// Process a consumer's posted topic selection, then release the request object.
556 case TASK_POST_TOPIC:
558 NS_LOG(DEBUG, "TASK_POST_TOPIC : ");
559 NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);
560 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
// Hand the registered topic list back through topicSync->topics.
563 case TASK_GET_TOPICS:
565 NS_LOG(DEBUG, "TASK_GET_TOPICS : ");
566 NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
567 pthread_mutex_lock(topicSync->mutex);
568 NSTopicLL * topics = NSProviderGetTopicsCacheData(registeredTopicList);
569 topicSync->topics = topics;
570 pthread_cond_signal(topicSync->condition);
571 pthread_mutex_unlock(topicSync->mutex);
// Hand the topic list for topicSync->consumerId back through topicSync->topics.
// The case label is spelled "TAST_" here while the log text says "TASK_".
574 case TAST_GET_CONSUMER_TOPICS:
576 NS_LOG(DEBUG, "TASK_GET_CONSUMER_TOPICS : ");
577 NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
578 pthread_mutex_lock(topicSync->mutex);
579 NSTopicLL * topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList,
580 consumerTopicList, topicSync->consumerId);
581 topicSync->topics = topics;
582 pthread_cond_signal(topicSync->condition);
583 pthread_mutex_unlock(topicSync->mutex);
// Release the scheduler mutex taken at the top of the loop pass.
593 pthread_mutex_unlock(&NSMutex[TOPIC_SCHEDULER]);
596 NS_LOG(DEBUG, "Destroy NSTopicSchedule");