2 * Copyright (c) 2013, TOYOTA MOTOR CORPORATION.
4 * This program is licensed under the terms and conditions of the
5 * Apache License, version 2.0. The full text of the Apache License is at
6 * http://www.apache.org/licenses/LICENSE-2.0
10 * @brief AMB plug-inn Communication I/F
18 #include "AmbpiComm.h"
20 #include "ico-util/ico_log.h"
24 static vector<AmbpiCommIF*> _commList;
25 const int RECONNECT_WS = 5;
27 bool _addAmbCommIFList(AmbpiCommIF* comm);
28 void _eraseAmbCommIFList(const AmbpiCommIF* comm);
29 static void _icoUwsCallback(const struct ico_uws_context *context,
30 const ico_uws_evt_e event, const void *id,
31 const ico_uws_detail *detail, void *user_data);
34 AmbpiCommRecvQueue::AmbpiCommRecvQueue()
35 : msize(0), maxqueuesize(10)
40 AmbpiCommRecvQueue::AmbpiCommRecvQueue(int queuesize)
43 maxqueuesize = queuesize;
47 AmbpiCommRecvQueue::~AmbpiCommRecvQueue()
53 bool AmbpiCommRecvQueue::push(char *data, int datasize)
55 if (datasize > maxdatasize || (msize + 1) > maxqueuesize) {
58 memset(mdata[msize], 0, maxdatasize);
59 memcpy(mdata[msize], data, datasize);
60 mdatasize[msize] = datasize;
65 bool AmbpiCommRecvQueue::front(char *buf) const
70 memcpy(buf, mdata[0], mdatasize[0]);
74 bool AmbpiCommRecvQueue::empty() const
79 void AmbpiCommRecvQueue::pop()
83 for (int i = 0; i < msize; i++) {
84 memset(mdata[i], 0, maxdatasize);
85 memcpy(mdata[i], mdata[i + 1], mdatasize[i + 1]);
86 mdatasize[i] = mdatasize[i + 1];
88 memset(mdata[msize], 0, maxdatasize);
93 int AmbpiCommRecvQueue::size() const
98 void AmbpiCommRecvQueue::init()
100 mdata = new char[maxqueuesize][maxdatasize];
101 memset((void *) mdata, 0, maxqueuesize * maxdatasize);
103 mdatasize = new int[maxqueuesize];
105 memset(mdatasize, 0, maxqueuesize);
108 AmbpiCommIF::AmbpiCommIF()
112 m_mutex = PTHREAD_MUTEX_INITIALIZER;
113 m_cond = PTHREAD_COND_INITIALIZER;
119 AmbpiCommIF::AmbpiCommIF(const char* uri, const char* protocolName)
123 m_mutex = PTHREAD_MUTEX_INITIALIZER;
124 m_cond = PTHREAD_COND_INITIALIZER;
128 start(uri, protocolName);
131 AmbpiCommIF::~AmbpiCommIF()
135 if (m_threadid != 0) {
136 ret = pthread_cancel(m_threadid);
138 cerr << m_pNm << ":Failed to pthread_cancel" << endl;
141 ret = pthread_join(m_threadid, NULL);
143 cerr << m_pNm << ":Failed to pthread_join" << endl;
145 if (m_context != NULL) {
146 ico_uws_close(m_context);
149 _eraseAmbCommIFList(this);
153 bool AmbpiCommIF::start(const char* uri, const char* protocolName)
158 isready = init(uri, protocolName);
162 bool AmbpiCommIF::send(const char *msg, const int size)
168 ico_uws_send(m_context, m_id, (unsigned char*)msg, (size_t)size);
169 if ((ICO_UWS_ERR_UNKNOWN != m_ercode) && (ICO_UWS_ERR_NONE != m_ercode)) {
170 ICO_DBG("uri[%s], protocol[%s], m_context[%p], m_id[%p], msg[%s], size[%d]",
171 m_uri.c_str(), m_pNm.c_str(), m_context, m_id, msg, size);
177 bool AmbpiCommIF::recv(char *msg, bool fblocking)
184 if (m_queue.empty()) {
185 if (pthread_cond_wait(&m_cond, &m_mutex) != 0) {
186 cerr << m_pNm << ":Failed to wait signal" << endl;
193 if (m_queue.empty()) {
204 bool AmbpiCommIF::poll()
211 ico_uws_service(m_context);
216 void *AmbpiCommIF::loop(void *arg)
218 AmbpiCommIF *src = reinterpret_cast < AmbpiCommIF * >(arg);
219 return (void *) src->poll();
222 bool AmbpiCommIF::init(const char* uri, const char* protocolName)
225 cerr << "Failed create context param" << endl;
228 if (NULL == protocolName) {
229 cerr << "Failed create context param 2" << endl;
233 m_pNm = protocolName;
236 m_context = ico_uws_create_context(uri, protocolName);
237 if (NULL != m_context) {
240 cerr << m_pNm << ":Failed to create context." << endl;
242 } while (m_context == NULL && ++loopcount < RECONNECT_WS);
243 int r = ico_uws_set_event_cb(m_context, _icoUwsCallback, (void*)this);
244 if (ICO_UWS_ERR_NONE != r) {
245 cerr << m_pNm << ":Failed to callback entry(" << r << ")." << endl;
248 _addAmbCommIFList(this);
249 if (pthread_create(&m_threadid, NULL, AmbpiCommIF::loop,
250 (void *) this) != 0) {
251 cerr << m_pNm << ":Failed to create thread." << endl;
259 * @param event ico_uws_evt_e event code
260 * @param id unique id
261 * @param detail infomation or data detail
262 * @param user_data added user data
264 void AmbpiCommIF::event_cb(const ico_uws_evt_e event, const void *id,
265 const ico_uws_detail *d, void *user_data)
268 case ICO_UWS_EVT_RECEIVE:
270 ICO_DBG("uri[%s], protocol[%s], ICO_UWS_EVT_RECEIVE[m_id=%p]",
271 m_uri.c_str(), m_pNm.c_str(), m_id);
273 cerr << m_pNm << ":Failed Receive event" << endl;
276 if (0 != pthread_mutex_lock(&m_mutex)) {
277 cerr << m_pNm << ":Failed to lock mutex" << endl;
279 char buf[sizeof(KeyDataMsg_t) + MsgQueueMaxMsgSize];
280 memset(buf, 0, sizeof(buf));
281 memcpy(buf, reinterpret_cast<char*>(d->_ico_uws_message.recv_data),
282 d->_ico_uws_message.recv_len);
283 m_queue.push((&buf[0]), sizeof(buf));
284 pthread_cond_signal(&m_cond);
285 if (0 != pthread_mutex_unlock(&m_mutex)) {
286 cerr << m_pNm << ":Failed to unlock mutex" << endl;
290 case ICO_UWS_EVT_OPEN:
293 ICO_DBG("uri[%s], protocol[%s], ICO_UWS_EVT_OPEN[m_id=%p]",
294 m_uri.c_str(), m_pNm.c_str(), m_id);
297 case ICO_UWS_EVT_ERROR:
300 cerr << m_pNm << ":Failed ERROR event" << endl;
303 m_ercode = d->_ico_uws_error.code;
313 * @brief _addAmbCommIFList
314 * @param comm entry item address
315 * @return true:succes false:fail
317 bool _addAmbCommIFList(AmbpiCommIF* comm)
322 _commList.push_back(comm);
327 * @brief _eraseAmbCommIFList
328 * @param comm erase target
330 void _eraseAmbCommIFList(const AmbpiCommIF* comm)
333 for (int i = 0; i < _commList.size(); i++) {
334 if (_commList[i] == comm) {
336 break; // break of for i
339 if (false == bMatch) {
342 vector<AmbpiCommIF*> v_tmp(_commList);
344 for (int i = 0; i < v_tmp.size(); i++) {
345 if (v_tmp[i] != comm) {
346 _commList.push_back(v_tmp[i]);
353 * @brief _icoUwsCallback
354 * @param context identify connect
355 * @param event ico_uws_evt_e event code
356 * @param id unique id
357 * @param detail infomation or data detail
358 * @param user_data added user data
360 static void _icoUwsCallback(const struct ico_uws_context *context,
361 const ico_uws_evt_e event, const void *id,
362 const ico_uws_detail *detail, void *user_data)
364 for (int i = 0; i < _commList.size(); i++) {
365 AmbpiCommIF* comm = _commList[i];
369 if (false == comm->isContextMatch(context)) {
372 comm->event_cb(event, id, detail, user_data);