1 //******************************************************************
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
22 #include "NSConstants.h"
23 #include "NSConsumerCommon.h"
24 #include "NSConsumerMQPlugin.h"
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocpayload.h"
// Forward declarations for helpers that are defined further down in this file.
//
// Starts the MQ subscription flow for the broker/topic pair carried by the
// given NSMQTopicAddress.
// NOTE(review): the parameter is called `address` here but `topicAddr` in the
// definition; harmless in C, but worth unifying.
31 void NSHandleMQSubscription(NSMQTopicAddress * address);
// Response callback handed to NSInvokeRequest for the OC_REST_GET request.
33 OCStackApplicationResult NSConsumerIntrospectMQTopic(
34 void * ctx, OCDoHandle handle, OCClientResponse * clientResponse);
// Response callback handed to NSInvokeRequest for the OC_REST_OBSERVE request.
36 OCStackApplicationResult NSConsumerMQListener(
37 void * ctx, OCDoHandle handle, OCClientResponse * clientResponse);
// Entry point for MQ-related consumer tasks: dispatches on task->taskType.
//
// task: the task to process. For TASK_MQ_REQ_SUBSCRIBE its taskData is
//       interpreted as an NSMQTopicAddress and passed to
//       NSHandleMQSubscription().
//
// NOTE(review): brace-only and other short lines are not visible in this
// listing, so the exact case/default layout and any release of `task` cannot
// be confirmed from here.
39 void NSConsumerMQTaskProcessing(NSTask * task)
// Presumably returns immediately when task is NULL (macro defined elsewhere).
41 NS_VERIFY_NOT_NULL_V(task);
43 NS_LOG_V(DEBUG, "Receive Event : %d", (int) task->taskType);
45 switch (task->taskType)
47 case TASK_MQ_REQ_SUBSCRIBE:
// taskData is an untyped pointer; this task type carries an NSMQTopicAddress.
49 NSMQTopicAddress * mqTopic = task->taskData;
50 NSHandleMQSubscription(mqTopic);
// Presumably the default branch (its label is not visible in this listing).
56 NS_LOG(ERROR, "Unknown type of task");
// Sends a GET request to the MQ broker's "/oic/ps" resource so that
// NSConsumerIntrospectMQTopic() can look for the topic of interest in the
// response.
//
// topicAddr: carries the broker address (serverAddr) and the topic name
//            (topicName) to subscribe to.
//
// NOTE(review): topicAddr is dereferenced without a NULL check in the visible
// lines — confirm that callers never pass NULL.
64 void NSHandleMQSubscription(NSMQTopicAddress * topicAddr)
66 char * serverUri = topicAddr->serverAddr;
67 char * topicName = topicAddr->topicName;
// Convert the textual broker address into an OCDevAddr for the request.
69 OCDevAddr * addr = NSChangeAddress(serverUri);
// If the conversion failed, the cleanup argument releases the strings owned
// by topicAddr (presumably followed by a return; macro defined elsewhere).
70 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(addr,
72 NSOICFree(topicAddr->serverAddr);
73 NSOICFree(topicAddr->topicName);
// Build "coap+tcp://<serverUri>/oic/ps" in a fixed 100-byte buffer.
// NOTE(review): the size passed to OICStrcat is computed from the string
// lengths rather than from sizeof(requestUri). If OICStrcat treats that
// argument as the destination capacity, a long serverUri could be written
// past the end of the buffer — confirm and consider passing
// sizeof(requestUri) instead.
76 char requestUri[100] = "coap+tcp://";
77 OICStrcat(requestUri, strlen(requestUri)+strlen(serverUri)+1, serverUri);
78 OICStrcat(requestUri, strlen(requestUri)+ strlen("/oic/ps") + 1, "/oic/ps");
79 NS_LOG_V(DEBUG, "requestUri = %s", requestUri);
// topicName is handed over as the callback context together with OICFree,
// presumably as the context deleter — verify against NSInvokeRequest.
81 OCStackResult ret = NSInvokeRequest(NULL, OC_REST_GET, addr, requestUri, NULL,
82 NSConsumerIntrospectMQTopic, topicName, OICFree, CT_DEFAULT);
// Maps the boolean success result onto a non-NULL/NULL pointer so the
// NULL-check macro can be reused; its cleanup argument is not visible here.
83 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(NSOCResultToSuccess(ret) == true ? (void *)1 : NULL,
// Callback for the GET request sent to the MQ broker: scans the returned
// topic list and, for every entry equal to the topic of interest, issues an
// OC_REST_OBSERVE request whose responses go to NSConsumerMQListener().
//
// ctx:            the topic name of interest, used as a C string.
// handle:         request handle supplied by the stack.
// clientResponse: the broker's response.
//
// Returns OC_STACK_DELETE_TRANSACTION when the consumer is not started,
// OC_STACK_KEEP_TRANSACTION otherwise.
92 OCStackApplicationResult NSConsumerIntrospectMQTopic(
93 void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
97 if (!NSIsStartedConsumer())
99 return OC_STACK_DELETE_TRANSACTION;
// Reject missing, failed or payload-less responses; the second macro argument
// is presumably the value returned in that case (macros defined elsewhere).
102 NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
103 NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
104 NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
106 NS_LOG(DEBUG, "income get response of MQ broker");
107 NS_LOG_V(INFO_PRIVATE, "MQ GET response income : %s:%d",
108 clientResponse->devAddr.addr, clientResponse->devAddr.port);
109 NS_LOG_V(DEBUG, "MQ GET response result : %d",
110 clientResponse->result);
111 NS_LOG_V(DEBUG, "MQ GET response sequenceNum : %d",
112 clientResponse->sequenceNumber);
113 NS_LOG_V(DEBUG, "MQ GET response resource uri : %s",
114 clientResponse->resourceUri);
115 NS_LOG_V(DEBUG, "MQ GET response Transport Type : %d",
116 clientResponse->devAddr.adapter);
// Read the topic list from the payload. The call's result is not checked;
// the loop below relies on dimensions[] staying zero-initialised when
// nothing was read.
// NOTE(review): topicList is not released in the visible lines — confirm
// whether OCRepPayloadGetStringArray hands ownership to the caller.
118 char ** topicList = NULL;
119 size_t dimensions[MAX_REP_ARRAY_DEPTH] = {0};
120 OCRepPayloadGetStringArray((OCRepPayload *) clientResponse->payload,
121 NS_ATTIRBUTE_MQ_TOPICLIST, & topicList, dimensions);
// NOTE(review): ctx is passed to strcmp without a NULL check.
123 char * interestTopicName = (char *) ctx;
124 for (size_t i = 0; i < dimensions[0]; ++i)
126 NS_LOG_V(DEBUG, "found MQ topic : %s", topicList[i]);
// strcmp returns 0 on equality, so this branch handles a matching topic.
127 if (!strcmp(topicList[i], interestTopicName))
129 NS_LOG(DEBUG, "subscribe to MQ notification");
// Observe the matching topic, using the topic string as the request URI.
131 OCStackResult ret = NSInvokeRequest(NULL,
132 OC_REST_OBSERVE, clientResponse->addr, topicList[i], NULL,
133 NSConsumerMQListener, NULL, NULL, CT_DEFAULT);
135 if (!NSOCResultToSuccess(ret))
137 NS_LOG(DEBUG, "fail to subscribe to MQ notification");
144 return OC_STACK_KEEP_TRANSACTION;
// Callback for the OBSERVE request on an MQ topic: converts each response
// payload into an NSMessage, wraps it in a task and pushes that task with
// NSConsumerPushEvent().
//
// ctx:            callback context.
// handle:         request handle supplied by the stack.
// clientResponse: the observe notification.
//
// Returns OC_STACK_DELETE_TRANSACTION when the consumer is not started,
// OC_STACK_KEEP_TRANSACTION otherwise.
147 OCStackApplicationResult NSConsumerMQListener(
148 void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
153 if (!NSIsStartedConsumer())
155 return OC_STACK_DELETE_TRANSACTION;
// Reject missing, failed or payload-less responses; the second macro argument
// is presumably the value returned in that case (macros defined elsewhere).
158 NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
159 NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
160 NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
162 NS_LOG(DEBUG, "income observe response of MQ notification");
163 NS_LOG_V(INFO_PRIVATE, "MQ OBS response income : %s:%d",
164 clientResponse->devAddr.addr, clientResponse->devAddr.port);
165 NS_LOG_V(DEBUG, "MQ OBS response result : %d",
166 clientResponse->result);
167 NS_LOG_V(DEBUG, "MQ OBS response sequenceNum : %d",
168 clientResponse->sequenceNumber);
169 NS_LOG_V(DEBUG, "MQ OBS response resource uri : %s",
170 clientResponse->resourceUri);
171 NS_LOG_V(DEBUG, "MQ OBS response Transport Type : %d",
172 clientResponse->devAddr.adapter);
// Build an NSMessage from the payload.
174 NSMessage * newMsg = NSGetMessage((OCRepPayload *)clientResponse->payload);
175 NS_VERIFY_NOT_NULL(newMsg, OC_STACK_KEEP_TRANSACTION);
177 NSTask * task = NULL;
// READ/DELETED messages are reduced to an NSSyncInfo record and forwarded as
// a TASK_RECV_SYNCINFO task.
179 if (newMsg->type == NS_MESSAGE_READ || newMsg->type == NS_MESSAGE_DELETED)
181 NSSyncInfo * syncInfo = (NSSyncInfo *)OICMalloc(sizeof(NSSyncInfo));
// On allocation failure the cleanup argument removes the message.
182 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(syncInfo,
183 OC_STACK_KEEP_TRANSACTION, NSRemoveMessage(newMsg));
185 syncInfo->messageId = newMsg->messageId;
186 syncInfo->state = (newMsg->type == NS_MESSAGE_READ) ? NS_SYNC_READ : NS_SYNC_DELETED;
187 OICStrcpy(syncInfo->providerId, sizeof(char) * NS_DEVICE_ID_LENGTH, newMsg->providerId);
// Every needed field has been copied into syncInfo, so the message is removed.
189 NSRemoveMessage(newMsg);
191 NS_LOG(DEBUG, "build NSTask for MQ message sync");
192 task = NSMakeTask(TASK_RECV_SYNCINFO, (void *) syncInfo);
193 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, OC_STACK_KEEP_TRANSACTION, NSOICFree(syncInfo));
// Presumably the else branch (the keyword is not visible in this listing):
// any other message type is forwarded as a TASK_CONSUMER_RECV_MESSAGE task
// that carries newMsg itself.
197 NS_LOG(DEBUG, "build NSTask for MQ message receive");
198 task = NSMakeTask(TASK_CONSUMER_RECV_MESSAGE, (void *) newMsg);
199 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, OC_STACK_KEEP_TRANSACTION,
200 NSRemoveMessage(newMsg));
// Push the task built above.
203 NSConsumerPushEvent(task);
205 return OC_STACK_KEEP_TRANSACTION;