2 // Open Service Platform
3 // Copyright (c) 2012 Samsung Electronics Co., Ltd.
5 // Licensed under the Apache License, Version 2.0 (the License);
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
20 * @brief This is the implementation file for the IpcClient class.
26 #include <sys/types.h>
27 #include <sys/socket.h>
40 #include "message-port-log.h"
42 #include "IpcClient.h"
43 #include "IIpcClientEventListener.h"
// Constructor: starts with no GLib source attached (__pReverseSource is NULL)
// and with __messageBuffer holding an empty, NUL-terminated string.
49 IpcClient::IpcClient(void)
50 : __pReverseSource(NULL)
54 __messageBuffer[0] = '\0';
// Destructor: tears down the GLib source held in __pReverseSource (if any),
// loops until __fds is empty, and destroys the mutex.
57 IpcClient::~IpcClient(void)
// Destroy the source and drop our reference to it, then clear the pointer
// so it cannot be used again.
61 if (__pReverseSource != NULL)
63 g_source_destroy(__pReverseSource);
64 g_source_unref(__pReverseSource);
65 __pReverseSource = NULL;
// Runs until the __fds container is empty.
// NOTE(review): presumably each remaining descriptor is closed and removed
// from __fds inside this loop - confirm.
68 while (__fds.size() > 0)
// NOTE(review): Construct() allocates a pthread_mutex_t with malloc
// (presumably stored in __pMutex); confirm it is free()d after being
// destroyed, and that __pMutex is valid if Construct() was never called.
76 pthread_mutex_destroy(__pMutex);
// Second-phase initialization: records the listener, allocates and
// initializes a pthread mutex, and calls MakeConnection() twice.
//
// @param serverName  Name of the server. NOTE(review): presumably stored in
//                    __name, which MakeConnection() appends to "/tmp/" -
//                    confirm.
// @param pListener   Event listener; its constness is cast away before it is
//                    stored in __pListener.
80 IpcClient::Construct(const string& serverName, const IIpcClientEventListener* pListener)
83 __pListener = const_cast <IIpcClientEventListener*>(pListener);
// The mutex lives on the heap (malloc, not new).
// NOTE(review): confirm the malloc result is NULL-checked before
// pthread_mutex_init() and that it is assigned to __pMutex.
85 pthread_mutex_t* pMutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
91 pthread_mutex_init(pMutex, NULL);
// First connection: forReverse is left at its default value (declared
// outside this file - TODO confirm which value that is).
95 int ret = MakeConnection();
// Second connection: explicitly requested with forReverse = true.
103 ret = MakeConnection(true);
// Const accessor.
// NOTE(review): presumably returns the server name (__name) - confirm
// against the function body.
115 IpcClient::GetName(void) const
// Opens a UNIX-domain stream socket to "/tmp/" + __name, connects in
// non-blocking mode (waiting on select() with a timeout), writes a
// HelloMessage carrying this process's pid and a "reverse" flag, and sets up
// a GLib IO watch on the socket that dispatches to OnReadMessage().
//
// @param forReverse  NOTE(review): presumably selects helloMessage.reverse
//                    (1 vs 0) and whether the GLib watch is installed -
//                    confirm against the branch conditions.
127 IpcClient::MakeConnection(bool forReverse)
131 size_t socketNameLength = 0;
// Build the socket path; the length below includes the terminating NUL.
134 socketName.append("/tmp/");
135 socketName.append(__name);
137 socketNameLength = socketName.size() + 1;
// Handshake payload sent to the server right after connecting.
139 HelloMessage helloMessage = {0, 0};
141 helloMessage.pid = getpid();
144 helloMessage.reverse = 1;
148 helloMessage.reverse = 0;
151 struct sockaddr_un server;
153 bzero(&server, sizeof(server));
154 server.sun_family = AF_UNIX;
// NOTE(review): the strncpy bound is the *source* length, not
// sizeof(server.sun_path); a sufficiently long __name would write past
// sun_path. Confirm a length check exists or bound by the destination size.
155 strncpy(server.sun_path, socketName.c_str(), socketNameLength);
156 socklen_t serverLen = sizeof(server);
158 int client = socket(AF_UNIX, SOCK_STREAM, 0);
161 _LOGE("Failed to create a socket : %s.", strerror(errno));
// Switch the socket to non-blocking so connect() returns immediately and the
// wait can be done with select() below; the original flags are kept so they
// can be restored afterwards.
165 int flags = fcntl(client, F_GETFL, 0);
166 ret = fcntl(client, F_SETFL, flags | O_NONBLOCK);
169 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
173 ret = connect(client, (struct sockaddr*) &server, serverLen);
// EINPROGRESS means the non-blocking connect is still under way; any other
// errno is reported as a connection failure.
176 if (errno != EINPROGRESS)
178 _LOGE("Failed to connect to server(%s) : %s", socketName.c_str(), strerror(errno));
184 struct timeval timeout;
187 socklen_t socketLength = 0;
190 FD_SET(client, &rset);
// Wait until the socket becomes readable or writable, or the timeout expires.
197 ret = select(client+1, &rset, &wset, NULL, &timeout);
200 _LOGE("Failed to connect due to system error : %s.", strerror(errno));
210 _LOGE("Failed to connect due to timeout.");
219 if (FD_ISSET(client, &rset) || FD_ISSET(client, &wset))
// NOTE(review): `length` receives sizeof(error), but getsockopt() is passed
// &socketLength, which was initialized to 0 above and is not updated in the
// visible code. With an option length of 0, SO_ERROR may never be written
// into `error`. It looks like socketLength should be sizeof(error) - confirm.
221 length = sizeof(error);
222 ret = getsockopt(client, SOL_SOCKET, SO_ERROR, &error, &socketLength);
225 _LOGE("Failed to connect to server(%s) : %s", socketName.c_str(), strerror(errno));
231 _LOGE("Failed to connect due to system error.");
// Restore the file status flags that were read before O_NONBLOCK was added.
236 ret = fcntl(client, F_SETFL, flags);
239 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
// Send the handshake.
// NOTE(review): confirm the return value is checked for errors/short writes.
243 ret = write(client, &helloMessage, sizeof(helloMessage));
251 GError* pGError = NULL;
252 GSource* pGSource = NULL;
// Wrap the socket in a GIOChannel and use the default main context.
254 GIOChannel* pChannel = g_io_channel_unix_new(client);
255 GMainContext* pGMainContext = g_main_context_default();
// NULL encoding selects raw binary I/O on the channel.
// NOTE(review): pGError is passed to two consecutive calls without a visible
// check or g_clear_error() in between; GLib expects *error to be NULL on
// entry - confirm.
257 g_io_channel_set_encoding(pChannel, NULL, &pGError);
258 g_io_channel_set_flags(pChannel, G_IO_FLAG_NONBLOCK, &pGError);
// The socket is closed automatically when the channel's last reference goes.
259 g_io_channel_set_close_on_unref(pChannel, TRUE);
// Watch for incoming data, errors, invalid fd and hang-up; OnReadMessage()
// receives `this` as its user-data pointer.
261 pGSource = g_io_create_watch(pChannel, (GIOCondition) (G_IO_IN | G_IO_ERR | G_IO_NVAL | G_IO_HUP));
262 g_source_set_callback(pGSource, (GSourceFunc) OnReadMessage, this, NULL);
263 g_source_attach(pGSource, pGMainContext);
// Drop the local channel reference (presumably the watch source keeps its
// own), and remember the source so it can be destroyed later.
265 g_io_channel_unref(pChannel);
266 __pReverseSource = pGSource;
// GLib IO watch callback (registered in MakeConnection() with `this` as the
// user data). Casts `data` back to IpcClient* and forwards the channel and
// condition to HandleReceivedMessage(), returning its result.
286 IpcClient::OnReadMessage(GIOChannel* source, GIOCondition condition, gpointer data)
288 IpcClient* pIpcClient = (IpcClient*) data;
290 return pIpcClient->HandleReceivedMessage(source, condition);
// Handles an IO condition reported for the watched channel.
//  - G_IO_HUP: destroys the watch source and notifies the listener that the
//    server disconnected.
//  - G_IO_IN: reads up to __MAX_MESSAGE_BUFFER_SIZE bytes, splits the data
//    into IPC messages and hands each one to the listener; bytes that do not
//    yet form a complete message are kept in __pending.
//
// @param source     Channel to read from.
// @param condition  Condition flags reported by GLib.
// NOTE(review): __pListener is dereferenced below; confirm it is NULL-checked.
294 IpcClient::HandleReceivedMessage(GIOChannel* source, GIOCondition condition)
296 GError* pGError = NULL;
298 IPC::Message* pMessage = NULL;
300 if (condition & G_IO_HUP)
302 _LOGD("G_IO_HUP, the connection is closed.");
// Remove the watch and release our reference before notifying the listener.
304 g_source_destroy(__pReverseSource);
305 g_source_unref(__pReverseSource);
306 __pReverseSource = NULL;
310 __pListener->OnIpcServerDisconnected(*this);
315 else if (condition & G_IO_IN)
318 const char* pStart = NULL;
319 const char* pEnd = NULL;
320 const char* pEndOfMessage = NULL;
// Read at most one buffer's worth of bytes from the channel.
325 status = g_io_channel_read_chars(source, (char*) __messageBuffer, __MAX_MESSAGE_BUFFER_SIZE, &readSize, &pGError);
// EOF and read errors are both treated as a closed connection.
326 if (status == G_IO_STATUS_EOF || status == G_IO_STATUS_ERROR)
328 if (status == G_IO_STATUS_EOF)
330 _LOGD("G_IO_STATUS_EOF, the connection is closed.");
334 _LOGD("G_IO_STATUS_ERROR, the connection is closed.");
// Shut the channel down without flushing (flush = FALSE), then tear down the
// watch source and notify the listener.
339 g_io_channel_shutdown(source, FALSE, &pGError);
341 g_source_destroy(__pReverseSource);
342 g_source_unref(__pReverseSource);
343 __pReverseSource = NULL;
347 __pListener->OnIpcServerDisconnected(*this);
// With nothing left over from a previous read, parse straight out of the
// receive buffer; otherwise append the new bytes to __pending and parse the
// combined data.
358 if (__pending.empty())
360 pStart = __messageBuffer;
361 pEnd = pStart + readSize;
365 __pending.append(__messageBuffer, readSize);
366 pStart = __pending.data();
367 pEnd = pStart + __pending.size();
// A NULL result is treated as "no complete message in [pStart, pEnd)"; the
// unparsed tail is stored in __pending.
372 pEndOfMessage = IPC::Message::FindNext(pStart, pEnd);
373 if (pEndOfMessage == NULL)
375 __pending.assign(pStart, pEnd - pStart);
// Build a message from the bytes up to pEndOfMessage (nothrow allocation, so
// failure shows up as NULL rather than an exception).
379 pMessage = new (std::nothrow) IPC::Message(pStart, pEndOfMessage - pStart);
380 if (pMessage == NULL)
382 _LOGE("The memory is insufficient");
// NOTE(review): confirm pMessage is deleted after the listener returns.
388 __pListener->OnIpcResponseReceived(*this, *pMessage);
// Continue parsing after the message just consumed.
393 pStart = pEndOfMessage;
// Obtains a descriptor for sending. Access to __fds is guarded by __pMutex.
// When __fds is empty the mutex is released and a new connection is opened
// with MakeConnection(false).
// NOTE(review): presumably the non-empty path removes one descriptor from
// __fds and returns it - confirm.
406 IpcClient::AcquireFd(void)
413 pthread_mutex_lock(__pMutex);
414 if (__fds.size() == 0)
// Unlock before connecting so the mutex is not held during MakeConnection().
416 pthread_mutex_unlock(__pMutex);
417 ret = MakeConnection(false);
420 _LOGE("Failed to connect to the server.");
430 pthread_mutex_unlock(__pMutex);
// Performs its work on `fd` while holding __pMutex.
// NOTE(review): presumably returns `fd` to the __fds container for reuse -
// confirm.
437 IpcClient::ReleaseFd(int fd)
439 pthread_mutex_lock(__pMutex);
443 pthread_mutex_unlock(__pMutex);
// Writes the raw bytes of an IPC message to a descriptor obtained from
// AcquireFd().
//
// @param pMessage  Message whose data()/size() bytes are sent.
// NOTE(review): confirm short writes are retried until `remain` reaches 0
// and that the descriptor is released afterwards.
447 IpcClient::SendAsync(IPC::Message* pMessage)
449 char* pData = (char*) pMessage->data();
450 int remain = pMessage->size();
451 int fd = AcquireFd();
454 _LOGE("Failed to get fd.");
461 written = write(fd, (char*) pData, remain);
// Sends a synchronous IPC message and handles the reply on the same
// descriptor: writes the request, polls for input or peer hang-up, appends
// received bytes to `message` and looks for a complete message in it, then
// passes the reply to the message's reply deserializer.
//
// @param pMessage  Must actually be an IPC::SyncMessage; otherwise an error
//                  is logged.
472 IpcClient::SendSync(IPC::Message* pMessage)
474 IPC::Message* pReply = NULL;
475 MessageReplyDeserializer* pReplyDeserializer = NULL;
// Checked downcast: yields NULL when pMessage is not a SyncMessage.
476 IPC::SyncMessage* pSyncMessage = dynamic_cast <IPC::SyncMessage*>(pMessage);
477 if (pSyncMessage == NULL)
479 _LOGE("pMessage is not a sync message.");
483 int messageId = SyncMessage::GetMessageId(*pSyncMessage);
485 int fd = AcquireFd();
488 _LOGE("Failed to get fd.");
492 char* pData = (char*) pSyncMessage->data();
493 int remain = pSyncMessage->size();
// NOTE(review): confirm short writes are retried until `remain` reaches 0.
498 written = write(fd, (char*) pData, remain);
// Wait for either readable data or the peer closing its end.
507 pfd.events = POLLIN | POLLRDHUP;
513 char* pEndOfMessage = NULL;
519 if (pfd.revents & POLLRDHUP)
524 if (pfd.revents & POLLIN)
// NOTE(review): the read size is the literal 1024; confirm `buffer` is at
// least that large and that readSize is checked before it is appended.
526 readSize = read(fd, buffer, 1024);
529 message.append(buffer, readSize);
// Look for the end of a complete message in everything received so far.
531 pEndOfMessage = (char*) IPC::Message::FindNext(message.data(), message.data() + message.size());
534 pReply = new (std::nothrow) IPC::Message(message.data(), pEndOfMessage - message.data());
537 _LOGE("The memory is insufficient.");
// Hand the reply to the sync message's deserializer, then delete the
// deserializer.
// NOTE(review): confirm pReply is also deleted and the descriptor released.
545 pReplyDeserializer = pSyncMessage->GetReplyDeserializer();
546 pReplyDeserializer->SerializeOutputParameters(*pReply);
549 delete pReplyDeserializer;
// Dispatches a message to SendSync() or SendAsync() depending on
// pMessage->is_sync().
//
// @param pMessage  Message to send.
557 IpcClient::Send(IPC::Message* pMessage)
561 if (pMessage->is_sync())
563 ret = SendSync(pMessage);
567 ret = SendAsync(pMessage);
// Pointer overload: forwards directly to Send() and returns its result.
//
// @param pMessage  Message to send.
574 IpcClient::SendRequest(IPC::Message* pMessage)
576 return Send(pMessage);
// Const-reference overload: performs the same sync/async dispatch as Send(),
// casting away constness because SendSync()/SendAsync() take a non-const
// pointer.
//
// @param message  Message to send.
580 IpcClient::SendRequest(const IPC::Message& message)
584 if (message.is_sync())
586 ret = SendSync(const_cast<IPC::Message*>(&message));
590 ret = SendAsync(const_cast<IPC::Message*>(&message));