Update snapshot(2017-12-06)
[platform/upstream/iotivity.git] / service / notification / src / consumer / NSConsumerMQPlugin.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #ifdef WITH_MQ
22 #include "NSConstants.h"
23 #include "NSConsumerCommon.h"
24 #include "NSConsumerMQPlugin.h"
25 #include "NSUtil.h"
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocpayload.h"
30
31 void NSHandleMQSubscription(NSMQTopicAddress * address);
32
33 OCStackApplicationResult NSConsumerIntrospectMQTopic(
34         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse);
35
36 OCStackApplicationResult NSConsumerMQListener(
37         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse);
38
39 void NSConsumerMQTaskProcessing(NSTask * task)
40 {
41     NS_VERIFY_NOT_NULL_V(task);
42
43     NS_LOG_V(DEBUG, "Receive Event : %d", (int) task->taskType);
44
45     switch (task->taskType)
46     {
47         case TASK_MQ_REQ_SUBSCRIBE:
48         {
49             NSMQTopicAddress * mqTopic = task->taskData;
50             NSHandleMQSubscription(mqTopic);
51             NSOICFree(mqTopic);
52             break;
53         }
54         default:
55         {
56             NS_LOG(ERROR, "Unknown type of task");
57             break;
58         }
59     }
60
61     NSOICFree(task);
62 }
63
64 void NSHandleMQSubscription(NSMQTopicAddress * topicAddr)
65 {
66     char * serverUri = topicAddr->serverAddr;
67     char * topicName = topicAddr->topicName;
68
69     OCDevAddr * addr = NSChangeAddress(serverUri);
70     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(addr,
71                     {
72                         NSOICFree(topicAddr->serverAddr);
73                         NSOICFree(topicAddr->topicName);
74                     });
75
76     char requestUri[100] = "coap+tcp://";
77     OICStrcat(requestUri, strlen(requestUri)+strlen(serverUri)+1, serverUri);
78     OICStrcat(requestUri, strlen(requestUri)+ strlen("/oic/ps") + 1, "/oic/ps");
79     NS_LOG_V(DEBUG, "requestUri = %s", requestUri);
80
81     OCStackResult ret = NSInvokeRequest(NULL, OC_REST_GET, addr, requestUri, NULL,
82                       NSConsumerIntrospectMQTopic, topicName, OICFree, CT_DEFAULT);
83     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(NSOCResultToSuccess(ret) == true ? (void *)1 : NULL,
84                    {
85                        NSOICFree(serverUri);
86                        NSOICFree(topicName);
87                    });
88
89     NSOICFree(serverUri);
90 }
91
92 OCStackApplicationResult NSConsumerIntrospectMQTopic(
93         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
94 {
95     (void) handle;
96
97     if (!NSIsStartedConsumer())
98     {
99         return OC_STACK_DELETE_TRANSACTION;
100     }
101
102     NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
103     NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
104     NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
105
106     NS_LOG(DEBUG, "income get response of MQ broker");
107     NS_LOG_V(INFO_PRIVATE, "MQ GET response income : %s:%d",
108             clientResponse->devAddr.addr, clientResponse->devAddr.port);
109     NS_LOG_V(DEBUG, "MQ GET response result : %d",
110             clientResponse->result);
111     NS_LOG_V(DEBUG, "MQ GET response sequenceNum : %d",
112             clientResponse->sequenceNumber);
113     NS_LOG_V(DEBUG, "MQ GET response resource uri : %s",
114             clientResponse->resourceUri);
115     NS_LOG_V(DEBUG, "MQ GET response Transport Type : %d",
116                     clientResponse->devAddr.adapter);
117
118     char ** topicList = NULL;
119     size_t dimensions[MAX_REP_ARRAY_DEPTH] = {0};
120     OCRepPayloadGetStringArray((OCRepPayload *) clientResponse->payload,
121                                NS_ATTIRBUTE_MQ_TOPICLIST, & topicList, dimensions);
122
123     char * interestTopicName = (char *) ctx;
124     for (size_t i = 0; i < dimensions[0]; ++i)
125     {
126         NS_LOG_V(DEBUG, "found MQ topic : %s", topicList[i]);
127         if (!strcmp(topicList[i], interestTopicName))
128         {
129             NS_LOG(DEBUG, "subscribe to MQ notification");
130
131             OCStackResult ret = NSInvokeRequest(NULL,
132                                   OC_REST_OBSERVE, clientResponse->addr, topicList[i], NULL,
133                                   NSConsumerMQListener, NULL, NULL, CT_DEFAULT);
134
135             if (!NSOCResultToSuccess(ret))
136             {
137                 NS_LOG(DEBUG, "fail to subscribe to MQ notification");
138                 continue;
139             }
140         }
141     }
142
143     // Free topicList
144     size_t count = calcDimTotal(dimensions);
145     for (size_t k = 0; k < count; k++)
146     {
147         OICFree(topicList[k]);
148     }
149     OICFree(topicList);
150
151     return OC_STACK_KEEP_TRANSACTION;
152 }
153
154 OCStackApplicationResult NSConsumerMQListener(
155         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
156 {
157     (void) ctx;
158     (void) handle;
159
160     if (!NSIsStartedConsumer())
161     {
162         return OC_STACK_DELETE_TRANSACTION;
163     }
164
165     NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
166     NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
167     NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
168
169     NS_LOG(DEBUG, "income observe response of MQ notification");
170     NS_LOG_V(INFO_PRIVATE, "MQ OBS response income : %s:%d",
171             clientResponse->devAddr.addr, clientResponse->devAddr.port);
172     NS_LOG_V(DEBUG, "MQ OBS response result : %d",
173             clientResponse->result);
174     NS_LOG_V(DEBUG, "MQ OBS response sequenceNum : %d",
175             clientResponse->sequenceNumber);
176     NS_LOG_V(DEBUG, "MQ OBS response resource uri : %s",
177             clientResponse->resourceUri);
178     NS_LOG_V(DEBUG, "MQ OBS response Transport Type : %d",
179                     clientResponse->devAddr.adapter);
180
181     NSMessage * newMsg = NSGetMessage((OCRepPayload *)clientResponse->payload);
182     NS_VERIFY_NOT_NULL(newMsg, OC_STACK_KEEP_TRANSACTION);
183
184     NSTask * task = NULL;
185
186     if (newMsg->type == NS_MESSAGE_READ || newMsg->type == NS_MESSAGE_DELETED)
187     {
188         NSSyncInfo * syncInfo = (NSSyncInfo *)OICMalloc(sizeof(NSSyncInfo));
189         NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(syncInfo,
190                               OC_STACK_KEEP_TRANSACTION, NSRemoveMessage(newMsg));
191
192         syncInfo->messageId = newMsg->messageId;
193         syncInfo->state = (newMsg->type == NS_MESSAGE_READ) ? NS_SYNC_READ : NS_SYNC_DELETED;
194         OICStrcpy(syncInfo->providerId, sizeof(char) * NS_DEVICE_ID_LENGTH, newMsg->providerId);
195
196         NSRemoveMessage(newMsg);
197
198         NS_LOG(DEBUG, "build NSTask for MQ message sync");
199         task = NSMakeTask(TASK_RECV_SYNCINFO, (void *) syncInfo);
200         NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, OC_STACK_KEEP_TRANSACTION, NSOICFree(syncInfo));
201     }
202     else
203     {
204         NS_LOG(DEBUG, "build NSTask for MQ message receive");
205         task = NSMakeTask(TASK_CONSUMER_RECV_MESSAGE, (void *) newMsg);
206         NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, OC_STACK_KEEP_TRANSACTION,
207                               NSRemoveMessage(newMsg));
208     }
209
210     NSConsumerPushEvent(task);
211
212     return OC_STACK_KEEP_TRANSACTION;
213 }
214 #endif