1 //******************************************************************
\r
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
\r
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
\r
7 // Licensed under the Apache License, Version 2.0 (the "License");
\r
8 // you may not use this file except in compliance with the License.
\r
9 // You may obtain a copy of the License at
\r
11 // http://www.apache.org/licenses/LICENSE-2.0
\r
13 // Unless required by applicable law or agreed to in writing, software
\r
14 // distributed under the License is distributed on an "AS IS" BASIS,
\r
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 // See the License for the specific language governing permissions and
\r
17 // limitations under the License.
\r
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
\r
21 #include "NSProviderTopic.h"
\r
22 #include "oic_string.h"
\r
23 #include "oic_malloc.h"
\r
25 NSResult NSSendTopicUpdation();
\r
27 NSResult NSInitTopicList()
\r
29 NS_LOG(DEBUG, "NSInitTopicList - IN");
\r
30 consumerTopicList = NSStorageCreate();
\r
31 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
\r
33 registeredTopicList = NSStorageCreate();
\r
34 registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;
\r
36 NS_LOG(DEBUG, "NSInitTopicList - OUT");
\r
40 NSResult NSAddTopics(const char * topicName)
\r
42 NS_LOG(DEBUG, "NSWriteTopicsToStorage()");
\r
44 NSCacheTopicData * data = (NSCacheTopicData *)OICMalloc(sizeof(NSCacheTopicData));
\r
45 data->topicName = topicName;
\r
46 data->state = NS_TOPIC_UNSUBSCRIBED;
\r
48 NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
\r
49 element->data = (void *) data;
\r
50 element->next = NULL;
\r
52 if(NSStorageWrite(registeredTopicList, element) != NS_OK)
\r
54 NS_LOG(DEBUG, "fail to write cache");
\r
56 NSSendTopicUpdation();
\r
58 NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");
\r
62 NSResult NSDeleteTopics(const char * topicName)
\r
64 NS_LOG(DEBUG, "NSDeleteTopics()");
\r
68 NS_LOG(ERROR, "topicName is NULL");
\r
72 NSStorageDelete(registeredTopicList, topicName);
\r
73 while(NSStorageDelete(consumerTopicList, topicName) != NS_FAIL);
\r
77 NSResult NSSendTopicUpdation()
\r
79 NS_LOG(DEBUG, "NSSendTopicUpdation - IN");
\r
81 OCRepPayload* payload = OCRepPayloadCreate();
\r
85 NS_LOG(ERROR, "fail to create playload");
\r
89 OCResourceHandle rHandle = NULL;
\r
90 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
\r
92 NS_LOG(ERROR, "Fail to put message resource");
\r
96 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
\r
97 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
\r
98 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
\r
100 OCObservationId obArray[255] = { 0, };
\r
103 NSCacheElement * it = consumerSubList->head;
\r
107 NSCacheSubData * subData = (NSCacheSubData *) it->data;
\r
109 if (subData->isWhite)
\r
111 if(subData->messageObId != 0)
\r
113 obArray[obCount++] = subData->messageObId;
\r
117 if(subData->remote_messageObId != 0)
\r
119 obArray[obCount++] = subData->remote_messageObId;
\r
129 NS_LOG(ERROR, "observer count is zero");
\r
133 if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS)
\r
136 NS_LOG(ERROR, "fail to send topic updation");
\r
137 OCRepPayloadDestroy(payload);
\r
141 OCRepPayloadDestroy(payload);
\r
143 NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");
\r
147 NSResult NSSendTopicUpdationToConsumer(char *consumerId)
\r
149 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");
\r
151 OCRepPayload* payload = OCRepPayloadCreate();
\r
155 NS_LOG(ERROR, "fail to create playload");
\r
159 OCResourceHandle rHandle = NULL;
\r
160 if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
\r
162 NS_LOG(ERROR, "Fail to put message resource");
\r
166 OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
\r
167 OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
\r
168 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
\r
170 NSCacheElement * element = NSStorageRead(consumerSubList, consumerId);
\r
172 if(element == NULL)
\r
174 NS_LOG(ERROR, "element is NULL");
\r
178 NSCacheSubData * subData = (NSCacheSubData*) element->data;
\r
180 if (OCNotifyListOfObservers(rHandle, (OCObservationId*)&subData->messageObId, 1, payload, OC_HIGH_QOS)
\r
183 NS_LOG(ERROR, "fail to send topic updation");
\r
184 OCRepPayloadDestroy(payload);
\r
189 OCRepPayloadDestroy(payload);
\r
191 NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");
\r
195 NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)
\r
197 NS_LOG(DEBUG, "NSSendTopicList - IN");
\r
199 char * id = NSGetValueFromQuery(OICStrdup(entityHandlerRequest->query), NS_QUERY_CONSUMER_ID);
\r
200 NSTopicLL * topics = NULL;
\r
201 NSCacheElement * currList = NULL;
\r
205 NS_LOG(DEBUG, "Send registered topic list");
\r
206 topics = NSProviderGetTopicsCacheData(registeredTopicList);
\r
207 currList = registeredTopicList->head;
\r
211 NS_LOG(DEBUG, "Send subscribed topic list to consumer");
\r
212 topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);
\r
213 currList = consumerTopicList->head;
\r
218 NS_LOG(DEBUG, "currList is NULL");
\r
222 // make response for the Get Request
\r
223 OCEntityHandlerResponse response;
\r
224 response.numSendVendorSpecificHeaderOptions = 0;
\r
225 memset(response.sendVendorSpecificHeaderOptions, 0,
\r
226 sizeof response.sendVendorSpecificHeaderOptions);
\r
227 memset(response.resourceUri, 0, sizeof response.resourceUri);
\r
229 OCRepPayload* payload = OCRepPayloadCreate();
\r
232 NS_LOG(ERROR, "payload is NULL");
\r
236 // set topics to the array of resource property
\r
238 NSCacheElement * iter = currList;
\r
239 size_t dimensionSize = (size_t)NSProviderGetListSize(iter);
\r
241 NS_LOG_V(DEBUG, "dimensionSize = %d", dimensionSize);
\r
248 OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(
\r
249 sizeof(OCRepPayload *) * dimensionSize);
\r
251 size_t dimensions[3] = {dimensionSize, 0, 0};
\r
252 for (int i = 0; i < (int)dimensionSize; i++)
\r
254 NSTopicLL * topic = (NSTopicLL *) iter->data;
\r
256 NS_LOG_V(DEBUG, "topicName = %s", topic->topicName);
\r
257 NS_LOG_V(DEBUG, "topicState = %d",(int) topic->state);
\r
259 payloadTopicArray[i] = OCRepPayloadCreate();
\r
260 OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME, topic->topicName);
\r
261 OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,
\r
262 (int)topic->state);
\r
267 OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);
\r
270 OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);
\r
272 OCRepPayloadSetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST,
\r
273 (const OCRepPayload**)(payloadTopicArray), dimensions);
\r
275 response.requestHandle = entityHandlerRequest->requestHandle;
\r
276 response.resourceHandle = entityHandlerRequest->resource;
\r
277 response.persistentBufferFlag = 0;
\r
278 response.ehResult = OC_EH_OK;
\r
279 response.payload = (OCPayload *) payload;
\r
281 if (OCDoResponse(&response) != OC_STACK_OK)
\r
283 NS_LOG(ERROR, "Fail to response topic list");
\r
286 OCRepPayloadDestroy(payload);
\r
288 NS_LOG(DEBUG, "NSSendTopicList - OUT");
\r
292 NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)
\r
294 NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");
\r
296 char * consumerId = NULL;
\r
297 OCRepPayload * payload = entityHandlerRequest->payload;
\r
298 OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);
\r
302 NS_LOG(DEBUG, "Invalid consumer ID");
\r
306 NS_LOG_V(DEBUG, "TOPIC consumer ID = %s", consumerId);
\r
308 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID;
\r
309 while(NSStorageDelete(consumerTopicList, consumerId) != NS_FAIL);
\r
310 consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
\r
312 OCRepPayload ** topicListPayload = NULL;
\r
313 OCRepPayloadValue * payloadValue = NULL;
\r
314 payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);
\r
315 size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);
\r
316 size_t dimensions[3] = {dimensionSize, 0, 0};
\r
317 OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, & topicListPayload, dimensions);
\r
319 for(int i = 0; i <(int)dimensionSize; i++)
\r
321 char * topicName = NULL;
\r
322 int64_t topicState = 0;
\r
324 OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);
\r
325 OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState);
\r
326 NS_LOG_V(DEBUG, "Topic Name(state): %s(%d)", topicName, topicState);
\r
328 if(NS_TOPIC_SUBSCRIBED == (NSTopicState)topicState)
\r
330 NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *)
\r
331 OICMalloc(sizeof(NSCacheTopicSubData));
\r
333 OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);
\r
334 topicSubData->topicName = OICStrdup(topicName);
\r
336 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
\r
337 newObj->data = (NSCacheData *) topicSubData;
\r
338 newObj->next = NULL;
\r
339 NSStorageWrite(consumerTopicList, newObj);
\r
343 NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");
\r
347 void * NSTopicSchedule(void * ptr)
\r
351 NS_LOG(DEBUG, "Create NSTopicSchedule");
\r
354 while (NSIsRunning[TOPIC_SCHEDULER])
\r
356 sem_wait(&NSSemaphore[TOPIC_SCHEDULER]);
\r
357 pthread_mutex_lock(&NSMutex[TOPIC_SCHEDULER]);
\r
359 if (NSHeadMsg[TOPIC_SCHEDULER] != NULL)
\r
361 NSTask *node = NSHeadMsg[TOPIC_SCHEDULER];
\r
362 NSHeadMsg[TOPIC_SCHEDULER] = node->nextTask;
\r
364 switch (node->taskType)
\r
366 case TASK_SEND_TOPICS:
\r
367 NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");
\r
368 NSSendTopicList((OCEntityHandlerRequest*) node->taskData);
\r
369 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
\r
371 case TASK_SUBSCRIBE_TOPIC:
\r
372 NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");
\r
373 NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
\r
374 newObj->data = node->taskData;
\r
375 newObj->next = NULL;
\r
376 NSStorageWrite(consumerTopicList, newObj);
\r
378 case TASK_UNSUBSCRIBE_TOPIC:
\r
379 NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");
\r
380 NSProviderDeleteConsumerTopic(consumerTopicList,
\r
381 (NSCacheTopicSubData *) node->taskData);
\r
382 NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC AFter: ");
\r
384 case TASK_ADD_TOPIC:
\r
386 NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");
\r
387 NSAddTopics((const char *) node->taskData);
\r
390 case TASK_DELETE_TOPIC:
\r
392 NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");
\r
393 NSDeleteTopics((const char *) node->taskData);
\r
396 case TASK_POST_TOPIC:
\r
398 NS_LOG(DEBUG, "TASK_POST_TOPIC : ");
\r
399 NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);
\r
400 NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
\r
410 pthread_mutex_unlock(&NSMutex[TOPIC_SCHEDULER]);
\r
413 NS_LOG(DEBUG, "Destroy NSTopicSchedule");
\r