1 //******************************************************************
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "NSProviderTopic.h"
22 #include "oic_string.h"
23 #include "oic_malloc.h"
// File-scope topic caches. Both pointers are assigned in NSInitTopicList():
// consumerTopicList is tagged there as NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME and
// registeredTopicList as NS_PROVIDER_CACHE_REGISTER_TOPIC.
26 NSCacheList * consumerTopicList;
27 NSCacheList * registeredTopicList;
// Forward declaration: NSRegisterTopic() and NSUnregisterTopic() call this before
// its definition further down in this file.
29 NSResult NSSendTopicUpdation();
// Creates the two topic caches with NSProviderStorageCreate() and tags each one with
// its cache type: consumerTopicList as NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME and
// registeredTopicList as NS_PROVIDER_CACHE_REGISTER_TOPIC.
// NOTE(review): the embedded line numbers skip, so the braces and the closing return
// statement of this function are not part of this view.
31 NSResult NSInitTopicList()
33 NS_LOG(DEBUG, "NSInitTopicList - IN");
35 consumerTopicList = NSProviderStorageCreate();
// NS_FAIL is handed to the verify macro for the NULL case (presumably the value
// returned to the caller -- the macro is defined outside this file, confirm).
36 NS_VERIFY_NOT_NULL(consumerTopicList, NS_FAIL);
37 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
39 registeredTopicList = NSProviderStorageCreate();
// NOTE(review): if this check exits early, consumerTopicList created above stays
// allocated; no release of it is visible here -- confirm.
40 NS_VERIFY_NOT_NULL(registeredTopicList, NS_FAIL);
41 registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;
43 NS_LOG(DEBUG, "NSInitTopicList - OUT");
// Takes the head of an NSTopicLL list and returns a size_t.
// NOTE(review): presumably the number of nodes reachable from firstElement, but only
// the signature and the iterator initialisation are visible in this view -- the
// counting loop and the NULL-input handling are not shown; confirm against the file.
47 size_t NSProviderGetTopicListSize(NSTopicLL * firstElement)
// Iteration starts at the head supplied by the caller.
56 NSTopicLL * iter = firstElement;
// Registers a topic: wraps topicName in an NSCacheTopicData (initial state
// NS_TOPIC_UNSUBSCRIBED), places that in a new NSCacheElement, writes the element to
// registeredTopicList and then calls NSSendTopicUpdation().
//
// topicName: stored by pointer, not copied (see the ownership note below).
//
// NOTE(review): the embedded line numbers skip, so the condition guarding the cleanup
// line and the return statements are not part of this view.
67 NSResult NSRegisterTopic(const char * topicName)
// The log text uses the name NSWriteTopicsToStorage rather than this function name.
69 NS_LOG(DEBUG, "NSWriteTopicsToStorage()");
71 NSCacheTopicData * data = (NSCacheTopicData *) OICMalloc(sizeof(NSCacheTopicData));
72 NS_VERIFY_NOT_NULL(data, NS_FAIL);
// The const qualifier is cast away and the pointer is kept as-is. Together with the
// NSOICFree(data->topicName) below this means the function frees the string passed
// in by the caller on that path -- presumably ownership is transferred; confirm
// against the callers.
73 data->topicName = (char *) topicName;
74 data->state = NS_TOPIC_UNSUBSCRIBED;
76 NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
// Cleanup line; presumably runs only when the element allocation above failed
// (the guarding condition is not visible).
79 NSOICFree(data->topicName);
84 element->data = (void *) data;
87 if (NSProviderStorageWrite(registeredTopicList, element) != NS_OK)
// NOTE(review): no release of element or data is visible on this failure path.
89 NS_LOG(DEBUG, "fail to write cache");
// The result of the notification call is not used.
93 NSSendTopicUpdation();
94 NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");
// Unregisters a topic: deletes topicName from registeredTopicList, then removes every
// matching entry from consumerTopicList, and calls NSSendTopicUpdation().
//
// NOTE(review): the embedded line numbers skip, so the NULL check that guards the
// error log, the condition (if any) around the notification call and the return
// statements are not part of this view.
98 NSResult NSUnregisterTopic(const char * topicName)
// The log text uses the name NSDeleteTopics rather than this function name.
100 NS_LOG(DEBUG, "NSDeleteTopics()");
101 NSResult result = NS_OK;
105 NS_LOG(ERROR, "topicName is NULL");
// Outcome of removing the topic from the registered list is kept in result.
109 result = NSProviderStorageDelete(registeredTopicList, topicName);
// Keep deleting from the consumer cache until the storage layer reports NS_FAIL;
// each iteration is a single delete call and no loop body statement is visible.
111 while (NSProviderStorageDelete(consumerTopicList, topicName) != NS_FAIL)
117 NSSendTopicUpdation();
// Notifies observers that the topic set changed. Builds an OCRepPayload carrying
// NS_COLLECTION_MESSAGE_URI, the message id NS_TOPIC and the provider id, gathers
// observation ids from the entries of consumerSubList, and sends the payload with
// OCNotifyListOfObservers() at OC_HIGH_QOS.
//
// NOTE(review): the embedded line numbers skip, so conditions, loop headers, the
// declaration of obCount and the return statements are not part of this view.
123 NSResult NSSendTopicUpdation()
125 NS_LOG(DEBUG, "NSSendTopicUpdation - IN");
127 OCRepPayload* payload = OCRepPayloadCreate();
131 NS_LOG(ERROR, "fail to create playload");
// Obtains the message resource handle into rHandle.
135 OCResourceHandle rHandle = NULL;
136 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
// NOTE(review): payload is already allocated at this point; whether it is destroyed
// before leaving on this path is not visible -- confirm.
138 NS_LOG(ERROR, "Fail to put message resource");
142 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
143 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
144 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
// Fixed-capacity buffer of observation ids (3839 slots).
// NOTE(review): no bound check on obCount is visible before the writes below, and
// with WITH_CLOUD one cache entry can add two ids -- confirm the list cannot exceed
// the capacity.
146 OCObservationId obArray[3839] =
// Iteration over the consumer subscription cache starts at its head.
150 NSCacheElement * it = consumerSubList->head;
154 NSCacheSubData * subData = (NSCacheSubData *) it->data;
// Only entries flagged isWhite contribute observation ids.
156 if (subData->isWhite)
// A zero id is skipped.
158 if (subData->messageObId != 0)
160 obArray[obCount++] = subData->messageObId;
// Cloud builds additionally collect the remote observation id.
// NOTE(review): the matching end of this conditional section is not visible here.
163 #if (defined WITH_CLOUD)
164 if (subData->remote_messageObId != 0)
166 obArray[obCount++] = subData->remote_messageObId;
// NOTE(review): as above, payload cleanup on this path is not visible -- confirm.
176 NS_LOG(ERROR, "observer count is zero");
180 if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS) != OC_STACK_OK)
182 NS_LOG(ERROR, "fail to send topic updation");
// Payload is destroyed on the send-failure path ...
183 OCRepPayloadDestroy(payload);
// ... and again here, which is presumably the success path (the return between the
// two calls is not visible).
187 OCRepPayloadDestroy(payload);
189 NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");
// Sends the topic-update notification to a single consumer. Builds the same payload
// as NSSendTopicUpdation() (NS_COLLECTION_MESSAGE_URI, message id NS_TOPIC, provider
// id), looks the consumer up in consumerSubList by consumerId and notifies exactly
// one observation id, the messageObId of that entry, at OC_HIGH_QOS.
//
// consumerId: key passed to NSProviderStorageRead() on consumerSubList.
//
// NOTE(review): the embedded line numbers skip, so conditions and return statements
// are not part of this view.
193 NSResult NSSendTopicUpdationToConsumer(char *consumerId)
195 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");
197 OCRepPayload* payload = OCRepPayloadCreate();
201 NS_LOG(ERROR, "fail to create playload");
205 OCResourceHandle rHandle = NULL;
206 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
// NOTE(review): payload is already allocated here; its release on this path is not
// visible -- confirm.
208 NS_LOG(ERROR, "Fail to put message resource");
212 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
213 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
214 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
216 NSCacheElement * element = NSProviderStorageRead(consumerSubList, consumerId);
// NOTE(review): same question as above for payload on the lookup-failure path.
220 NS_LOG(ERROR, "element is NULL");
224 NSCacheSubData * subData = (NSCacheSubData*) element->data;
// Only messageObId is used; unlike NSSendTopicUpdation() no remote observation id
// and no isWhite check appear in the visible code.
226 if (OCNotifyListOfObservers(rHandle, (OCObservationId*) &subData->messageObId, 1, payload,
227 OC_HIGH_QOS) != OC_STACK_OK)
229 NS_LOG(ERROR, "fail to send topic updation");
// Payload is destroyed on the send-failure path ...
230 OCRepPayloadDestroy(payload);
// ... and here, presumably on the success path (the return in between is not
// visible).
234 OCRepPayloadDestroy(payload);
236 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");
// Answers a topic-list request. Extracts the consumer id from the request query,
// fetches either the registered topic list or the topic list for that consumer,
// converts it into an array of OCRepPayload objects (topic name plus selection
// state) attached under NS_ATTRIBUTE_TOPIC_LIST, and sends it with OCDoResponse().
//
// entityHandlerRequest: supplies the query string, the request handle and the
// resource handle used for the response.
//
// NOTE(review): the embedded line numbers skip, so branch conditions, braces, some
// cleanup statements and the return statements are not part of this view.
240 NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)
242 NS_LOG(DEBUG, "NSSendTopicList - IN");
// Query parsing works on a private copy of the query string.
244 char * copyReq = OICStrdup(entityHandlerRequest->query);
245 char * id = NSGetValueFromQuery(copyReq, NS_QUERY_CONSUMER_ID);
246 NSTopicLL * topics = NULL;
// Branch 1: all registered topics (presumably taken when no consumer id was given).
250 NS_LOG(DEBUG, "Send registered topic list");
251 topics = NSProviderGetTopicsCacheData(registeredTopicList);
// Branch 2: topics as seen by the consumer identified by id.
255 NS_LOG(DEBUG, "Send subscribed topic list to consumer");
256 topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);
// Registered list is fetched again here, presumably as a fallback when the consumer
// lookup produced nothing (the guarding condition is not visible).
259 topics = NSProviderGetTopicsCacheData(registeredTopicList);
263 // make response for the Get Request
// Only selected fields of response are initialised here and further down; the
// structure is not zeroed as a whole.
264 OCEntityHandlerResponse response;
265 response.numSendVendorSpecificHeaderOptions = 0;
266 memset(response.sendVendorSpecificHeaderOptions, 0,
267 sizeof response.sendVendorSpecificHeaderOptions);
268 memset(response.resourceUri, 0, sizeof response.resourceUri);
270 OCRepPayload* payload = OCRepPayloadCreate();
273 NS_LOG(ERROR, "payload is NULL");
278 OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);
// Consumer id is echoed back (presumably only when one was supplied).
281 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);
283 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
// NOTE(review): this message is logged directly before the list size is computed and
// the array is filled, which reads as the non-empty path -- the text looks
// misleading; confirm.
288 NS_LOG(DEBUG, "topicList is NULL");
289 size_t dimensionSize = (size_t) NSProviderGetTopicListSize(topics);
290 NS_LOG_V(DEBUG, "dimensionSize = %d", (int)dimensionSize);
// One OCRepPayload pointer per topic.
// NOTE(review): if the verify macro below exits early, no release of payload or of
// the topics list is visible on that path -- confirm.
297 OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(
298 sizeof(OCRepPayload *) * dimensionSize);
299 NS_VERIFY_NOT_NULL(payloadTopicArray, NS_ERROR);
// One-dimensional array description handed to the payload setter below.
301 size_t dimensions[3] = { dimensionSize, 0, 0 };
// Consumes the topics list while filling the array: each iteration reads the current
// node, frees its name and keeps its successor in next.
303 for (int i = 0; i < (int) dimensionSize; i++)
305 NS_LOG_V(DEBUG, "topicName = %s", topics->topicName);
306 NS_LOG_V(DEBUG, "topicState = %d",(int) topics->state);
308 payloadTopicArray[i] = OCRepPayloadCreate();
309 if (!payloadTopicArray[i])
311 NS_LOG_V(ERROR, "payloadTopicArray[%d] is NULL", i);
// NOTE(review): only the pointer array is freed here; destruction of the entries
// created in earlier iterations, of payload and of the remaining topics nodes is not
// visible -- confirm.
312 NSOICFree(payloadTopicArray);
316 OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME,
318 OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,
319 (int) topics->state);
321 NSTopicLL * next = topics->next;
322 NSOICFree(topics->topicName);
// The AsOwner variant is used, so the array is handed over to payload (presumably
// released together with it by OCRepPayloadDestroy -- confirm in the payload API).
327 OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
328 payloadTopicArray, dimensions);
// Alternative path (presumably no topics): attach an empty topic array.
332 size_t dimensions[3] = { 0, 0, 0 };
334 OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
335 (OCRepPayload **) NULL, dimensions);
// Second copy of the query, this time to read the requested interface.
// NOTE(review): copyReq already holds the first OICStrdup result; a release of it
// before this assignment, and of this second copy afterwards, is not visible --
// confirm there is no leak.
338 copyReq = OICStrdup(entityHandlerRequest->query);
339 char * reqInterface = NSGetValueFromQuery(copyReq, NS_QUERY_INTERFACE);
// For a baseline-interface request the interface names and the root resource type
// are added to the payload.
341 if (reqInterface && strcmp(reqInterface, NS_INTERFACE_BASELINE) == 0)
343 OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_BASELINE);
344 OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_READ);
345 OCResourcePayloadAddStringLL(&payload->types, NS_ROOT_TYPE);
349 response.requestHandle = entityHandlerRequest->requestHandle;
350 response.resourceHandle = entityHandlerRequest->resource;
351 response.persistentBufferFlag = 0;
352 response.ehResult = OC_EH_OK;
353 response.payload = (OCPayload *) payload;
355 if (OCDoResponse(&response) != OC_STACK_OK)
357 NS_LOG(ERROR, "Fail to response topic list");
// Payload is destroyed on the failure path ...
358 OCRepPayloadDestroy(payload);
// ... and here, presumably on the success path (the return in between is not
// visible).
362 OCRepPayloadDestroy(payload);
363 NS_LOG(DEBUG, "NSSendTopicList - OUT");
// Handles a consumer POST of its topic selection. Reads the consumer id from the
// request payload, removes every existing entry of that consumer from
// consumerTopicList, then walks the posted NS_ATTRIBUTE_TOPIC_LIST array and writes
// a new cache entry for every topic whose state equals NS_TOPIC_SUBSCRIBED. Finally
// calls NSSendTopicUpdationToConsumer() for the consumer and releases the temporary
// payload array and the consumer id string.
//
// entityHandlerRequest: its payload member is read as an OCRepPayload.
//
// NOTE(review): the embedded line numbers skip, so conditions, braces, labels and
// return statements are not part of this view.
367 NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)
369 NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");
371 char * consumerId = NULL;
372 OCRepPayload * payload = (OCRepPayload *) entityHandlerRequest->payload;
// consumerId receives a string that this function releases itself with NSOICFree
// further down.
373 OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);
377 NS_LOG(DEBUG, "Invalid consumer ID");
381 NS_LOG_V(INFO_PRIVATE, "TOPIC consumer ID = %s", consumerId);
// The cache type of the shared list is switched to the consumer-id variant for the
// delete loop and switched back to the topic-name variant right after it
// (presumably this selects the key the storage layer compares against -- confirm).
383 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID;
// Delete repeatedly until the storage layer reports NS_FAIL.
385 while (NSProviderStorageDelete(consumerTopicList, consumerId) != NS_FAIL)
389 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
390 OCRepPayload ** topicListPayload = NULL;
391 OCRepPayloadValue * payloadValue = NULL;
392 payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);
// NOTE(review): payloadValue is dereferenced on the line directly following the
// lookup, with no NULL check in between. If NSPayloadFindValue can return NULL for a
// request that lacks the topic list attribute, this crashes -- confirm and guard.
393 size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);
394 size_t dimensions[3] = { dimensionSize, 0, 0 };
// The return value of this call is not checked; topicListPayload stays NULL if the
// call does not fill it, and the loop below indexes it unconditionally.
395 OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, &topicListPayload, dimensions);
397 for (int i = 0; i < (int) dimensionSize; i++)
399 char * topicName = NULL;
400 int64_t topicState = 0;
// Result of the name lookup is not checked; topicName may still be NULL afterwards.
402 OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);
403 if (OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState))
405 NS_LOG_V(DEBUG, "Topic Name(state): %s(%d)", topicName, (int)topicState);
// Only subscribed topics produce a cache entry.
// NOTE(review): for other states no release of topicName is visible -- confirm.
408 if (NS_TOPIC_SUBSCRIBED == (NSTopicState) topicState)
410 NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *) OICMalloc(
411 sizeof(NSCacheTopicSubData));
412 NS_VERIFY_NOT_NULL_EXIT(topicSubData);
// Entry records the consumer id (bounded copy) and takes over the topicName pointer.
414 OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);
415 topicSubData->topicName = topicName;
417 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
// Cleanup sequence, presumably for a failed newObj allocation (the guarding
// condition is not visible): releases the entry, the consumer id and the whole
// temporary payload array.
421 NSOICFree(topicSubData->topicName);
422 NSOICFree(topicSubData);
423 NSOICFree(consumerId);
426 for (size_t k = 0; k < dimensionSize; k++)
428 OCRepPayloadDestroy(topicListPayload[k]);
430 OICFree(topicListPayload);
435 newObj->data = (NSCacheData *) topicSubData;
438 if (NS_OK != NSProviderStorageWrite(consumerTopicList, newObj))
440 NS_LOG(ERROR, "Fail to write cache");
// Tell the consumer about its updated topic selection.
444 NSSendTopicUpdationToConsumer(consumerId);
// Regular cleanup of the temporary payload array and the consumer id.
447 for (size_t k = 0; k < dimensionSize; k++)
449 OCRepPayloadDestroy(topicListPayload[k]);
451 OICFree(topicListPayload);
453 NSOICFree(consumerId);
454 NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");
// Presumably the target of NS_VERIFY_NOT_NULL_EXIT above (the label is not visible).
// NOTE(review): only the pointer array is freed here; the element payloads and
// consumerId are not released in the visible code -- confirm.
458 OICFree(topicListPayload);
// Worker loop of the topic scheduler. While NSIsRunning[TOPIC_SCHEDULER] is set it
// waits on the scheduler semaphore, locks the scheduler mutex, takes the task at
// NSHeadMsg[TOPIC_SCHEDULER] and dispatches on its taskType. Tasks that carry an
// NSTopicSyncResult or NSTopicSync are processed under the mutex supplied in that
// object and the waiter is signalled through its condition variable afterwards.
//
// ptr: not referenced in the visible code.
//
// NOTE(review): the embedded line numbers skip, so braces, break statements, some
// conditions, the release of the task node and the end of the function are not part
// of this view.
462 void * NSTopicSchedule(void * ptr)
466 NS_LOG(DEBUG, "Create NSTopicSchedule");
469 while (NSIsRunning[TOPIC_SCHEDULER])
// Block until a task is posted, then lock the scheduler mutex; the unlock is the
// call that follows the switch near the end of this block.
471 sem_wait(&NSSemaphore[TOPIC_SCHEDULER]);
472 pthread_mutex_lock(&NSMutex[TOPIC_SCHEDULER]);
474 if (NSHeadMsg[TOPIC_SCHEDULER] != NULL)
// Pop the head of the task queue.
476 NSTask *node = NSHeadMsg[TOPIC_SCHEDULER];
477 NSHeadMsg[TOPIC_SCHEDULER] = node->nextTask;
479 switch (node->taskType)
// Answer a topic-list request, then release the copied request.
481 case TASK_SEND_TOPICS:
482 NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");
483 NSSendTopicList((OCEntityHandlerRequest*) node->taskData);
484 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
// Subscribe a consumer to a topic: the topic must exist in registeredTopicList, then
// the supplied topic data is written into consumerTopicList.
486 case TASK_SUBSCRIBE_TOPIC:
488 NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");
489 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
490 pthread_mutex_lock(topicSyncResult->mutex);
491 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
492 NSCacheTopicSubData * subData =
493 (NSCacheTopicSubData *) topicSyncResult->topicData;
// Early-out sequence, presumably for a failed newObj allocation (condition not
// visible): drop the topic name, wake the waiter and release its mutex.
496 NSOICFree(subData->topicName);
498 pthread_cond_signal(topicSyncResult->condition);
499 pthread_mutex_unlock(topicSyncResult->mutex);
// Subscription is only recorded for a topic that is currently registered.
503 if (NSProviderStorageRead(registeredTopicList, subData->topicName))
505 newObj->data = topicSyncResult->topicData;
508 if (NSProviderStorageWrite(consumerTopicList, newObj) == NS_OK)
512 NSSendTopicUpdationToConsumer(subData->id);
514 topicSyncResult->result = NS_OK;
// Presumably the branch for an unregistered topic (condition not visible).
// NOTE(review): release of newObj and subData on that branch is not visible.
519 NSOICFree(subData->topicName);
524 pthread_cond_signal(topicSyncResult->condition);
525 pthread_mutex_unlock(topicSyncResult->mutex);
// Unsubscribe: remove the consumer-topic pair, notify the consumer on success, and
// release the request data in either case.
528 case TASK_UNSUBSCRIBE_TOPIC:
530 NS_LOG(DEBUG, "CASE TASK_UNSUBSCRIBE_TOPIC : ");
531 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
532 pthread_mutex_lock(topicSyncResult->mutex);
533 NSCacheTopicSubData * topicSubData =
534 (NSCacheTopicSubData *) topicSyncResult->topicData;
536 if (NSProviderDeleteConsumerTopic(consumerTopicList, topicSubData) == NS_OK)
538 NSSendTopicUpdationToConsumer(topicSubData->id);
539 topicSyncResult->result = NS_OK;
542 NSOICFree(topicSubData->topicName);
543 NSOICFree(topicSubData);
544 pthread_cond_signal(topicSyncResult->condition);
545 pthread_mutex_unlock(topicSyncResult->mutex);
// Register a topic; topicData is interpreted as the topic name string and the
// outcome is reported through result.
549 case TASK_REGISTER_TOPIC:
551 NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");
552 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
554 pthread_mutex_lock(topicSyncResult->mutex);
555 topicSyncResult->result = NSRegisterTopic(
556 (const char *) topicSyncResult->topicData);
557 pthread_cond_signal(topicSyncResult->condition);
558 pthread_mutex_unlock(topicSyncResult->mutex);
// Unregister a topic; same calling pattern as registration.
561 case TASK_UNREGISTER_TOPIC:
563 NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");
564 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
565 pthread_mutex_lock(topicSyncResult->mutex);
566 topicSyncResult->result = NSUnregisterTopic(
567 (const char *) topicSyncResult->topicData);
568 pthread_cond_signal(topicSyncResult->condition);
569 pthread_mutex_unlock(topicSyncResult->mutex);
// Apply a consumer POST of its topic selection, then release the copied request.
572 case TASK_POST_TOPIC:
574 NS_LOG(DEBUG, "TASK_POST_TOPIC : ");
575 NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);
576 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
// Hand the registered topic list to the waiting caller through topicSync->topics.
579 case TASK_GET_TOPICS:
581 NS_LOG(DEBUG, "TASK_GET_TOPICS : ");
582 NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
583 pthread_mutex_lock(topicSync->mutex);
584 NSTopicLL * topics = NSProviderGetTopicsCacheData(registeredTopicList);
585 topicSync->topics = topics;
586 pthread_cond_signal(topicSync->condition);
587 pthread_mutex_unlock(topicSync->mutex);
// Same as above but for the consumer named in topicSync->consumerId. The constant is
// spelled TAST_ here, unlike the TASK_ prefix of the other cases.
590 case TAST_GET_CONSUMER_TOPICS:
592 NS_LOG(DEBUG, "TASK_GET_CONSUMER_TOPICS : ");
593 NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
594 pthread_mutex_lock(topicSync->mutex);
595 NSTopicLL * topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList,
596 consumerTopicList, topicSync->consumerId);
597 topicSync->topics = topics;
598 pthread_cond_signal(topicSync->condition);
599 pthread_mutex_unlock(topicSync->mutex);
// Counterpart of the scheduler mutex lock taken after sem_wait above.
609 pthread_mutex_unlock(&NSMutex[TOPIC_SCHEDULER]);
612 NS_LOG(DEBUG, "Destroy NSTopicSchedule");