replace : iotivity -> iotivity-sec
[platform/upstream/iotivity.git] / service / notification / src / consumer / NSConsumerMQPlugin.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #ifdef WITH_MQ
22 #include "NSConstants.h"
23 #include "NSConsumerCommon.h"
24 #include "NSConsumerMQPlugin.h"
25 #include "NSUtil.h"
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocpayload.h"
30
31 void NSHandleMQSubscription(NSMQTopicAddress * address);
32
33 OCStackApplicationResult NSConsumerIntrospectMQTopic(
34         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse);
35
36 OCStackApplicationResult NSConsumerMQListener(
37         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse);
38
39 void NSConsumerMQTaskProcessing(NSTask * task)
40 {
41     NS_VERIFY_NOT_NULL_V(task);
42
43     NS_LOG_V(DEBUG, "Receive Event : %d", (int) task->taskType);
44
45     switch (task->taskType)
46     {
47         case TASK_MQ_REQ_SUBSCRIBE:
48         {
49             NSMQTopicAddress * mqTopic = task->taskData;
50             NSHandleMQSubscription(mqTopic);
51             NSOICFree(mqTopic);
52             break;
53         }
54         default:
55         {
56             NS_LOG(ERROR, "Unknown type of task");
57             break;
58         }
59     }
60
61     NSOICFree(task);
62 }
63
64 void NSHandleMQSubscription(NSMQTopicAddress * topicAddr)
65 {
66     char * serverUri = topicAddr->serverAddr;
67     char * topicName = topicAddr->topicName;
68
69     OCDevAddr * addr = NSChangeAddress(serverUri);
70     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(addr,
71                     {
72                         NSOICFree(topicAddr->serverAddr);
73                         NSOICFree(topicAddr->topicName);
74                     });
75
76     char requestUri[100] = "coap+tcp://";
77     OICStrcat(requestUri, strlen(requestUri)+strlen(serverUri)+1, serverUri);
78     OICStrcat(requestUri, strlen(requestUri)+ strlen("/oic/ps") + 1, "/oic/ps");
79     NS_LOG_V(DEBUG, "requestUri = %s", requestUri);
80
81     OCStackResult ret = NSInvokeRequest(NULL, OC_REST_GET, addr, requestUri, NULL,
82                       NSConsumerIntrospectMQTopic, topicName, OICFree, CT_DEFAULT);
83     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(NSOCResultToSuccess(ret) == true ? (void *)1 : NULL,
84                    {
85                        NSOICFree(serverUri);
86                        NSOICFree(topicName);
87                    });
88
89     NSOICFree(serverUri);
90 }
91
92 OCStackApplicationResult NSConsumerIntrospectMQTopic(
93         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
94 {
95     (void) handle;
96
97     if (!NSIsStartedConsumer())
98     {
99         return OC_STACK_DELETE_TRANSACTION;
100     }
101
102     NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
103     NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
104     NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
105
106     NS_LOG(DEBUG, "income get response of MQ broker");
107     NS_LOG_V(INFO_PRIVATE, "MQ GET response income : %s:%d",
108             clientResponse->devAddr.addr, clientResponse->devAddr.port);
109     NS_LOG_V(DEBUG, "MQ GET response result : %d",
110             clientResponse->result);
111     NS_LOG_V(DEBUG, "MQ GET response sequenceNum : %d",
112             clientResponse->sequenceNumber);
113     NS_LOG_V(DEBUG, "MQ GET response resource uri : %s",
114             clientResponse->resourceUri);
115     NS_LOG_V(DEBUG, "MQ GET response Transport Type : %d",
116                     clientResponse->devAddr.adapter);
117
118     char ** topicList = NULL;
119     size_t dimensions[MAX_REP_ARRAY_DEPTH] = {0};
120     OCRepPayloadGetStringArray((OCRepPayload *) clientResponse->payload,
121                                NS_ATTIRBUTE_MQ_TOPICLIST, & topicList, dimensions);
122
123     char * interestTopicName = (char *) ctx;
124     for (size_t i = 0; i < dimensions[0]; ++i)
125     {
126         NS_LOG_V(DEBUG, "found MQ topic : %s", topicList[i]);
127         if (!strcmp(topicList[i], interestTopicName))
128         {
129             NS_LOG(DEBUG, "subscribe to MQ notification");
130
131             OCStackResult ret = NSInvokeRequest(NULL,
132                                   OC_REST_OBSERVE, clientResponse->addr, topicList[i], NULL,
133                                   NSConsumerMQListener, NULL, NULL, CT_DEFAULT);
134
135             if (!NSOCResultToSuccess(ret))
136             {
137                 NS_LOG(DEBUG, "fail to subscribe to MQ notification");
138                 continue;
139             }
140         }
141     }
142
143
144     return OC_STACK_KEEP_TRANSACTION;
145 }
146
147 OCStackApplicationResult NSConsumerMQListener(
148         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
149 {
150     (void) ctx;
151     (void) handle;
152
153     if (!NSIsStartedConsumer())
154     {
155         return OC_STACK_DELETE_TRANSACTION;
156     }
157
158     NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
159     NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
160     NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
161
162     NS_LOG(DEBUG, "income observe response of MQ notification");
163     NS_LOG_V(INFO_PRIVATE, "MQ OBS response income : %s:%d",
164             clientResponse->devAddr.addr, clientResponse->devAddr.port);
165     NS_LOG_V(DEBUG, "MQ OBS response result : %d",
166             clientResponse->result);
167     NS_LOG_V(DEBUG, "MQ OBS response sequenceNum : %d",
168             clientResponse->sequenceNumber);
169     NS_LOG_V(DEBUG, "MQ OBS response resource uri : %s",
170             clientResponse->resourceUri);
171     NS_LOG_V(DEBUG, "MQ OBS response Transport Type : %d",
172                     clientResponse->devAddr.adapter);
173
174     NSMessage * newMsg = NSGetMessage((OCRepPayload *)clientResponse->payload);
175     NS_VERIFY_NOT_NULL(newMsg, OC_STACK_KEEP_TRANSACTION);
176
177     NSTask * task = NULL;
178
179     if (newMsg->type == NS_MESSAGE_READ || newMsg->type == NS_MESSAGE_DELETED)
180     {
181         NSSyncInfo * syncInfo = (NSSyncInfo *)OICMalloc(sizeof(NSSyncInfo));
182         NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(syncInfo,
183                               OC_STACK_KEEP_TRANSACTION, NSRemoveMessage(newMsg));
184
185         syncInfo->messageId = newMsg->messageId;
186         syncInfo->state = (newMsg->type == NS_MESSAGE_READ) ? NS_SYNC_READ : NS_SYNC_DELETED;
187         OICStrcpy(syncInfo->providerId, sizeof(char) * NS_DEVICE_ID_LENGTH, newMsg->providerId);
188
189         NSRemoveMessage(newMsg);
190
191         NS_LOG(DEBUG, "build NSTask for MQ message sync");
192         task = NSMakeTask(TASK_RECV_SYNCINFO, (void *) syncInfo);
193         NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, OC_STACK_KEEP_TRANSACTION, NSOICFree(syncInfo));
194     }
195     else
196     {
197         NS_LOG(DEBUG, "build NSTask for MQ message receive");
198         task = NSMakeTask(TASK_CONSUMER_RECV_MESSAGE, (void *) newMsg);
199         NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, OC_STACK_KEEP_TRANSACTION,
200                               NSRemoveMessage(newMsg));
201     }
202
203     NSConsumerPushEvent(task);
204
205     return OC_STACK_KEEP_TRANSACTION;
206 }
207 #endif