1 //******************************************************************
\r
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
\r
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
\r
7 // Licensed under the Apache License, Version 2.0 (the "License");
\r
8 // you may not use this file except in compliance with the License.
\r
9 // You may obtain a copy of the License at
\r
11 // http://www.apache.org/licenses/LICENSE-2.0
\r
13 // Unless required by applicable law or agreed to in writing, software
\r
14 // distributed under the License is distributed on an "AS IS" BASIS,
\r
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 // See the License for the specific language governing permissions and
\r
17 // limitations under the License.
\r
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
\r
21 #include "NSProviderTopic.h"
\r
22 #include "oic_string.h"
\r
23 #include "oic_malloc.h"
\r
24 #include <pthread.h>
\r
// Forward declaration: NSRegisterTopic() and NSUnregisterTopic() call
// NSSendTopicUpdation() before its definition appears further down this file.
26 NSResult NSSendTopicUpdation();
\r
// Creates the two topic caches used throughout this file.
//
// Both consumerTopicList and registeredTopicList are obtained from
// NSProviderStorageCreate() and then tagged with a cacheType.
// NS_VERIFY_NOT_NULL is handed NS_FAIL for each allocation; presumably the
// macro returns that value when the pointer is NULL -- confirm against the
// macro definition.
//
// NOTE(review): the braces and the final return statement are not present in
// this listing (the embedded line numbers jump from 40 to 44), so the value
// returned on success cannot be confirmed from here.
28 NSResult NSInitTopicList()
\r
30 NS_LOG(DEBUG, "NSInitTopicList - IN");
\r
// Cache holding the consumer/topic pairs. Its cacheType is switched
// temporarily to ..._TOPIC_CID in NSPostConsumerTopics(), so the tag
// presumably selects the lookup key -- confirm in the storage module.
32 consumerTopicList = NSProviderStorageCreate();
\r
33 NS_VERIFY_NOT_NULL(consumerTopicList, NS_FAIL);
\r
34 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
\r
// Cache holding the topics registered through NSRegisterTopic().
// NOTE(review): if this second create fails, nothing visible here releases
// consumerTopicList -- verify whether a caller cleans it up.
36 registeredTopicList = NSProviderStorageCreate();
\r
37 NS_VERIFY_NOT_NULL(registeredTopicList, NS_FAIL);
\r
38 registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;
\r
40 NS_LOG(DEBUG, "NSInitTopicList - OUT");
\r
// Returns a size_t derived from the NSTopicLL list that starts at firstElement.
//
// Only the signature and the iterator initialisation are visible in this
// listing (embedded lines 45-52 and 54-62 are absent). Presumably the missing
// body follows iter->next and counts the nodes, returning 0 for a NULL list
// -- TODO confirm against the complete source.
44 size_t NSProviderGetTopicListSize(NSTopicLL * firstElement)
\r
53 NSTopicLL * iter = firstElement;
\r
// Adds topicName to registeredTopicList and then calls NSSendTopicUpdation().
//
// topicName: stored in the cache entry WITHOUT being copied (see the
//            ownership note below).
// Returns:   NS_FAIL is passed to NS_VERIFY_NOT_NULL for the first allocation;
//            the remaining return statements are not visible in this listing.
//
// The log strings still say "NSWriteTopicsToStorage"; they are runtime text
// and are left untouched here.
64 NSResult NSRegisterTopic(const char * topicName)
\r
66 NS_LOG(DEBUG, "NSWriteTopicsToStorage()");
\r
68 NSCacheTopicData * data = (NSCacheTopicData *) OICMalloc(sizeof(NSCacheTopicData));
\r
69 NS_VERIFY_NOT_NULL(data, NS_FAIL);
\r
// NOTE(review): the cast drops const and the pointer itself is stored, so the
// cache entry aliases the caller's buffer. The error path below frees it with
// OICFree(), which only makes sense if the caller hands over an OICMalloc'ed
// string -- confirm that contract with every caller.
70 data->topicName = (char *) topicName;
\r
// New topics start out in the unsubscribed state.
71 data->state = NS_TOPIC_UNSUBSCRIBED;
\r
73 NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
\r
// The condition guarding this cleanup is missing from the listing (embedded
// lines 74-75); presumably it runs only when element is NULL. Whether data
// itself is released and what is returned is not visible -- TODO confirm.
76 OICFree(data->topicName);
\r
80 element->data = (void *) data;
\r
81 element->next = NULL;
\r
// A failed cache write is logged; any cleanup or return in that branch is not
// visible here.
83 if(NSProviderStorageWrite(registeredTopicList, element) != NS_OK)
\r
85 NS_LOG(DEBUG, "fail to write cache");
\r
// The result of the notification is not checked.
88 NSSendTopicUpdation();
\r
90 NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");
\r
// Removes topicName from registeredTopicList and from consumerTopicList, then
// notifies observers when the registered-topic removal succeeded.
//
// topicName: name of the topic to remove. A NULL check is implied by the error
//            log below, but the condition and its return are missing from
//            this listing.
// Returns:   not visible here; presumably `result` -- TODO confirm.
94 NSResult NSUnregisterTopic(const char * topicName)
\r
96 NS_LOG(DEBUG, "NSDeleteTopics()");
\r
97 NSResult result = NS_OK;
\r
101 NS_LOG(ERROR, "topicName is NULL");
\r
105 result = NSProviderStorageDelete(registeredTopicList, topicName);
\r
// Keeps deleting from the consumer cache until the storage layer reports
// NS_FAIL, i.e. one call per matching entry. The loop body is not visible
// (embedded lines 107-109).
// NOTE(review): termination relies on NSProviderStorageDelete() returning
// exactly NS_FAIL once nothing matches -- verify no other error code can be
// returned repeatedly.
106 while (NSProviderStorageDelete(consumerTopicList, topicName) != NS_FAIL)
\r
// Observers are told only when the topic really was registered.
110 if (result == NS_OK)
\r
112 NSSendTopicUpdation();
\r
// Notifies observers of the message resource that the topic list changed.
//
// Builds an OCRepPayload carrying the message collection URI, the message id
// NS_TOPIC and the provider id, collects observation ids from consumerSubList,
// and sends the payload with OCNotifyListOfObservers() at OC_HIGH_QOS.
//
// Returns: the return statements are not visible in this listing.
//
// NOTE(review): this listing is missing the loop header over consumerSubList,
// the declaration of obCount, the array initialiser, the #endif matching the
// #if below, and every brace, so the exact control flow must be confirmed
// against the complete source.
118 NSResult NSSendTopicUpdation()
\r
120 NS_LOG(DEBUG, "NSSendTopicUpdation - IN");
\r
122 OCRepPayload* payload = OCRepPayloadCreate();
\r
// Logged when payload creation fails (the guarding condition is not visible).
126 NS_LOG(ERROR, "fail to create playload");
\r
130 OCResourceHandle rHandle = NULL;
\r
// Fetches the handle of the message resource into rHandle.
// NOTE(review): payload is already allocated here; its release on this
// failure path is not visible -- verify.
131 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
\r
133 NS_LOG(ERROR, "Fail to put message resource");
\r
137 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
\r
138 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
\r
139 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
\r
// Fixed capacity of 255 observation ids.
// NOTE(review): no bound check on obCount is visible before the writes below;
// with the cloud build each subscriber can add two ids. Confirm the missing
// lines guard against overflow.
141 OCObservationId obArray[255] =
\r
145 NSCacheElement * it = consumerSubList->head;
\r
149 NSCacheSubData * subData = (NSCacheSubData *) it->data;
\r
// Only subscribers flagged isWhite with a non-zero observation id are added.
151 if (subData->isWhite)
\r
153 if (subData->messageObId != 0)
\r
155 obArray[obCount++] = subData->messageObId;
\r
// Cloud builds additionally notify the remote observation id.
158 #if(defined WITH_CLOUD && defined RD_CLIENT)
\r
159 if(subData->remote_messageObId != 0)
\r
161 obArray[obCount++] = subData->remote_messageObId;
\r
// Logged when no observer was collected (the condition is not visible).
170 NS_LOG(ERROR, "observer count is zero");
\r
174 if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS) != OC_STACK_OK)
\r
176 NS_LOG(ERROR, "fail to send topic updation");
\r
// The payload is destroyed on the failure path here and again on the normal
// path below; presumably a return separates the two -- TODO confirm.
177 OCRepPayloadDestroy(payload);
\r
181 OCRepPayloadDestroy(payload);
\r
183 NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");
\r
// Sends the topic-changed notification to one consumer only.
//
// Builds the same payload as NSSendTopicUpdation() (message collection URI,
// message id NS_TOPIC, provider id), looks the consumer up in consumerSubList
// and notifies its single messageObId with OC_HIGH_QOS.
//
// consumerId: key handed to NSProviderStorageRead() on consumerSubList.
// Returns:    the return statements are not visible in this listing.
187 NSResult NSSendTopicUpdationToConsumer(char *consumerId)
\r
189 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");
\r
191 OCRepPayload* payload = OCRepPayloadCreate();
\r
// Logged when payload creation fails (the guarding condition is not visible).
195 NS_LOG(ERROR, "fail to create playload");
\r
199 OCResourceHandle rHandle = NULL;
\r
// NOTE(review): payload is already allocated here; its release on this
// failure path is not visible -- verify.
200 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
\r
202 NS_LOG(ERROR, "Fail to put message resource");
\r
206 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
\r
207 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
\r
208 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
\r
210 NSCacheElement * element = NSProviderStorageRead(consumerSubList, consumerId);
\r
// Unknown consumer: logged. Whether payload is destroyed before leaving is
// not visible (embedded lines 215-216 are missing) -- verify.
212 if (element == NULL)
\r
214 NS_LOG(ERROR, "element is NULL");
\r
218 NSCacheSubData * subData = (NSCacheSubData*) element->data;
\r
// The address of the single id is passed as a one-element observer array.
// NOTE(review): unlike NSSendTopicUpdation(), remote_messageObId is not
// notified here and messageObId is not checked for 0 -- confirm intent.
220 if (OCNotifyListOfObservers(rHandle, (OCObservationId*) &subData->messageObId, 1, payload,
\r
221 OC_HIGH_QOS) != OC_STACK_OK)
\r
223 NS_LOG(ERROR, "fail to send topic updation");
\r
// Destroyed on the failure path here and on the normal path below; presumably
// a return separates the two -- TODO confirm.
224 OCRepPayloadDestroy(payload);
\r
228 OCRepPayloadDestroy(payload);
\r
230 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");
\r
// Answers a request for the topic list.
//
// Reads the consumer id from the request query, fetches either the registered
// topics or that consumer's topics, packs them as an array of sub-payloads
// (topic name + selection state) under NS_ATTRIBUTE_TOPIC_LIST, and replies
// through OCDoResponse().
//
// entityHandlerRequest: request whose query, requestHandle and resource
//                       fields are read here.
// Returns: NS_ERROR is passed to NS_VERIFY_NOT_NULL for the allocations; the
//          other return statements are not visible in this listing.
//
// NOTE(review): the if/else conditions selecting the branches, all braces and
// several statements are missing from this listing, so the comments below
// describe only the visible calls.
234 NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)
\r
236 NS_LOG(DEBUG, "NSSendTopicList - IN");
\r
// Work on a private copy of the query string.
238 char * copyReq = OICStrdup(entityHandlerRequest->query);
\r
239 char * id = NSGetValueFromQuery(copyReq, NS_QUERY_CONSUMER_ID);
\r
240 NSTopicLL * topics = NULL;
\r
// Three fetches are visible; the conditions choosing between them are not.
244 NS_LOG(DEBUG, "Send registered topic list");
\r
245 topics = NSProviderGetTopicsCacheData(registeredTopicList);
\r
249 NS_LOG(DEBUG, "Send subscribed topic list to consumer");
\r
250 topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);
\r
253 topics = NSProviderGetTopicsCacheData(registeredTopicList);
\r
257 // make response for the Get Request
\r
// Only selected fields of the response are initialised; the rest are set
// individually before OCDoResponse() below.
258 OCEntityHandlerResponse response;
\r
259 response.numSendVendorSpecificHeaderOptions = 0;
\r
260 memset(response.sendVendorSpecificHeaderOptions, 0,
\r
261 sizeof response.sendVendorSpecificHeaderOptions);
\r
262 memset(response.resourceUri, 0, sizeof response.resourceUri);
\r
264 OCRepPayload* payload = OCRepPayloadCreate();
\r
267 NS_LOG(ERROR, "payload is NULL");
\r
272 OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);
\r
// Presumably guarded by a check that id is non-NULL (the condition is not
// visible) -- TODO confirm.
275 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);
\r
277 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
\r
282 NS_LOG(DEBUG, "topicList is NULL");
\r
283 size_t dimensionSize = (size_t) NSProviderGetTopicListSize(topics);
\r
284 NS_LOG_V(DEBUG, "dimensionSize = %d", (int)dimensionSize);
\r
286 if (!dimensionSize)
\r
// One sub-payload pointer per topic.
// NOTE(review): if NS_VERIFY_NOT_NULL returns early here or inside the loop,
// no visible code releases payload, copyReq, the topic list or the
// sub-payloads already created -- verify.
291 OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(
\r
292 sizeof(OCRepPayload *) * dimensionSize);
\r
293 NS_VERIFY_NOT_NULL(payloadTopicArray, NS_ERROR);
\r
295 size_t dimensions[3] = { dimensionSize, 0, 0 };
\r
297 for (int i = 0; i < (int) dimensionSize; i++)
\r
299 NS_LOG_V(DEBUG, "topicName = %s", topics->topicName);
\r
300 NS_LOG_V(DEBUG, "topicState = %d",(int) topics->state);
\r
302 payloadTopicArray[i] = OCRepPayloadCreate();
\r
303 NS_VERIFY_NOT_NULL(payloadTopicArray[i], NS_ERROR);
\r
304 OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME,
\r
305 topics->topicName);
\r
306 OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,
\r
307 (int) topics->state);
\r
// The successor is saved first; presumably the current node is freed and
// topics advanced in the lines missing from this listing -- TODO confirm.
309 NSTopicLL * next = topics->next;
\r
// This is the plain setter, not the ...AsOwner variant used for the empty
// list below.
// NOTE(review): if it copies the array, payloadTopicArray and its elements
// must be released by this function; that release is not visible -- verify.
314 OCRepPayloadSetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST,
\r
315 (const OCRepPayload**) (payloadTopicArray), dimensions);
\r
// Empty topic list: an explicit zero-length array is attached instead.
319 size_t dimensions[3] = { 0, 0, 0 };
\r
321 OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
\r
322 (OCRepPayload **) NULL, dimensions);
\r
// The query is duplicated a second time to read the requested interface.
// NOTE(review): copyReq is overwritten here; a release of the first copy is
// not visible in this listing -- verify it is not leaked.
325 copyReq = OICStrdup(entityHandlerRequest->query);
\r
326 char * reqInterface = NSGetValueFromQuery(copyReq, NS_QUERY_INTERFACE);
\r
// Baseline interface requests also receive the interface and type lists.
328 if (reqInterface && strcmp(reqInterface, NS_INTERFACE_BASELINE) == 0)
\r
330 OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_BASELINE);
\r
331 OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_READ);
\r
332 OCResourcePayloadAddStringLL(&payload->types, NS_ROOT_TYPE);
\r
336 response.requestHandle = entityHandlerRequest->requestHandle;
\r
337 response.resourceHandle = entityHandlerRequest->resource;
\r
338 response.persistentBufferFlag = 0;
\r
339 response.ehResult = OC_EH_OK;
\r
340 response.payload = (OCPayload *) payload;
\r
342 if (OCDoResponse(&response) != OC_STACK_OK)
\r
344 NS_LOG(ERROR, "Fail to response topic list");
\r
347 OCRepPayloadDestroy(payload);
\r
349 NS_LOG(DEBUG, "NSSendTopicList - OUT");
\r
// Replaces a consumer's topic selection with the one carried in a POST request.
//
// Reads the consumer id from the request payload, deletes every entry stored
// for that consumer in consumerTopicList, inserts one entry per topic whose
// state is NS_TOPIC_SUBSCRIBED, then notifies the consumer through
// NSSendTopicUpdationToConsumer().
//
// entityHandlerRequest: request whose payload carries NS_ATTRIBUTE_CONSUMER_ID
//                       and NS_ATTRIBUTE_TOPIC_LIST.
// Returns: NS_FAIL is passed to NS_VERIFY_NOT_NULL for the allocation; the
//          other return statements are not visible in this listing.
353 NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)
\r
355 NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");
\r
357 char * consumerId = NULL;
\r
358 OCRepPayload * payload = (OCRepPayload *) entityHandlerRequest->payload;
\r
// consumerId is released with OICFree() at the end of this function, so the
// getter evidently hands back an allocated string.
359 OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);
\r
// Logged for a missing consumer id (the guarding condition is not visible).
363 NS_LOG(DEBUG, "Invalid consumer ID");
\r
367 NS_LOG_V(DEBUG, "TOPIC consumer ID = %s", consumerId);
\r
// Switch the cache tag so the delete below is driven by consumerId, remove
// entries until the storage layer reports NS_FAIL, then restore the tag.
// NOTE(review): this mutates shared cache state; it presumably relies on all
// topic-cache access happening on the scheduler thread -- confirm.
369 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID;
\r
371 while (NSProviderStorageDelete(consumerTopicList, consumerId) != NS_FAIL)
\r
374 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
\r
376 OCRepPayload ** topicListPayload = NULL;
\r
377 OCRepPayloadValue * payloadValue = NULL;
\r
378 payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);
\r
// NOTE(review): payloadValue is dereferenced with no NULL check visible
// (embedded lines 378-379 are consecutive); a request without a topic list
// would crash here if NSPayloadFindValue() can return NULL -- verify.
379 size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);
\r
380 size_t dimensions[3] = { dimensionSize, 0, 0 };
\r
// The getter's result is not checked and the release of topicListPayload is
// not visible in this listing.
381 OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, &topicListPayload, dimensions);
\r
383 for (int i = 0; i < (int) dimensionSize; i++)
\r
385 char * topicName = NULL;
\r
386 int64_t topicState = 0;
\r
388 OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);
\r
389 OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState);
\r
390 NS_LOG_V(DEBUG, "Topic Name(state): %s(%d)", topicName, (int)topicState);
\r
// Only subscribed topics are stored.
// NOTE(review): what happens to topicName for other states is not visible --
// verify it is released.
392 if (NS_TOPIC_SUBSCRIBED == (NSTopicState) topicState)
\r
394 NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *) OICMalloc(
\r
395 sizeof(NSCacheTopicSubData));
\r
396 NS_VERIFY_NOT_NULL(topicSubData, NS_FAIL);
\r
// The consumer id is copied into the entry; the topic name pointer is adopted
// as-is, so the entry takes over that string.
398 OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);
\r
399 topicSubData->topicName = topicName;
\r
401 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
\r
// Cleanup presumably taken only when newObj is NULL (the condition and the
// return are missing from this listing) -- TODO confirm.
405 OICFree(topicSubData->topicName);
\r
406 OICFree(topicSubData);
\r
407 OICFree(consumerId);
\r
411 newObj->data = (NSCacheData *) topicSubData;
\r
412 newObj->next = NULL;
\r
// The result of the cache write is not checked.
414 NSProviderStorageWrite(consumerTopicList, newObj);
417 NSSendTopicUpdationToConsumer(consumerId);
\r
418 OICFree(consumerId);
\r
419 NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");
\r
// Worker loop of the topic scheduler.
//
// While NSIsRunning[TOPIC_SCHEDULER] is set, the loop waits on the scheduler
// semaphore, takes the scheduler mutex, pops the head task from
// NSHeadMsg[TOPIC_SCHEDULER], and dispatches on node->taskType.
// Cases that carry an NSTopicSyncResult / NSTopicSync lock the mutex stored in
// that object, write the result, signal its condition and unlock.
//
// ptr: not referenced in any visible line.
// Returns: the return statement is not visible in this listing.
//
// NOTE(review): braces, every `break`, the default case and the release of
// `node` are missing from this listing; the statement order below is kept
// exactly as found because the locking depends on it.
423 void * NSTopicSchedule(void * ptr)
\r
427 NS_LOG(DEBUG, "Create NSTopicSchedule");
\r
430 while (NSIsRunning[TOPIC_SCHEDULER])
\r
// Block until a task has been posted, then take the queue lock.
432 sem_wait(&NSSemaphore[TOPIC_SCHEDULER]);
\r
433 pthread_mutex_lock(&NSMutex[TOPIC_SCHEDULER]);
\r
435 if (NSHeadMsg[TOPIC_SCHEDULER] != NULL)
\r
// Pop the head of the task queue.
437 NSTask *node = NSHeadMsg[TOPIC_SCHEDULER];
\r
438 NSHeadMsg[TOPIC_SCHEDULER] = node->nextTask;
\r
440 switch (node->taskType)
\r
// Reply to a topic-list request, then release the request copy.
442 case TASK_SEND_TOPICS:
\r
443 NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");
\r
444 NSSendTopicList((OCEntityHandlerRequest*) node->taskData);
\r
445 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
\r
// Store a consumer/topic pair if the topic is registered.
447 case TASK_SUBSCRIBE_TOPIC:
\r
449 NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");
\r
450 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
\r
451 pthread_mutex_lock(topicSyncResult->mutex);
\r
452 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
\r
453 NSCacheTopicSubData * subData =
\r
454 (NSCacheTopicSubData *) topicSyncResult->topicData;
\r
// Presumably the allocation-failure branch (its condition and the remaining
// cleanup are missing): the topic name is released and the waiter is woken
// -- TODO confirm.
457 OICFree(subData->topicName);
\r
459 pthread_cond_signal(topicSyncResult->condition);
\r
460 pthread_mutex_unlock(topicSyncResult->mutex);
\r
// The pair is stored only for topics present in registeredTopicList.
// NOTE(review): the else branch (result value, release of newObj and subData)
// is not visible -- verify.
464 if (NSProviderStorageRead(registeredTopicList, subData->topicName))
\r
466 newObj->data = topicSyncResult->topicData;
\r
467 newObj->next = NULL;
\r
468 NSProviderStorageWrite(consumerTopicList, newObj);
\r
469 NSSendTopicUpdationToConsumer(subData->id);
\r
470 topicSyncResult->result = NS_OK;
\r
473 pthread_cond_signal(topicSyncResult->condition);
\r
474 pthread_mutex_unlock(topicSyncResult->mutex);
\r
// Remove a consumer/topic pair; the request data is always released here.
478 case TASK_UNSUBSCRIBE_TOPIC:
\r
480 NS_LOG(DEBUG, "CASE TASK_UNSUBSCRIBE_TOPIC : ");
\r
481 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
\r
482 pthread_mutex_lock(topicSyncResult->mutex);
\r
483 NSCacheTopicSubData * topicSubData =
\r
484 (NSCacheTopicSubData *) topicSyncResult->topicData;
\r
485 if (NSProviderDeleteConsumerTopic(consumerTopicList, topicSubData) == NS_OK)
\r
487 NSSendTopicUpdationToConsumer(topicSubData->id);
\r
488 topicSyncResult->result = NS_OK;
\r
490 OICFree(topicSubData->topicName);
\r
491 OICFree(topicSubData);
\r
492 pthread_cond_signal(topicSyncResult->condition);
\r
493 pthread_mutex_unlock(topicSyncResult->mutex);
\r
// Register a topic; topicData is passed on as the topic name string.
497 case TASK_REGISTER_TOPIC:
\r
499 NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");
\r
500 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
\r
502 pthread_mutex_lock(topicSyncResult->mutex);
\r
503 topicSyncResult->result = NSRegisterTopic(
\r
504 (const char *) topicSyncResult->topicData);
\r
505 pthread_cond_signal(topicSyncResult->condition);
\r
506 pthread_mutex_unlock(topicSyncResult->mutex);
\r
// Unregister a topic; topicData is passed on as the topic name string.
509 case TASK_UNREGISTER_TOPIC:
\r
511 NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");
\r
512 NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
\r
513 pthread_mutex_lock(topicSyncResult->mutex);
\r
514 topicSyncResult->result = NSUnregisterTopic(
\r
515 (const char *) topicSyncResult->topicData);
\r
516 pthread_cond_signal(topicSyncResult->condition);
\r
517 pthread_mutex_unlock(topicSyncResult->mutex);
\r
// Apply a consumer's POSTed topic selection, then release the request copy.
520 case TASK_POST_TOPIC:
\r
522 NS_LOG(DEBUG, "TASK_POST_TOPIC : ");
\r
523 NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);
\r
524 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
\r
// Hand the registered topic list back through topicSync->topics.
527 case TASK_GET_TOPICS:
\r
529 NS_LOG(DEBUG, "TASK_GET_TOPICS : ");
\r
530 NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
\r
531 pthread_mutex_lock(topicSync->mutex);
\r
532 NSTopicLL * topics = NSProviderGetTopicsCacheData(registeredTopicList);
\r
533 topicSync->topics = topics;
\r
534 pthread_cond_signal(topicSync->condition);
\r
535 pthread_mutex_unlock(topicSync->mutex);
\r
// Hand one consumer's topic list back through topicSync->topics.
// The case label is spelled "TAST_" here; it is an identifier declared
// elsewhere, so it is left as found.
538 case TAST_GET_CONSUMER_TOPICS:
\r
540 NS_LOG(DEBUG, "TASK_GET_CONSUMER_TOPICS : ");
\r
541 NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
\r
542 pthread_mutex_lock(topicSync->mutex);
\r
543 NSTopicLL * topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList,
\r
544 consumerTopicList, topicSync->consumerId);
\r
545 topicSync->topics = topics;
\r
546 pthread_cond_signal(topicSync->condition);
\r
547 pthread_mutex_unlock(topicSync->mutex);
\r
// Release the queue lock taken at the top of the iteration.
557 pthread_mutex_unlock(&NSMutex[TOPIC_SCHEDULER]);
\r
560 NS_LOG(DEBUG, "Destroy NSTopicSchedule");
\r