Fix mq sample client crash
[platform/upstream/iotivity.git] / cloud / samples / client / messagequeue / mq_subscriber.cpp
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 ///
22 /// This sample provides the way to create cloud sample
23 ///
24 #include <memory>
25 #include <iostream>
26 #include <stdexcept>
27 #include <condition_variable>
28 #include <map>
29 #include <vector>
30 #include <string>
31 #include <unistd.h>
32 #include <stdio.h>
33
34 #include "ocstack.h"
35 #include "ocpayload.h"
36
37 #include <OCApi.h>
38 #include <OCPlatform.h>
39
40 using namespace OC;
41 using namespace std;
42
43 #define DEFAULT_MQ_BROKER_URI "/oic/ps"
44 #define maxSequenceNumber 0xFFFFFF
45
46 OC::OCResource::Ptr g_mqBrokerResource = nullptr;
47 OC::OCResource::Ptr g_mqSelectedTopicResource = nullptr;
48
49 vector<shared_ptr<OC::OCResource> > gTopicList;
50
51 void printRepresentation(OCRepresentation rep)
52 {
53     for (auto itr = rep.begin(); itr != rep.end(); ++itr)
54     {
55         cout << "\t" << itr->attrname() << ":\t" << itr->getValueToString() << endl;
56         if (itr->type() == AttributeType::Vector)
57         {
58             switch (itr->base_type())
59             {
60                 case AttributeType::OCRepresentation:
61                     for (auto itr2 : (*itr).getValue<vector<OCRepresentation> >())
62                     {
63                         printRepresentation(itr2);
64                     }
65                     break;
66
67                 case AttributeType::Integer:
68                     for (auto itr2 : (*itr).getValue<vector<int> >())
69                     {
70                         cout << "\t\t" << itr2 << endl;
71                     }
72                     break;
73
74                 case AttributeType::String:
75                     for (auto itr2 : (*itr).getValue<vector<string> >())
76                     {
77                         cout << "\t\t" << itr2 << endl;
78                     }
79                     break;
80
81                 default:
82                     cout << "Unhandled base type " << itr->base_type() << endl;
83                     break;
84             }
85         }
86         else if (itr->type() == AttributeType::OCRepresentation)
87         {
88             printRepresentation((*itr).getValue<OCRepresentation>());
89         }
90     }
91 }
92
93 ////////////////////////////////////////Subscriber Sample
94 void subscribeCB(const HeaderOptions &,
95                  const OCRepresentation &rep, const int eCode, const int sequenceNumber)
96 {
97     try
98     {
99         if (eCode == OC_STACK_OK && sequenceNumber != maxSequenceNumber + 1)
100         {
101             if (sequenceNumber == OC_OBSERVE_REGISTER)
102             {
103                 cout << "Observe registration action is successful" << endl;
104             }
105
106             cout << "OBSERVE RESULT:" << endl;
107             printRepresentation(rep);
108         }
109         else
110         {
111             if (eCode == OC_STACK_OK)
112             {
113                 cout << "Observe registration failed or de-registration action failed/succeeded" << endl;
114             }
115             else
116             {
117                 cout << "onObserve Response error: " << eCode << endl;
118                 exit(-1);
119             }
120         }
121     }
122     catch (exception &e)
123     {
124         cout << "Exception: " << e.what() << " in onObserve" << endl;
125     }
126 }
127
128 void discoverTopicCB(const int ecode, const string &brokerUri,
129                      shared_ptr<OC::OCResource> topic)
130 {
131     cout << "Topic discovered from " << brokerUri << " code: " << ecode << endl;
132     gTopicList.push_back(topic);
133     cout << "Topic [" << gTopicList.size() - 1 << "] " << topic->uri() << " discovered" << endl;
134 }
135 ////////////////////////////////////////Subscriber Sample
136
137 condition_variable  g_callbackLock;
138 string             g_uid;
139 string             g_accesstoken;
140
141 void handleLoginoutCB(const HeaderOptions &,
142                       const OCRepresentation &rep, const int ecode)
143 {
144     cout << "Auth response received code: " << ecode << endl;
145
146     if (rep.getPayload() != NULL)
147     {
148         printRepresentation(rep);
149     }
150
151     if (ecode == 4)
152     {
153         g_accesstoken = rep.getValueToString("accesstoken");
154
155         g_uid = rep.getValueToString("uid");
156     }
157
158     g_callbackLock.notify_all();
159 }
160
161 static FILE *client_open(const char * /*path*/, const char *mode)
162 {
163     return fopen("./mq_subscriber.dat", mode);
164 }
165
166 int main(int argc, char *argv[])
167 {
168     if (argc != 4 && argc != 5)
169     {
170         cout << "Put \"[host-ipaddress:port] [authprovider] [authcode]\" for sign-up and sign-in"
171              << endl;
172         cout << "Put \"[host-ipaddress:port] [uid] [accessToken] 1\" for sign-in" <<
173              endl;
174         return 0;
175     }
176
177     OCPersistentStorage ps{ client_open, fread, fwrite, fclose, unlink };
178
179     PlatformConfig cfg
180     {
181         ServiceType::InProc,
182         ModeType::Both,
183         "0.0.0.0", // By setting to "0.0.0.0", it binds to all available interfaces
184         0,         // Uses randomly available port
185         QualityOfService::LowQos,
186         &ps
187     };
188
189     OCPlatform::Configure(cfg);
190
191     OCStackResult result = OC_STACK_ERROR;
192
193     string host = "coap+tcp://";
194     host += argv[1];
195
196     OCAccountManager::Ptr accountMgr = OCPlatform::constructAccountManagerObject(host,
197                                        CT_ADAPTER_TCP);
198
199     mutex blocker;
200     unique_lock<mutex> lock(blocker);
201
202     if (argc == 5)
203     {
204         accountMgr->signIn(argv[2], argv[3], &handleLoginoutCB);
205         g_callbackLock.wait(lock);
206     }
207     else
208     {
209         accountMgr->signUp(argv[2], argv[3], &handleLoginoutCB);
210         g_callbackLock.wait(lock);
211         accountMgr->signIn(g_uid, g_accesstoken, &handleLoginoutCB);
212         g_callbackLock.wait(lock);
213     }
214
215     // MQ broker resource
216     g_mqBrokerResource = OCPlatform::constructResourceObject(host, DEFAULT_MQ_BROKER_URI,
217                          static_cast<OCConnectivityType>(CT_ADAPTER_TCP | CT_IP_USE_V4), false,
218     { string("oic.wk.ps") }, { string(DEFAULT_INTERFACE) });
219
220     cout << "===Message Queue subscriber sample===" << endl;
221     cout << "PUT 0 to discover all topics" << endl;
222     cout << "PUT 1 to discover type based topics" << endl;
223     cout << "PUT 2 to select topic for subscribing or unsubscribing" << endl;
224     cout << "PUT 3 to subscribe to selected topic" << endl;
225     cout << "PUT 4 to unsubscribe to selected topic" << endl;
226
227     string cmd;
228
229     while (true)
230     {
231         cin >> cmd;
232
233         try
234         {
235             QueryParamsMap query;
236             OCRepresentation rep;
237
238             switch (cmd[0])
239             {
240                 case '0':
241                     gTopicList.clear();
242                     cout << "Discovering topics" << endl;
243                     result = g_mqBrokerResource->discoveryMQTopics(query, &discoverTopicCB, QualityOfService::LowQos);
244                     break;
245
246                 case '1':
247                     gTopicList.clear();
248                     cout << "Put topic type to discover: ";
249                     cin >> cmd;
250                     query["rt"] = cmd;
251                     result = g_mqBrokerResource->discoveryMQTopics(query, &discoverTopicCB, QualityOfService::LowQos);
252                     break;
253
254                 case '2':
255                     cout << "Put discovered topic index to select: ";
256                     cin >> cmd;
257                     {
258                         int index = atoi(cmd.c_str());
259                         if (index < 0 || (unsigned int) index >= gTopicList.size())
260                         {
261                             cout << "invalid topic index selected" << endl;
262                             continue;
263                         }
264
265                         g_mqSelectedTopicResource = gTopicList[index];
266                         cout << g_mqSelectedTopicResource->uri() << " selected" << endl;
267                     }
268                     break;
269
270                 case '3':
271                     if (g_mqSelectedTopicResource == nullptr)
272                     {
273                         cout << "Topic is not selected." << endl;
274                         continue;
275                     }
276
277                     cout << "Subscribe to selected topic" << endl;
278                     result = g_mqSelectedTopicResource->subscribeMQTopic(ObserveType::Observe, query, &subscribeCB,
279                              QualityOfService::LowQos);
280                     break;
281
282                 case '4':
283                     if (g_mqSelectedTopicResource == nullptr)
284                     {
285                         cout << "Topic is not selected." << endl;
286                         continue;
287                     }
288                     cout << "Unsubscribe to selected topic" << endl;
289                     result = g_mqSelectedTopicResource->unsubscribeMQTopic(QualityOfService::LowQos);
290                     break;
291
292                 case 'q':
293                     goto exit;
294                     break;
295             }
296
297             if (result != OC_STACK_OK)
298             {
299                 cout << "Error, return code: " << result << endl;
300             }
301         }
302         catch (const exception &e)
303         {
304             cout << "Precondition failed: " << e.what() << endl;
305         }
306     }
307
308 exit:
309     return 0;
310 }