1 //******************************************************************
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "NSProviderTopic.h"
22 #include "oic_string.h"
23 #include "oic_malloc.h"
// File-scope cache lists shared by the topic functions in this file.
// Both are assigned in NSInitTopicList() from NSProviderStorageCreate():
//  - consumerTopicList   is tagged NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME there
//  - registeredTopicList is tagged NS_PROVIDER_CACHE_REGISTER_TOPIC there
26 NSCacheList * consumerTopicList;
27 NSCacheList * registeredTopicList;
// Forward declaration: NSSendTopicUpdation() is defined further down in this
// file but is already called by NSRegisterTopic() and NSUnregisterTopic().
29 NSResult NSSendTopicUpdation();
// Creates the two file-scope topic caches and tags each with its cache type.
//
// Both lists come from NSProviderStorageCreate(); each result is passed to
// NS_VERIFY_NOT_NULL together with NS_FAIL before its cacheType is written.
// NOTE(review): NS_VERIFY_NOT_NULL is defined elsewhere - presumably it
// returns its second argument when the pointer is NULL; confirm.
// NOTE(review): the success return value is not visible in this view
// (presumably NS_OK) - confirm.
31 NSResult NSInitTopicList()
33 NS_LOG(DEBUG, "NSInitTopicList - IN");
// Cache of per-consumer topic selections.
35 consumerTopicList = NSProviderStorageCreate();
36 NS_VERIFY_NOT_NULL(consumerTopicList, NS_FAIL);
37 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
// Cache of topics registered on the provider.
// NOTE(review): if this second creation fails, no release of
// consumerTopicList is visible on these lines - confirm whether that matters.
39 registeredTopicList = NSProviderStorageCreate();
40 NS_VERIFY_NOT_NULL(registeredTopicList, NS_FAIL);
41 registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;
43 NS_LOG(DEBUG, "NSInitTopicList - OUT");
// Returns a size_t computed from the NSTopicLL list starting at firstElement.
// NSSendTopicList() in this file uses the result as the number of topic
// entries it allocates and emits.
//
// firstElement: first node of the list to examine.
// NOTE(review): only the cursor initialisation is visible in this view; the
// counting logic and any NULL handling are not shown - presumably a plain
// walk over ->next; confirm against the full source.
47 size_t NSProviderGetTopicListSize(NSTopicLL * firstElement)
// Cursor initialised to the head of the list.
56 NSTopicLL * iter = firstElement;
// Adds a topic to registeredTopicList and then broadcasts a topic update.
//
// The name is wrapped in a freshly allocated NSCacheTopicData (initial state
// NS_TOPIC_UNSUBSCRIBED), which in turn is attached to a freshly allocated
// NSCacheElement and handed to NSProviderStorageWrite().
//
// topicName: name of the topic. The pointer itself is stored (no copy is
//            made) and the visible error path frees it through
//            data->topicName.
//            NOTE(review): this looks like the function takes ownership of
//            a heap-allocated string despite the const qualifier - confirm
//            against the callers.
// NOTE(review): return statements are not visible in this view; the final
// log line suggests NS_OK on success - confirm.
// Note that the log strings below use the name "NSWriteTopicsToStorage",
// not this function's name.
67 NSResult NSRegisterTopic(const char * topicName)
69 NS_LOG(DEBUG, "NSWriteTopicsToStorage()");
71 NSCacheTopicData * data = (NSCacheTopicData *) OICMalloc(sizeof(NSCacheTopicData));
72 NS_VERIFY_NOT_NULL(data, NS_FAIL);
// Stores the caller's pointer directly; the cast drops the const qualifier.
73 data->topicName = (char *) topicName;
74 data->state = NS_TOPIC_UNSUBSCRIBED;
76 NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
// Presumably the cleanup taken when the element allocation above failed
// (the guarding condition is not visible here).
79 NSOICFree(data->topicName);
84 element->data = (void *) data;
87 if (NSProviderStorageWrite(registeredTopicList, element) != NS_OK)
89 NS_LOG(DEBUG, "fail to write cache");
// Tell observers that the set of topics changed.
93 NSSendTopicUpdation();
94 NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");
// Removes a topic from registeredTopicList and from consumerTopicList, then
// broadcasts a topic update.
//
// topicName: name of the topic to remove. A "topicName is NULL" error is
//            logged on a path whose guarding condition is not visible here.
// NOTE(review): the return statement is not visible; `result` holds the
// outcome of the registeredTopicList deletion and is presumably what is
// returned - confirm.
// Note that the log string below uses the name "NSDeleteTopics".
98 NSResult NSUnregisterTopic(const char * topicName)
100 NS_LOG(DEBUG, "NSDeleteTopics()");
101 NSResult result = NS_OK;
105 NS_LOG(ERROR, "topicName is NULL");
109 result = NSProviderStorageDelete(registeredTopicList, topicName);
// Keep deleting from the consumer cache until the storage layer reports
// NS_FAIL, i.e. drop every entry that matches this topic name.
// NOTE(review): presumably relies on consumerTopicList->cacheType being
// NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME at this point - confirm.
111 while (NSProviderStorageDelete(consumerTopicList, topicName) != NS_FAIL)
117 NSSendTopicUpdation();
// Notifies subscribed observers of the message resource that the topic set
// changed.
//
// Builds an OCRepPayload carrying NS_COLLECTION_MESSAGE_URI, a message id of
// NS_TOPIC and this provider's id, gathers observation ids from
// consumerSubList, and sends the payload to them with
// OCNotifyListOfObservers() at OC_HIGH_QOS.
//
// NOTE(review): return statements and several guarding conditions are not
// visible in this view; the error logs below mark the failure paths.
123 NSResult NSSendTopicUpdation()
125 NS_LOG(DEBUG, "NSSendTopicUpdation - IN");
127 OCRepPayload* payload = OCRepPayloadCreate();
131 NS_LOG(ERROR, "fail to create playload");
// Obtain the handle of the message resource; the notification is sent on it.
135 OCResourceHandle rHandle = NULL;
136 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
138 NS_LOG(ERROR, "Fail to put message resource");
142 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
143 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
144 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
// Buffer for the observation ids that will be notified.
// NOTE(review): obArray[obCount++] below shows no bound check on the visible
// lines, and with WITH_CLOUD one subscriber can append two ids - confirm the
// subscriber count can never exceed this capacity.
146 OCObservationId obArray[3839] =
// Walk the subscriber cache starting from its head.
150 NSCacheElement * it = consumerSubList->head;
154 NSCacheSubData * subData = (NSCacheSubData *) it->data;
// Only subscribers flagged isWhite are considered.
156 if (subData->isWhite)
// An observation id of 0 is skipped.
158 if (subData->messageObId != 0)
160 obArray[obCount++] = subData->messageObId;
// Cloud builds additionally notify the remote observation id, if non-zero.
163 #if (defined WITH_CLOUD)
164 if (subData->remote_messageObId != 0)
166 obArray[obCount++] = subData->remote_messageObId;
176 NS_LOG(ERROR, "observer count is zero");
180 if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS) != OC_STACK_OK)
182 NS_LOG(ERROR, "fail to send topic updation");
// Payload is released on the notify-failure path ...
183 OCRepPayloadDestroy(payload);
// ... and again here after a successful notify.
187 OCRepPayloadDestroy(payload);
189 NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");
// Sends the topic-update notification to a single consumer.
//
// Builds the same payload as NSSendTopicUpdation() (message URI, message id
// NS_TOPIC, provider id), looks the consumer up in consumerSubList and
// notifies exactly one observation id - the entry's messageObId - at
// OC_HIGH_QOS.
//
// consumerId: key used for the consumerSubList lookup.
// NOTE(review): return statements and guarding conditions are not visible in
// this view; the error logs below mark the failure paths.
193 NSResult NSSendTopicUpdationToConsumer(char *consumerId)
195 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");
197 OCRepPayload* payload = OCRepPayloadCreate();
201 NS_LOG(ERROR, "fail to create playload");
// Obtain the handle of the message resource; the notification is sent on it.
205 OCResourceHandle rHandle = NULL;
206 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
208 NS_LOG(ERROR, "Fail to put message resource");
212 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
213 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
214 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
// Find the subscription record of the target consumer.
216 NSCacheElement * element = NSProviderStorageRead(consumerSubList, consumerId);
220 NS_LOG(ERROR, "element is NULL");
224 NSCacheSubData * subData = (NSCacheSubData*) element->data;
// Notify a one-element id list: the address of messageObId is passed with a
// count of 1. Unlike NSSendTopicUpdation(), remote_messageObId is not used
// and messageObId is not compared against 0 here.
226 if (OCNotifyListOfObservers(rHandle, (OCObservationId*) &subData->messageObId, 1, payload,
227 OC_HIGH_QOS) != OC_STACK_OK)
229 NS_LOG(ERROR, "fail to send topic updation");
// Payload is released on the notify-failure path ...
230 OCRepPayloadDestroy(payload);
// ... and again here after a successful notify.
234 OCRepPayloadDestroy(payload);
236 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");
// Answers a GET request on the topic resource with the list of topics.
//
// The consumer id is extracted from the request query. Depending on a
// condition not visible in this view, `topics` is filled either with all
// registered topics or with the selection state for that consumer. The list
// is converted into an array of OCRepPayload objects (topic name + topic
// selection state), attached to the response payload under
// NS_ATTRIBUTE_TOPIC_LIST, and sent with OCDoResponse().
//
// entityHandlerRequest: incoming request; its query, requestHandle and
//                       resource fields are read.
// NOTE(review): return statements and several conditions are not visible in
// this view.
240 NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)
242 NS_LOG(DEBUG, "NSSendTopicList - IN");
// Work on a private copy of the query string.
// NOTE(review): presumably because NSGetValueFromQuery modifies its input -
// confirm.
244 char * copyReq = OICStrdup(entityHandlerRequest->query);
245 char * id = NSGetValueFromQuery(copyReq, NS_QUERY_CONSUMER_ID);
246 NSTopicLL * topics = NULL;
250 NS_LOG(DEBUG, "Send registered topic list");
251 topics = NSProviderGetTopicsCacheData(registeredTopicList);
255 NS_LOG(DEBUG, "Send subscribed topic list to consumer");
256 topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);
259 topics = NSProviderGetTopicsCacheData(registeredTopicList);
263 // make response for the Get Request
264 OCEntityHandlerResponse response;
// The struct is not zero-initialised as a whole; the fields used are set
// individually here and further below.
265 response.numSendVendorSpecificHeaderOptions = 0;
266 memset(response.sendVendorSpecificHeaderOptions, 0,
267 sizeof response.sendVendorSpecificHeaderOptions);
268 memset(response.resourceUri, 0, sizeof response.resourceUri);
270 OCRepPayload* payload = OCRepPayloadCreate();
273 NS_LOG(ERROR, "payload is NULL");
278 OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);
281 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);
283 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
288 NS_LOG(DEBUG, "topicList is NULL");
289 size_t dimensionSize = (size_t) NSProviderGetTopicListSize(topics);
290 NS_LOG_V(DEBUG, "dimensionSize = %d", (int)dimensionSize);
// One OCRepPayload pointer per topic.
// NOTE(review): if NS_VERIFY_NOT_NULL exits the function here or inside the
// loop, no release of payload, copyReq, topics or the already-built array
// entries is visible on these lines - confirm.
297 OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(
298 sizeof(OCRepPayload *) * dimensionSize);
299 NS_VERIFY_NOT_NULL(payloadTopicArray, NS_ERROR);
// One-dimensional array of dimensionSize objects.
301 size_t dimensions[3] = { dimensionSize, 0, 0 };
303 for (int i = 0; i < (int) dimensionSize; i++)
305 NS_LOG_V(DEBUG, "topicName = %s", topics->topicName);
306 NS_LOG_V(DEBUG, "topicState = %d",(int) topics->state);
308 payloadTopicArray[i] = OCRepPayloadCreate();
309 NS_VERIFY_NOT_NULL(payloadTopicArray[i], NS_ERROR);
// Each entry carries the topic name and its selection state.
310 OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME,
312 OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,
313 (int) topics->state);
// Remember the successor before the current node's name is freed; the list
// is consumed while it is being converted.
315 NSTopicLL * next = topics->next;
316 NSOICFree(topics->topicName);
// "AsOwner": the payload receives payloadTopicArray itself rather than a
// copy.
321 OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
322 payloadTopicArray, dimensions);
// Alternative path: attach an empty topic list (all dimensions 0, NULL
// array).
326 size_t dimensions[3] = { 0, 0, 0 };
328 OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
329 (OCRepPayload **) NULL, dimensions);
// Take a second copy of the query to look up the requested interface.
// NOTE(review): copyReq is overwritten here; whether the first copy is freed
// beforehand is not visible in this view - confirm, otherwise it leaks.
332 copyReq = OICStrdup(entityHandlerRequest->query);
333 char * reqInterface = NSGetValueFromQuery(copyReq, NS_QUERY_INTERFACE);
// For the baseline interface the payload additionally lists the interfaces
// (baseline, read) and the root resource type.
335 if (reqInterface && strcmp(reqInterface, NS_INTERFACE_BASELINE) == 0)
337 OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_BASELINE);
338 OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_READ);
339 OCResourcePayloadAddStringLL(&payload->types, NS_ROOT_TYPE);
343 response.requestHandle = entityHandlerRequest->requestHandle;
344 response.resourceHandle = entityHandlerRequest->resource;
345 response.persistentBufferFlag = 0;
346 response.ehResult = OC_EH_OK;
347 response.payload = (OCPayload *) payload;
349 if (OCDoResponse(&response) != OC_STACK_OK)
351 NS_LOG(ERROR, "Fail to response topic list");
// Payload is released on the response-failure path ...
352 OCRepPayloadDestroy(payload);
// ... and again here after a successful response.
356 OCRepPayloadDestroy(payload);
357 NS_LOG(DEBUG, "NSSendTopicList - OUT");
// Handles a POST from a consumer that selects its topics.
//
// Reads the consumer id and the topic list from the request payload, removes
// every existing consumerTopicList entry of that consumer, stores a new
// entry for each topic whose state is NS_TOPIC_SUBSCRIBED, notifies the
// consumer, and releases the extracted payload objects and the consumer id.
//
// entityHandlerRequest: incoming request; only its payload field is read.
// NOTE(review): return statements and several conditions are not visible in
// this view.
361 NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)
363 NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");
365 char * consumerId = NULL;
366 OCRepPayload * payload = (OCRepPayload *) entityHandlerRequest->payload;
// consumerId is released with NSOICFree() further down, so the getter hands
// back storage owned by this function.
367 OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);
371 NS_LOG(DEBUG, "Invalid consumer ID");
375 NS_LOG_V(INFO_PRIVATE, "TOPIC consumer ID = %s", consumerId);
// Temporarily switch the cache type to the consumer-id variant so that the
// deletes below are keyed by consumerId, then switch it back.
377 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID;
// Delete until the storage layer reports NS_FAIL, i.e. drop all old entries
// of this consumer.
379 while (NSProviderStorageDelete(consumerTopicList, consumerId) != NS_FAIL)
383 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
384 OCRepPayload ** topicListPayload = NULL;
385 OCRepPayloadValue * payloadValue = NULL;
// NOTE(review): the lookup result is dereferenced by the very next statement
// with no NULL check in between - a request without NS_ATTRIBUTE_TOPIC_LIST
// would presumably crash here; confirm NSPayloadFindValue's contract.
386 payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);
387 size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);
388 size_t dimensions[3] = { dimensionSize, 0, 0 };
// The extracted array and its elements are destroyed/freed below, so this
// call yields objects owned by this function.
389 OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, &topicListPayload, dimensions);
391 for (int i = 0; i < (int) dimensionSize; i++)
393 char * topicName = NULL;
394 int64_t topicState = 0;
396 OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);
397 if (OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState))
399 NS_LOG_V(DEBUG, "Topic Name(state): %s(%d)", topicName, (int)topicState);
// Only subscribed topics get a cache entry.
402 if (NS_TOPIC_SUBSCRIBED == (NSTopicState) topicState)
404 NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *) OICMalloc(
405 sizeof(NSCacheTopicSubData));
// NOTE(review): if this exits the function, no release of consumerId,
// topicName or topicListPayload is visible on these lines - confirm.
406 NS_VERIFY_NOT_NULL(topicSubData, NS_FAIL);
408 OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);
// The entry stores the extracted topicName pointer itself (no copy).
409 topicSubData->topicName = topicName;
411 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
// Presumably the cleanup taken when the element allocation above failed
// (guarding condition not visible): release the entry, the consumer id and
// every extracted topic payload.
415 NSOICFree(topicSubData->topicName);
416 NSOICFree(topicSubData);
417 NSOICFree(consumerId);
420 for (size_t k = 0; k < dimensionSize; k++)
422 OCRepPayloadDestroy(topicListPayload[k]);
424 OICFree(topicListPayload);
429 newObj->data = (NSCacheData *) topicSubData;
432 if (NS_OK != NSProviderStorageWrite(consumerTopicList, newObj))
434 NS_LOG(ERROR, "Fail to write cache");
// Tell the consumer that its topic selection changed.
438 NSSendTopicUpdationToConsumer(consumerId);
// Regular cleanup: destroy every extracted topic payload, the array holding
// them, and the consumer id string.
441 for (size_t k = 0; k < dimensionSize; k++)
443 OCRepPayloadDestroy(topicListPayload[k]);
445 OICFree(topicListPayload);
447 NSOICFree(consumerId);
448 NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");
// Thread entry point of the topic scheduler.
//
// While NSIsRunning[TOPIC_SCHEDULER] is set, the loop waits on the
// scheduler's semaphore, takes the scheduler mutex, detaches the head task
// of NSHeadMsg[TOPIC_SCHEDULER] (if any) and dispatches on its taskType.
// Tasks that carry an NSTopicSyncResult / NSTopicSync are processed while
// holding that object's own mutex; its condition variable is signalled
// before the mutex is released.
//
// ptr: thread argument; not referenced on the lines visible here.
// NOTE(review): break statements, the release of `node` and the return value
// are not visible in this view.
452 void * NSTopicSchedule(void * ptr)
456 NS_LOG(DEBUG, "Create NSTopicSchedule");
459 while (NSIsRunning[TOPIC_SCHEDULER])
// Block until the semaphore is posted, then enter the critical section.
461 sem_wait(&NSSemaphore[TOPIC_SCHEDULER]);
462 pthread_mutex_lock(&NSMutex[TOPIC_SCHEDULER]);
464 if (NSHeadMsg[TOPIC_SCHEDULER] != NULL)
// Pop the head of the task queue.
466 NSTask *node = NSHeadMsg[TOPIC_SCHEDULER];
467 NSHeadMsg[TOPIC_SCHEDULER] = node->nextTask;
469 switch (node->taskType)
// GET on the topic resource: answer with the topic list, then free the
// request held in taskData.
471 case TASK_SEND_TOPICS:
472 NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");
473 NSSendTopicList((OCEntityHandlerRequest*) node->taskData);
474 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
// Subscribe request carried in an NSTopicSyncResult: add a consumer-topic
// entry if the topic is registered.
476 case TASK_SUBSCRIBE_TOPIC:
478 NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");
479 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
480 pthread_mutex_lock(topicSyncResult->mutex);
481 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
482 NSCacheTopicSubData * subData =
483 (NSCacheTopicSubData *) topicSyncResult->topicData;
// Presumably the path taken when the allocation above failed (guarding
// condition not visible): free the topic name, signal the condition and
// release the mutex.
486 NSOICFree(subData->topicName);
488 pthread_cond_signal(topicSyncResult->condition);
489 pthread_mutex_unlock(topicSyncResult->mutex);
// Only topics present in registeredTopicList can be subscribed.
493 if (NSProviderStorageRead(registeredTopicList, subData->topicName))
495 newObj->data = topicSyncResult->topicData;
// result is set to NS_OK only when the cache write succeeded.
498 if (NSProviderStorageWrite(consumerTopicList, newObj) == NS_OK)
500 NSSendTopicUpdationToConsumer(subData->id);
501 topicSyncResult->result = NS_OK;
// Presumably the topic-not-registered path (condition not visible).
506 NSOICFree(subData->topicName);
511 pthread_cond_signal(topicSyncResult->condition);
512 pthread_mutex_unlock(topicSyncResult->mutex);
// Unsubscribe request carried in an NSTopicSyncResult: remove the
// consumer-topic entry.
515 case TASK_UNSUBSCRIBE_TOPIC:
517 NS_LOG(DEBUG, "CASE TASK_UNSUBSCRIBE_TOPIC : ");
518 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
519 pthread_mutex_lock(topicSyncResult->mutex);
520 NSCacheTopicSubData * topicSubData =
521 (NSCacheTopicSubData *) topicSyncResult->topicData;
523 if (NSProviderDeleteConsumerTopic(consumerTopicList, topicSubData) == NS_OK)
525 NSSendTopicUpdationToConsumer(topicSubData->id);
526 topicSyncResult->result = NS_OK;
// The request data is released here.
529 NSOICFree(topicSubData->topicName);
530 NSOICFree(topicSubData);
531 pthread_cond_signal(topicSyncResult->condition);
532 pthread_mutex_unlock(topicSyncResult->mutex);
// Register a topic; topicData is passed on as the topic name string.
536 case TASK_REGISTER_TOPIC:
538 NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");
539 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
541 pthread_mutex_lock(topicSyncResult->mutex);
542 topicSyncResult->result = NSRegisterTopic(
543 (const char *) topicSyncResult->topicData);
544 pthread_cond_signal(topicSyncResult->condition);
545 pthread_mutex_unlock(topicSyncResult->mutex);
// Unregister a topic; topicData is passed on as the topic name string.
548 case TASK_UNREGISTER_TOPIC:
550 NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");
551 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
552 pthread_mutex_lock(topicSyncResult->mutex);
553 topicSyncResult->result = NSUnregisterTopic(
554 (const char *) topicSyncResult->topicData);
555 pthread_cond_signal(topicSyncResult->condition);
556 pthread_mutex_unlock(topicSyncResult->mutex);
// POST on the topic resource: apply the consumer's selection, then free the
// request held in taskData.
559 case TASK_POST_TOPIC:
561 NS_LOG(DEBUG, "TASK_POST_TOPIC : ");
562 NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);
563 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
// Store the list of registered topics in the NSTopicSync object.
566 case TASK_GET_TOPICS:
568 NS_LOG(DEBUG, "TASK_GET_TOPICS : ");
569 NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
570 pthread_mutex_lock(topicSync->mutex);
571 NSTopicLL * topics = NSProviderGetTopicsCacheData(registeredTopicList);
572 topicSync->topics = topics;
573 pthread_cond_signal(topicSync->condition);
574 pthread_mutex_unlock(topicSync->mutex);
// Store the topic list for topicSync->consumerId in the NSTopicSync object.
// ("TAST_" is the spelling of the task constant as used here.)
577 case TAST_GET_CONSUMER_TOPICS:
579 NS_LOG(DEBUG, "TASK_GET_CONSUMER_TOPICS : ");
580 NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
581 pthread_mutex_lock(topicSync->mutex);
582 NSTopicLL * topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList,
583 consumerTopicList, topicSync->consumerId);
584 topicSync->topics = topics;
585 pthread_cond_signal(topicSync->condition);
586 pthread_mutex_unlock(topicSync->mutex);
// Leave the scheduler's critical section before waiting for the next task.
596 pthread_mutex_unlock(&NSMutex[TOPIC_SCHEDULER]);
599 NS_LOG(DEBUG, "Destroy NSTopicSchedule");