BugFix: A establishment of websockets fails.
[profile/ivi/ico-vic-carsimulator.git] / src / AmbpiComm.cpp
1 /*
2  * Copyright (c) 2013, TOYOTA MOTOR CORPORATION.
3  *
4  * This program is licensed under the terms and conditions of the 
5  * Apache License, version 2.0.  The full text of the Apache License is at
6  * http://www.apache.org/licenses/LICENSE-2.0
7  *
8  */
9 /**
10  * @brief   AMB plug-inn Communication I/F
11  *
12  */
13
14 #include <string.h>
15 #include <iostream>
16 #include <vector>
17 #include <unistd.h>
18 #include "AmbpiComm.h"
19
20 #include "ico-util/ico_log.h"
21
22 using namespace std;
23
24 static vector<AmbpiCommIF*> _commList;
25 const int RECONNECT_WS = 5;
26
27 bool _addAmbCommIFList(AmbpiCommIF* comm);
28 void _eraseAmbCommIFList(const AmbpiCommIF* comm);
29 static void _icoUwsCallback(const struct ico_uws_context *context,
30                            const ico_uws_evt_e event, const void *id,
31                            const ico_uws_detail *detail, void *user_data);
32
33
34 AmbpiCommRecvQueue::AmbpiCommRecvQueue()
35 : msize(0), maxqueuesize(10)
36 {
37     init();
38 }
39
40 AmbpiCommRecvQueue::AmbpiCommRecvQueue(int queuesize)
41 : msize(0)
42 {
43     maxqueuesize = queuesize;
44     init();
45 }
46
47 AmbpiCommRecvQueue::~AmbpiCommRecvQueue()
48 {
49     delete[] mdata;
50     delete[] mdatasize;
51 }
52
53 bool AmbpiCommRecvQueue::push(char *data, int datasize)
54 {
55     if (datasize > maxdatasize || (msize + 1) > maxqueuesize) {
56         return false;
57     }
58     memset(mdata[msize], 0, maxdatasize);
59     memcpy(mdata[msize], data, datasize);
60     mdatasize[msize] = datasize;
61     msize++;
62     return true;
63 }
64
65 bool AmbpiCommRecvQueue::front(char *buf) const
66 {
67     if (msize == 0) {
68         return false;
69     }
70     memcpy(buf, mdata[0], mdatasize[0]);
71     return true;
72 }
73
74 bool AmbpiCommRecvQueue::empty() const
75 {
76     return (msize == 0);
77 }
78
79 void AmbpiCommRecvQueue::pop()
80 {
81     if (msize > 0) {
82         msize--;
83         for (int i = 0; i < msize; i++) {
84             memset(mdata[i], 0, maxdatasize);
85             memcpy(mdata[i], mdata[i + 1], mdatasize[i + 1]);
86             mdatasize[i] = mdatasize[i + 1];
87         }
88         memset(mdata[msize], 0, maxdatasize);
89         mdatasize[msize] = 0;
90     }
91 }
92
93 int AmbpiCommRecvQueue::size() const
94 {
95     return msize;
96 }
97
98 void AmbpiCommRecvQueue::init()
99 {
100     mdata = new char[maxqueuesize][maxdatasize];
101     memset((void *) mdata, 0, maxqueuesize * maxdatasize);
102
103     mdatasize = new int[maxqueuesize];
104
105     memset(mdatasize, 0, maxqueuesize);
106 }
107
108 AmbpiCommIF::AmbpiCommIF()
109 {
110     isready = false;
111     m_threadid = 0;
112     m_mutex = PTHREAD_MUTEX_INITIALIZER;
113     m_cond = PTHREAD_COND_INITIALIZER;
114     m_id = NULL;
115     m_context = NULL;
116     reset_ercode();
117 }
118
119 AmbpiCommIF::AmbpiCommIF(const char* uri, const char* protocolName)
120 {
121     isready = false;
122     m_threadid = 0;
123     m_mutex = PTHREAD_MUTEX_INITIALIZER;
124     m_cond = PTHREAD_COND_INITIALIZER;
125     m_id = NULL;
126     m_context = NULL;
127     reset_ercode();
128     start(uri, protocolName);
129 }
130
131 AmbpiCommIF::~AmbpiCommIF()
132 {
133     if (isready) {
134         int ret;
135         if (m_threadid != 0) {
136             ret = pthread_cancel(m_threadid);
137             if (ret != 0) {
138                 cerr << m_pNm << ":Failed to pthread_cancel" << endl;
139             }
140         }
141         ret = pthread_join(m_threadid, NULL);
142         if (ret != 0) {
143             cerr << m_pNm << ":Failed to pthread_join" << endl;
144         }
145         if (m_context != NULL) {
146             ico_uws_close(m_context);
147             m_context = NULL;
148         }
149         _eraseAmbCommIFList(this);
150     }
151 }
152
153 bool AmbpiCommIF::start(const char* uri, const char* protocolName)
154 {
155     if (isready) {
156         return isready;
157     }
158     isready = init(uri, protocolName);
159     return isready;
160 }
161
162 bool AmbpiCommIF::send(const char *msg, const int size)
163 {
164     if (!isready) {
165         return isready;
166     }
167     reset_ercode();
168     ico_uws_send(m_context, m_id, (unsigned char*)msg, (size_t)size);
169     if ((ICO_UWS_ERR_UNKNOWN != m_ercode) && (ICO_UWS_ERR_NONE != m_ercode)) {
170         ICO_DBG("uri[%s], protocol[%s], m_context[%x], m_id[%x], msg[%s], size[%d]",
171             m_uri.c_str(), m_pNm.c_str(), (int)m_context, (int)m_id, msg, size);
172         return false;
173     }
174     return true;
175 }
176
177 bool AmbpiCommIF::recv(char *msg, bool fblocking)
178 {
179     if (!isready) {
180         return isready;
181     }
182
183     if (fblocking) {
184         if (m_queue.empty()) {
185             if (pthread_cond_wait(&m_cond, &m_mutex) != 0) {
186                 cerr << m_pNm << ":Failed to wait signal" << endl;
187             }
188         }
189         m_queue.front(msg);
190         m_queue.pop();
191     }
192     else {
193         if (m_queue.empty()) {
194             return false;
195         }
196         else {
197             m_queue.front(msg);
198             m_queue.pop();
199         }
200     }
201     return true;
202 }
203
204 bool AmbpiCommIF::poll()
205 {
206     if (!isready) {
207         return isready;
208     }
209
210     while (true) {
211         ico_uws_service(m_context);
212         usleep(100000);
213     }
214 }
215
216 void *AmbpiCommIF::loop(void *arg)
217 {
218     AmbpiCommIF *src = reinterpret_cast < AmbpiCommIF * >(arg);
219     return (void *) src->poll();
220 }
221
222 bool AmbpiCommIF::init(const char* uri, const char* protocolName)
223 {
224     if (NULL == uri) {
225         cerr << "Failed create context param" << endl;
226         return false;
227     }
228     if (NULL == protocolName) {
229         cerr << "Failed create context param 2" << endl;
230         return false;
231     }
232     m_uri = uri;
233     m_pNm = protocolName;
234     int loopcount = 0;
235     do {
236         m_context = ico_uws_create_context(uri, protocolName);
237         if (NULL != m_context) {
238             break;
239         }
240         cerr << m_pNm << ":Failed to create context." << endl;
241         usleep (500 * 1000);
242     } while (m_context == NULL && ++loopcount < RECONNECT_WS);
243     int r = ico_uws_set_event_cb(m_context, _icoUwsCallback, (void*)this);
244     if (ICO_UWS_ERR_NONE != r) {
245         cerr << m_pNm << ":Failed to callback entry(" << r << ")." << endl;
246         return false;
247     }
248     _addAmbCommIFList(this);
249     if (pthread_create(&m_threadid, NULL, AmbpiCommIF::loop,
250                        (void *) this) != 0) {
251         cerr << m_pNm << ":Failed to create thread." << endl;
252         return false;
253     }
254     return true;
255 }
256
257 /**
258  * @brief event
259  * @param event ico_uws_evt_e event code
260  * @param id unique id
261  * @param detail infomation or data detail
262  * @param user_data added user data
263  */
264 void AmbpiCommIF::event_cb(const ico_uws_evt_e event, const void *id,
265                            const ico_uws_detail *d, void *user_data)
266 {
267     switch (event) {
268     case ICO_UWS_EVT_RECEIVE:
269     {
270         ICO_DBG("uri[%s], protocol[%s], ICO_UWS_EVT_RECEIVE[m_id=%x]",
271             m_uri.c_str(), m_pNm.c_str(), (int)m_id);
272         if (NULL == d) {
273             cerr << m_pNm << ":Failed Receive event" << endl;
274             break;
275         }
276         if (0 != pthread_mutex_lock(&m_mutex)) {
277             cerr << m_pNm << ":Failed to lock mutex" << endl;
278         }
279         char buf[sizeof(KeyDataMsg_t) + MsgQueueMaxMsgSize];
280         memset(buf, 0, sizeof(buf));
281         memcpy(buf, reinterpret_cast<char*>(d->_ico_uws_message.recv_data),
282                d->_ico_uws_message.recv_len);
283         m_queue.push((&buf[0]), sizeof(buf));
284         pthread_cond_signal(&m_cond);
285         if (0 != pthread_mutex_unlock(&m_mutex)) {
286             cerr << m_pNm << ":Failed to unlock mutex" << endl;
287         }
288         break;
289     }
290     case ICO_UWS_EVT_OPEN:
291     {
292         m_id = (void*)id;
293         ICO_DBG("uri[%s], protocol[%s], ICO_UWS_EVT_OPEN[m_id=%x]",
294             m_uri.c_str(), m_pNm.c_str(), (int)m_id);
295         break;
296     }
297     case ICO_UWS_EVT_ERROR:
298     {
299         if (NULL == d) {
300             cerr << m_pNm << ":Failed ERROR event" << endl;
301             break;
302         }
303         m_ercode = d->_ico_uws_error.code;
304         break;
305     }
306     default:
307         break;
308     }
309     return;
310 }
311
312 /**
313  * @brief _addAmbCommIFList
314  * @param comm entry item address
315  * @return true:succes false:fail
316  */
317 bool _addAmbCommIFList(AmbpiCommIF* comm)
318 {
319     if (NULL == comm) {
320         return false;
321     }
322     _commList.push_back(comm);
323     return true;
324 }
325
326 /**
327  * @brief _eraseAmbCommIFList
328  * @param comm erase target
329  */
330 void _eraseAmbCommIFList(const AmbpiCommIF* comm)
331 {
332     bool bMatch = false;
333     for (int i = 0; i < _commList.size(); i++) {
334         if (_commList[i] == comm) {
335             bMatch = true;
336             break; // break of for i
337         }
338     }
339     if (false == bMatch) {
340         return;
341     }
342     vector<AmbpiCommIF*> v_tmp(_commList);
343     _commList.clear();
344     for (int i = 0; i < v_tmp.size(); i++) {
345         if (v_tmp[i] != comm) {
346             _commList.push_back(v_tmp[i]);
347         }
348     }
349     return;
350 }
351
352 /**
353  * @brief _icoUwsCallback
354  * @param context identify connect
355  * @param event ico_uws_evt_e event code
356  * @param id unique id
357  * @param detail infomation or data detail
358  * @param user_data added user data
359  */
360 static void _icoUwsCallback(const struct ico_uws_context *context,
361                            const ico_uws_evt_e event, const void *id,
362                            const ico_uws_detail *detail, void *user_data)
363 {
364     for (int i = 0; i < _commList.size(); i++) {
365         AmbpiCommIF* comm = _commList[i];
366         if (NULL == comm) {
367             continue;
368         }
369         if (false == comm->isContextMatch(context)) {
370             continue;
371         }
372         comm->event_cb(event, id, detail, user_data);
373     }
374 }