Fix mq sample client crash
[platform/upstream/iotivity.git] / cloud / samples / client / messagequeue / mq_publisher.cpp
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 ///
22 /// This sample shows how to write a cloud message queue (MQ) publisher client
23 ///
#include <stdio.h>
#include <unistd.h>

#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <vector>

#include "ocstack.h"
#include "ocpayload.h"

#include <OCApi.h>
#include <OCPlatform.h>
39
40 using namespace OC;
41 using namespace std;
42
43 #define DEFAULT_MQ_BROKER_URI "/oic/ps"
44
45 OC::OCResource::Ptr g_mqBrokerResource = nullptr;
46 OC::OCResource::Ptr g_mqSelectedTopicResource = nullptr;
47
48 vector<shared_ptr<OC::OCResource> > gTopicList;
49
50 void printRepresentation(OCRepresentation rep)
51 {
52     for (auto itr = rep.begin(); itr != rep.end(); ++itr)
53     {
54         cout << "\t" << itr->attrname() << ":\t" << itr->getValueToString() << endl;
55         if (itr->type() == AttributeType::Vector)
56         {
57             switch (itr->base_type())
58             {
59                 case AttributeType::OCRepresentation:
60                     for (auto itr2 : (*itr).getValue<vector<OCRepresentation> >())
61                     {
62                         printRepresentation(itr2);
63                     }
64                     break;
65
66                 case AttributeType::Integer:
67                     for (auto itr2 : (*itr).getValue<vector<int> >())
68                     {
69                         cout << "\t\t" << itr2 << endl;
70                     }
71                     break;
72
73                 case AttributeType::String:
74                     for (auto itr2 : (*itr).getValue<vector<string> >())
75                     {
76                         cout << "\t\t" << itr2 << endl;
77                     }
78                     break;
79
80                 default:
81                     cout << "Unhandled base type " << itr->base_type() << endl;
82                     break;
83             }
84         }
85         else if (itr->type() == AttributeType::OCRepresentation)
86         {
87             printRepresentation((*itr).getValue<OCRepresentation>());
88         }
89     }
90 }
91
92 ////////////////////////////////////////Publisher Sample
93
94 void createTopicCB(const int ecode, const string &originUri,
95                    shared_ptr<OC::OCResource> topic)
96 {
97     cout << "Create topic response received, code: " << ecode << endl;
98
99     if (ecode == OCStackResult::OC_STACK_RESOURCE_CREATED)
100     {
101         cout << "Created topic : " << topic->uri() << endl;
102     }
103     else
104     {
105         cout << "Topic creation failed : " << originUri << endl;
106     }
107 }
108
109 void publishMessageCB(const HeaderOptions &, const OCRepresentation &, const int eCode)
110 {
111     cout << "Publish message response received, code: " << eCode << endl;
112 }
113
114 void discoverTopicCB(const int ecode, const string &, shared_ptr<OC::OCResource> topic)
115 {
116     cout << "Topic discovered code: " << ecode << endl;
117     gTopicList.push_back(topic);
118     cout << "Topic [" << gTopicList.size() - 1 << "] " << topic->uri() << " discovered" << endl;
119 }
120 ////////////////////////////////////////End of Publisher Sample
121
// Notified by handleLoginoutCB; main() waits on it after each sign-up/sign-in request.
std::condition_variable g_callbackLock;

// Credentials taken from the auth response by handleLoginoutCB and used by
// main() for the sign-in that follows a sign-up.
std::string g_uid;
std::string g_accesstoken;
125
126 void handleLoginoutCB(const HeaderOptions &,
127                       const OCRepresentation &rep, const int ecode)
128 {
129     cout << "Auth response received code: " << ecode << endl;
130
131     if (rep.getPayload() != NULL)
132     {
133         printRepresentation(rep);
134     }
135
136     if (ecode == 4)
137     {
138         g_accesstoken = rep.getValueToString("accesstoken");
139
140         g_uid = rep.getValueToString("uid");
141     }
142
143     g_callbackLock.notify_all();
144 }
145
/// Open hook handed to OCPersistentStorage in main(): the requested path is
/// ignored and the fixed file "./mq_publisher.dat" is opened with @p mode.
static FILE *client_open(const char * /*path*/, const char *mode)
{
    static const char storageFile[] = "./mq_publisher.dat";
    FILE *handle = fopen(storageFile, mode);
    return handle;
}
150
151 int main(int argc, char *argv[])
152 {
153     if (argc != 4 && argc != 5)
154     {
155         cout << "Put \"[host-ipaddress:port] [authprovider] [authcode]\" for sign-up and sign-in"
156              << endl;
157         cout << "Put \"[host-ipaddress:port] [uid] [accessToken] 1\" for sign-in" <<
158              endl;
159         return 0;
160     }
161
162     OCPersistentStorage ps{ client_open, fread, fwrite, fclose, unlink };
163
164     PlatformConfig cfg
165     {
166         ServiceType::InProc,
167         ModeType::Both,
168         "0.0.0.0", // By setting to "0.0.0.0", it binds to all available interfaces
169         0,         // Uses randomly available port
170         QualityOfService::LowQos,
171         &ps
172     };
173
174     OCPlatform::Configure(cfg);
175
176     OCStackResult result = OC_STACK_ERROR;
177
178     string host = "coap+tcp://";
179     host += argv[1];
180
181     OCAccountManager::Ptr accountMgr = OCPlatform::constructAccountManagerObject(host,
182                                        CT_ADAPTER_TCP);
183
184     mutex blocker;
185     unique_lock<mutex> lock(blocker);
186
187     if (argc == 5)
188     {
189         accountMgr->signIn(argv[2], argv[3], &handleLoginoutCB);
190         g_callbackLock.wait(lock);
191     }
192     else
193     {
194         accountMgr->signUp(argv[2], argv[3], &handleLoginoutCB);
195         g_callbackLock.wait(lock);
196         accountMgr->signIn(g_uid, g_accesstoken, &handleLoginoutCB);
197         g_callbackLock.wait(lock);
198     }
199
200     // MQ broker resource
201     g_mqBrokerResource = OCPlatform::constructResourceObject(host, DEFAULT_MQ_BROKER_URI,
202                          static_cast<OCConnectivityType>(CT_ADAPTER_TCP | CT_IP_USE_V4), false,
203     { string("oic.wk.ps") }, { string(DEFAULT_INTERFACE) });
204
205     cout << "===Message Queue publisher sample===" << endl;
206     cout << "PUT 0 to discover all topics" << endl;
207     cout << "PUT 1 to discover type based topics" << endl;
208     cout << "PUT 2 to select topic index for publishing data" << endl;
209     cout << "PUT 3 to publish data to selected topic" << endl;
210     cout << "PUT 4 to create topic" << endl;
211     cout << "PUT 5 to create type based topic" << endl;
212
213     string cmd;
214
215     while (true)
216     {
217         cin >> cmd;
218
219         try
220         {
221
222             QueryParamsMap query;
223             OCRepresentation rep;
224             string      topicType;
225
226             switch (cmd[0])
227             {
228                 case '0':
229                     gTopicList.clear();
230                     cout << "Discovering topics" << endl;
231                     result = g_mqBrokerResource->discoveryMQTopics(query, &discoverTopicCB, QualityOfService::LowQos);
232                     break;
233
234                 case '1':
235                     gTopicList.clear();
236                     cout << "Put topic type to discover: ";
237                     cin >> cmd;
238                     query["rt"] = cmd;
239                     result = g_mqBrokerResource->discoveryMQTopics(query, &discoverTopicCB, QualityOfService::LowQos);
240                     break;
241
242                 case '2':
243                     cout << "Put discovered topic index to select: ";
244                     cin >> cmd;
245                     {
246                         int index = atoi(cmd.c_str());
247                         if (index < 0 || (unsigned int) index >= gTopicList.size())
248                         {
249                             cout << "invalid topic index selected" << endl;
250                             continue;
251                         }
252
253                         g_mqSelectedTopicResource = gTopicList[index];
254                         cout << g_mqSelectedTopicResource->uri() << " selected" << endl;
255                     }
256                     break;
257
258                 case '3':
259                     if (g_mqSelectedTopicResource == nullptr)
260                     {
261                         cout << "Topic is not selected." << endl;
262                         continue;
263                     }
264
265                     cout << "Put message to selected topic: ";
266                     cin >> cmd;
267                     rep["message"] = cmd;
268                     result = g_mqSelectedTopicResource->publishMQTopic(rep, query, &publishMessageCB,
269                              QualityOfService::LowQos);
270                     break;
271
272                 case '4':
273                     cout << "Put topic uri to create: ";
274                     cin >> cmd;
275                     result = g_mqBrokerResource->createMQTopic(rep, cmd, query, &createTopicCB,
276                              QualityOfService::LowQos);
277                     break;
278
279                 case '5':
280                     cout << "Put topic uri to create: ";
281                     cin >> cmd;
282                     cout << "Put topic type: ";
283                     cin >> topicType;
284                     query["rt"] = topicType;
285                     result = g_mqBrokerResource->createMQTopic(rep, cmd, query, &createTopicCB,
286                              QualityOfService::LowQos);
287                     break;
288
289                 case 'q':
290                     goto exit;
291                     break;
292             }
293
294             if (result != OC_STACK_OK)
295             {
296                 cout << "Error, return code: " << result << endl;
297             }
298         }
299         catch (const exception &e)
300         {
301             cout << "Precondition failed: " << e.what() << endl;
302         }
303     }
304
305 exit:
306     return 0;
307 }