2 // Open Service Platform
3 // Copyright (c) 2012 Samsung Electronics Co., Ltd.
5 // Licensed under the Apache License, Version 2.0 (the License);
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
20 * @brief This is the implementation file for the IpcClient class.
26 #include <sys/types.h>
27 #include <sys/socket.h>
40 #include "message-port-log.h"
42 #include "IpcClient.h"
43 #include "IIpcClientEventListener.h"
// Constructor. Of the initialisation visible here, __pReverseSource starts out
// as NULL and __messageBuffer is turned into an empty C string.
49 IpcClient::IpcClient(void)
50 : __pReverseSource(NULL)
// Terminate the receive buffer at its first byte so that it reads as "".
54 __messageBuffer[0] = '\0';
// Destructor. Tears down the GLib source kept in __pReverseSource (if any),
// loops until __fds is empty, and destroys the mutex behind __pMutex.
57 IpcClient::~IpcClient(void)
61 if (__pReverseSource != NULL)
// g_source_destroy() removes the source from its main context; the unref then
// drops the reference this object has been holding.
63 g_source_destroy(__pReverseSource);
64 g_source_unref(__pReverseSource);
65 __pReverseSource = NULL;
// Runs until __fds has no entries left. The loop body is not visible here;
// presumably it closes and removes each descriptor -- TODO confirm.
68 while (__fds.size() > 0)
// NOTE(review): Construct() malloc()s a pthread_mutex_t. If __pMutex points at
// that allocation, confirm it is also free()d after this call, and that
// __pMutex is valid when Construct() was never called or failed early.
76 pthread_mutex_destroy(__pMutex);
// Second-phase initialisation: remembers the listener, allocates and
// initialises a mutex, then calls MakeConnection() twice (once without an
// argument, once with forReverse = true).
//
// @param serverName  Name of the server to talk to (its use is not visible in
//                    the lines shown here).
// @param pListener   Listener that receives disconnect/response callbacks.
80 IpcClient::Construct(const string& serverName, const IIpcClientEventListener* pListener)
// The const qualifier is cast away so the pointer fits the non-const member.
83 __pListener = const_cast <IIpcClientEventListener*>(pListener);
// The mutex lives on the heap.
// NOTE(review): the NULL check on malloc()'s result is not visible here --
// confirm it exists before pthread_mutex_init() is reached.
85 pthread_mutex_t* pMutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
// NULL attributes request the default mutex attributes.
91 pthread_mutex_init(pMutex, NULL);
// First connection: MakeConnection() is called without an explicit argument.
95 int ret = MakeConnection();
// Second connection: explicitly requested as the "reverse" one.
103 ret = MakeConnection(true);
// Const accessor taking no arguments. The body is not visible here;
// presumably it returns the server name kept in __name -- TODO confirm.
115 IpcClient::GetName(void) const
// Opens an AF_UNIX stream socket to "/tmp/" + __name, connects to it in
// non-blocking mode (waiting with select() when the connect is still in
// progress), switches the socket back to its original flags, and sends a
// HelloMessage carrying this process id and the forReverse flag. The visible
// tail of the function wraps the socket in a GIOChannel, watches it on the
// default GLib main context with OnReadMessage as callback, and stores the
// resulting source in __pReverseSource.
//
// NOTE(review): the GIOChannel part is presumably executed only when
// forReverse is true; the guarding condition is not visible here -- confirm.
//
// @param forReverse  Copied into HelloMessage::reverse.
127 IpcClient::MakeConnection(bool forReverse)
131 size_t socketNameLength = 0;
// Socket path is "/tmp/<__name>".
134 socketName.append("/tmp/");
135 socketName.append(__name);
// +1 so that the terminating NUL is counted as well.
137 socketNameLength = socketName.size() + 1;
139 HelloMessage helloMessage = {0, false};
141 helloMessage.pid = getpid();
142 helloMessage.reverse = forReverse;
144 struct sockaddr_un server;
146 bzero(&server, sizeof(server));
147 server.sun_family = AF_UNIX;
// NOTE(review): the copy is bounded by the length of the source string, not by
// sizeof(server.sun_path). A sufficiently long __name would write past the end
// of sun_path unless the length is validated somewhere not visible here.
148 strncpy(server.sun_path, socketName.c_str(), socketNameLength);
149 socklen_t serverLen = sizeof(server);
151 int client = socket(AF_UNIX, SOCK_STREAM, 0);
154 _LOGE("Failed to create a socket : %s.", strerror(errno));
// Remember the current flags and add O_NONBLOCK so that connect() cannot block.
158 int flags = fcntl(client, F_GETFL, 0);
159 ret = fcntl(client, F_SETFL, flags | O_NONBLOCK);
162 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
166 ret = connect(client, (struct sockaddr*) &server, serverLen);
// EINPROGRESS is the expected result of a non-blocking connect that has not
// finished yet; any other errno is reported as a failure.
169 if (errno != EINPROGRESS)
171 _LOGE("Failed to connect to server(%s) : %s", socketName.c_str(), strerror(errno));
177 struct timeval timeout;
180 socklen_t socketLength = 0;
183 FD_SET(client, &rset);
// Wait until the socket becomes readable or writable, or the timeout expires.
190 ret = select(client+1, &rset, &wset, NULL, &timeout);
193 _LOGE("Failed to connect due to system error : %s.", strerror(errno));
203 _LOGE("Failed to connect due to timeout.");
212 if (FD_ISSET(client, &rset) || FD_ISSET(client, &wset))
// NOTE(review): sizeof(error) is stored in `length`, but getsockopt() is given
// &socketLength, which was initialised to 0 above. Unless socketLength is
// assigned on a line not visible here, SO_ERROR is queried with a zero-sized
// buffer and `error` is never filled in -- confirm.
214 length = sizeof(error);
215 ret = getsockopt(client, SOL_SOCKET, SO_ERROR, &error, &socketLength);
// NOTE(review): this logs strerror(errno); if the message is meant to describe
// the pending socket error, that value is in `error` -- confirm which is intended.
218 _LOGE("Failed to connect to server(%s) : %s", socketName.c_str(), strerror(errno));
224 _LOGE("Failed to connect due to system error.");
// Restore the flags saved before O_NONBLOCK was added.
229 ret = fcntl(client, F_SETFL, flags);
232 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
// Announce this client (pid + reverse flag) to the server.
236 ret = write(client, &helloMessage, sizeof(helloMessage));
244 GError* pGError = NULL;
245 GSource* pGSource = NULL;
// Wrap the connected socket in a GIOChannel and pick the default main context.
247 GIOChannel* pChannel = g_io_channel_unix_new(client);
248 GMainContext* pGMainContext = g_main_context_default();
// NULL encoding: the channel carries raw bytes, no character-set conversion.
// NOTE(review): pGError is passed to both calls but no check of it is visible.
250 g_io_channel_set_encoding(pChannel, NULL, &pGError);
251 g_io_channel_set_flags(pChannel, G_IO_FLAG_NONBLOCK, &pGError);
// Have the descriptor closed together with the channel's last reference.
252 g_io_channel_set_close_on_unref(pChannel, TRUE);
// Watch for incoming data, errors, invalid-descriptor and hang-up conditions;
// OnReadMessage is invoked with `this` as its user data.
254 pGSource = g_io_create_watch(pChannel, (GIOCondition) (G_IO_IN | G_IO_ERR | G_IO_NVAL | G_IO_HUP));
255 g_source_set_callback(pGSource, (GSourceFunc) OnReadMessage, this, NULL);
256 g_source_attach(pGSource, pGMainContext);
// Give up this function's own reference to the channel.
258 g_io_channel_unref(pChannel);
// Keep the source so it can be destroyed on disconnect or in the destructor.
259 __pReverseSource = pGSource;
// GLib watch callback. `data` is the IpcClient pointer that MakeConnection()
// registered through g_source_set_callback(); the call is forwarded to that
// instance's HandleReceivedMessage().
//
// @param source     Channel on which the condition was raised.
// @param condition  Condition bits reported by GLib.
// @param data       User data registered with the callback (an IpcClient*).
279 IpcClient::OnReadMessage(GIOChannel* source, GIOCondition condition, gpointer data)
281 IpcClient* pIpcClient = (IpcClient*) data;
283 return pIpcClient->HandleReceivedMessage(source, condition);
// Handles one I/O event on the watched channel.
//
//  - G_IO_HUP: the source in __pReverseSource is destroyed and the listener is
//    told that the server disconnected.
//  - G_IO_IN: bytes are read into __messageBuffer. EOF or a read error is
//    treated like a disconnect. Otherwise the bytes are cut into IPC messages
//    with IPC::Message::FindNext() and each complete message is handed to the
//    listener; an incomplete trailing part is kept in __pending.
//
// @param source     Channel to read from.
// @param condition  Condition bits reported for the channel.
287 IpcClient::HandleReceivedMessage(GIOChannel* source, GIOCondition condition)
289 GError* pGError = NULL;
291 IPC::Message* pMessage = NULL;
293 if (condition & G_IO_HUP)
295 _LOGD("G_IO_HUP, the connection is closed.");
// Drop the watch before notifying the listener.
297 g_source_destroy(__pReverseSource);
298 g_source_unref(__pReverseSource);
299 __pReverseSource = NULL;
303 __pListener->OnIpcServerDisconnected(*this);
308 else if (condition & G_IO_IN)
311 const char* pStart = NULL;
312 const char* pEnd = NULL;
313 const char* pEndOfMessage = NULL;
// Read at most __MAX_MESSAGE_BUFFER_SIZE bytes; readSize receives the count.
318 status = g_io_channel_read_chars(source, (char*) __messageBuffer, __MAX_MESSAGE_BUFFER_SIZE, &readSize, &pGError);
// Both EOF and a read error end the connection.
319 if (status == G_IO_STATUS_EOF || status == G_IO_STATUS_ERROR)
321 if (status == G_IO_STATUS_EOF)
323 _LOGD("G_IO_STATUS_EOF, the connection is closed.");
327 _LOGD("G_IO_STATUS_ERROR, the connection is closed.");
// FALSE: close the channel without flushing pending writes.
332 g_io_channel_shutdown(source, FALSE, &pGError);
334 g_source_destroy(__pReverseSource);
335 g_source_unref(__pReverseSource);
336 __pReverseSource = NULL;
340 __pListener->OnIpcServerDisconnected(*this);
// Nothing left over from an earlier read: parse directly out of the buffer.
351 if (__pending.empty())
353 pStart = __messageBuffer;
354 pEnd = pStart + readSize;
// Otherwise join the new bytes to the leftover and parse the combined data.
358 __pending.append(__messageBuffer, readSize);
359 pStart = __pending.data();
360 pEnd = pStart + __pending.size();
// FindNext() yields the end of the next complete message, or NULL when the
// range [pStart, pEnd) does not yet contain one.
365 pEndOfMessage = IPC::Message::FindNext(pStart, pEnd);
366 if (pEndOfMessage == NULL)
// Keep the unparsed remainder for the next read.
368 __pending.assign(pStart, pEnd - pStart);
372 pMessage = new (std::nothrow) IPC::Message(pStart, pEndOfMessage - pStart);
373 if (pMessage == NULL)
375 _LOGE("The memory is insufficient");
// NOTE(review): the release of pMessage after this call is not visible here --
// confirm it is deleted (or that the listener takes ownership).
381 __pListener->OnIpcResponseReceived(*this, *pMessage);
// Continue right after the message just delivered (the enclosing loop header
// is not visible here).
386 pStart = pEndOfMessage;
// Obtains a connection descriptor for sending. When __fds is empty a new
// (non-reverse) connection is made; what is done with __fds on the non-empty
// path is not visible here -- presumably a descriptor is taken from it.
399 IpcClient::AcquireFd(void)
// __fds is inspected under __pMutex.
406 pthread_mutex_lock(__pMutex);
407 if (__fds.size() == 0)
// The lock is released before connecting, so MakeConnection() runs unlocked.
409 pthread_mutex_unlock(__pMutex);
410 ret = MakeConnection(false);
413 _LOGE("Failed to connect to the server.");
423 pthread_mutex_unlock(__pMutex);
// Counterpart of AcquireFd(). The work done between lock and unlock is not
// visible here; presumably fd is put back into __fds -- TODO confirm.
//
// @param fd  Descriptor that the caller has finished using.
430 IpcClient::ReleaseFd(int fd)
432 pthread_mutex_lock(__pMutex);
436 pthread_mutex_unlock(__pMutex);
// Sends a message without waiting for a reply: the raw bytes of pMessage are
// written to a descriptor obtained from AcquireFd().
//
// @param pMessage  Message whose data()/size() bytes are written.
440 IpcClient::SendAsync(IPC::Message* pMessage)
// pData / remain describe the bytes that still have to be written.
442 char* pData = (char*) pMessage->data();
443 int remain = pMessage->size();
444 int fd = AcquireFd();
447 _LOGE("Failed to get fd.");
// Presumably repeated until `remain` reaches zero; the loop and the handling of
// a short or failed write are not visible here.
454 written = write(fd, (char*) pData, remain);
// Sends a synchronous message and waits for its reply on the same descriptor.
// The request bytes are written, the descriptor is polled for input or peer
// hang-up, received bytes are accumulated in `message` until
// IPC::Message::FindNext() finds a message end, and the reply is then passed
// to the sync message's reply deserializer.
//
// @param pMessage  Must actually be an IPC::SyncMessage; anything else is
//                  rejected with an error log.
465 IpcClient::SendSync(IPC::Message* pMessage)
467 IPC::Message* pReply = NULL;
468 MessageReplyDeserializer* pReplyDeserializer = NULL;
// Only sync messages carry a reply deserializer, hence the checked downcast.
469 IPC::SyncMessage* pSyncMessage = dynamic_cast <IPC::SyncMessage*>(pMessage);
470 if (pSyncMessage == NULL)
472 _LOGE("pMessage is not a sync message.");
476 int messageId = SyncMessage::GetMessageId(*pSyncMessage);
478 int fd = AcquireFd();
481 _LOGE("Failed to get fd.");
485 char* pData = (char*) pSyncMessage->data();
486 int remain = pSyncMessage->size();
// Presumably repeated until `remain` reaches zero; the loop is not visible here.
491 written = write(fd, (char*) pData, remain);
// Wait for reply data or for the peer closing its end.
500 pfd.events = POLLIN | POLLRDHUP;
506 char* pEndOfMessage = NULL;
// Peer closed the connection.
512 if (pfd.revents & POLLRDHUP)
517 if (pfd.revents & POLLIN)
// Read at most 1024 bytes per round.
// NOTE(review): no check of readSize is visible before the append below --
// confirm that a negative or zero result from read() is handled.
519 readSize = read(fd, buffer, 1024);
522 message.append(buffer, readSize);
// The cast removes the const from FindNext()'s result.
524 pEndOfMessage = (char*) IPC::Message::FindNext(message.data(), message.data() + message.size());
527 pReply = new (std::nothrow) IPC::Message(message.data(), pEndOfMessage - message.data());
530 _LOGE("The memory is insufficient.");
// Let the deserializer process the reply, then dispose of the deserializer.
// NOTE(review): deleting pReply and releasing fd are not visible here --
// confirm both happen on every exit path.
538 pReplyDeserializer = pSyncMessage->GetReplyDeserializer();
539 pReplyDeserializer->SerializeOutputParameters(*pReply);
542 delete pReplyDeserializer;
// Dispatches on the message kind: SendSync() when is_sync() is true, SendAsync()
// otherwise.
//
// @param pMessage  Message to send.
550 IpcClient::Send(IPC::Message* pMessage)
554 if (pMessage->is_sync())
556 ret = SendSync(pMessage);
560 ret = SendAsync(pMessage);
// Pointer overload: simply forwards to Send().
//
// @param pMessage  Message to send.
567 IpcClient::SendRequest(IPC::Message* pMessage)
569 return Send(pMessage);
// Reference overload: chooses SendSync() or SendAsync() based on is_sync().
// SendSync()/SendAsync() take a non-const pointer, so the const is cast away
// from the address of `message`.
//
// @param message  Message to send.
573 IpcClient::SendRequest(const IPC::Message& message)
577 if (message.is_sync())
579 ret = SendSync(const_cast<IPC::Message*>(&message));
583 ret = SendAsync(const_cast<IPC::Message*>(&message));