2 // Open Service Platform
3 // Copyright (c) 2012 Samsung Electronics Co., Ltd.
5 // Licensed under the Apache License, Version 2.0 (the License);
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
20 * @brief This is the implementation file for the IpcClient class.
26 #include <sys/types.h>
27 #include <sys/socket.h>
40 #include "message-port.h"
41 #include "message-port-log.h"
43 #include "IpcClient.h"
44 #include "IIpcClientEventListener.h"
50 IpcClient::IpcClient(void)
51 : __pReverseSource(NULL)
55 __messageBuffer[0] = '\0';
58 IpcClient::~IpcClient(void)
62 if (__pReverseSource != NULL)
64 g_source_destroy(__pReverseSource);
65 g_source_unref(__pReverseSource);
66 __pReverseSource = NULL;
69 while (__fds.size() > 0)
77 pthread_mutex_destroy(__pMutex);
81 IpcClient::Construct(const string& serverName, const IIpcClientEventListener* pListener)
84 __pListener = const_cast <IIpcClientEventListener*>(pListener);
86 pthread_mutex_t* pMutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
89 return MESSAGEPORT_ERROR_OUT_OF_MEMORY;
92 pthread_mutex_init(pMutex, NULL);
96 int ret = MakeConnection();
104 ret = MakeConnection(true);
111 return MESSAGEPORT_ERROR_NONE;
116 IpcClient::GetName(void) const
127 IpcClient::MakeConnection(bool forReverse)
132 size_t socketNameLength = 0;
135 socketName.append("/tmp/");
136 socketName.append(__name);
138 socketNameLength = socketName.size() + 1;
140 HelloMessage helloMessage = {0};
144 helloMessage.reverse = 1;
148 helloMessage.reverse = 0;
151 struct sockaddr_un server;
153 bzero(&server, sizeof(server));
154 server.sun_family = AF_UNIX;
155 strncpy(server.sun_path, socketName.c_str(), socketNameLength);
156 socklen_t serverLen = sizeof(server);
158 int client = socket(AF_UNIX, SOCK_STREAM, 0);
161 _LOGE("Failed to create a socket : %s.", strerror(errno));
162 return MESSAGEPORT_ERROR_IO_ERROR;
165 int flags = fcntl(client, F_GETFL, 0);
166 ret = fcntl(client, F_SETFL, flags | O_NONBLOCK);
169 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
173 // Retry if the server is not ready
177 ret = connect(client, (struct sockaddr*) &server, serverLen);
178 if (ret < 0 && errno == ENOENT)
180 _LOGI("The server is not ready. %d", retry);
194 if (errno != EINPROGRESS)
196 _LOGE("Failed to connect to server(%s) : %d, %s", socketName.c_str(), errno, strerror(errno));
202 struct timeval timeout;
205 socklen_t socketLength = 0;
208 FD_SET(client, &rset);
215 ret = select(client+1, &rset, &wset, NULL, &timeout);
218 _LOGE("Failed to connect due to system error : %s.", strerror(errno));
228 _LOGE("Failed to connect due to timeout.");
237 if (FD_ISSET(client, &rset) || FD_ISSET(client, &wset))
239 length = sizeof(error);
240 ret = getsockopt(client, SOL_SOCKET, SO_ERROR, &error, &socketLength);
243 _LOGE("Failed to connect to server(%s) : %s", socketName.c_str(), strerror(errno));
249 _LOGE("Failed to connect due to system error.");
254 ret = fcntl(client, F_SETFL, flags);
257 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
261 ret = write(client, &helloMessage, sizeof(helloMessage));
269 GError* pGError = NULL;
270 GSource* pGSource = NULL;
272 GIOChannel* pChannel = g_io_channel_unix_new(client);
273 GMainContext* pGMainContext = g_main_context_default();
275 g_io_channel_set_encoding(pChannel, NULL, &pGError);
276 g_io_channel_set_flags(pChannel, G_IO_FLAG_NONBLOCK, &pGError);
277 g_io_channel_set_close_on_unref(pChannel, TRUE);
279 pGSource = g_io_create_watch(pChannel, (GIOCondition) (G_IO_IN | G_IO_ERR | G_IO_NVAL | G_IO_HUP));
280 g_source_set_callback(pGSource, (GSourceFunc) OnReadMessage, this, NULL);
281 g_source_attach(pGSource, pGMainContext);
283 g_io_channel_unref(pChannel);
284 __pReverseSource = pGSource;
291 return MESSAGEPORT_ERROR_NONE;
299 return MESSAGEPORT_ERROR_IO_ERROR;
304 IpcClient::OnReadMessage(GIOChannel* source, GIOCondition condition, gpointer data)
306 IpcClient* pIpcClient = (IpcClient*) data;
308 return pIpcClient->HandleReceivedMessage(source, condition);
312 IpcClient::HandleReceivedMessage(GIOChannel* source, GIOCondition condition)
314 GError* pGError = NULL;
316 IPC::Message* pMessage = NULL;
318 if (condition & G_IO_HUP)
320 _LOGI("G_IO_HUP, the connection is closed.");
322 g_source_destroy(__pReverseSource);
323 g_source_unref(__pReverseSource);
324 __pReverseSource = NULL;
328 __pListener->OnIpcServerDisconnected(*this);
333 else if (condition & G_IO_IN)
336 const char* pStart = NULL;
337 const char* pEnd = NULL;
338 const char* pEndOfMessage = NULL;
343 status = g_io_channel_read_chars(source, (char*) __messageBuffer, __MAX_MESSAGE_BUFFER_SIZE, &readSize, &pGError);
344 if (status == G_IO_STATUS_EOF || status == G_IO_STATUS_ERROR)
346 if (status == G_IO_STATUS_EOF)
348 _LOGI("G_IO_STATUS_EOF, the connection is closed.");
352 _LOGI("G_IO_STATUS_ERROR, the connection is closed.");
357 g_io_channel_shutdown(source, FALSE, &pGError);
359 g_source_destroy(__pReverseSource);
360 g_source_unref(__pReverseSource);
361 __pReverseSource = NULL;
365 __pListener->OnIpcServerDisconnected(*this);
376 if (__pending.empty())
378 pStart = __messageBuffer;
379 pEnd = pStart + readSize;
383 __pending.append(__messageBuffer, readSize);
384 pStart = __pending.data();
385 pEnd = pStart + __pending.size();
390 pEndOfMessage = IPC::Message::FindNext(pStart, pEnd);
391 if (pEndOfMessage == NULL)
393 __pending.assign(pStart, pEnd - pStart);
397 pMessage = new (std::nothrow) IPC::Message(pStart, pEndOfMessage - pStart);
398 if (pMessage == NULL)
400 _LOGE("The memory is insufficient");
401 return MESSAGEPORT_ERROR_OUT_OF_MEMORY;
406 __pListener->OnIpcResponseReceived(*this, *pMessage);
411 pStart = pEndOfMessage;
424 IpcClient::AcquireFd(void)
431 pthread_mutex_lock(__pMutex);
432 if (__fds.size() == 0)
434 pthread_mutex_unlock(__pMutex);
435 ret = MakeConnection(false);
438 _LOGE("Failed to connect to the server.");
439 return MESSAGEPORT_ERROR_IO_ERROR;
448 pthread_mutex_unlock(__pMutex);
455 IpcClient::ReleaseFd(int fd)
457 pthread_mutex_lock(__pMutex);
461 pthread_mutex_unlock(__pMutex);
465 IpcClient::SendAsync(IPC::Message* pMessage)
467 char* pData = (char*) pMessage->data();
468 int remain = pMessage->size();
469 int fd = AcquireFd();
472 _LOGE("Failed to get fd.");
473 return MESSAGEPORT_ERROR_IO_ERROR;
479 written = write(fd, (char*) pData, remain);
482 _LOGE("Failed to send a request: %d, %s", errno, strerror(errno));
485 return MESSAGEPORT_ERROR_IO_ERROR;
494 return MESSAGEPORT_ERROR_NONE;
498 IpcClient::SendSync(IPC::Message* pMessage)
500 IPC::Message* pReply = NULL;
501 MessageReplyDeserializer* pReplyDeserializer = NULL;
502 IPC::SyncMessage* pSyncMessage = dynamic_cast <IPC::SyncMessage*>(pMessage);
503 if (pSyncMessage == NULL)
505 _LOGE("pMessage is not a sync message.");
506 return MESSAGEPORT_ERROR_IO_ERROR;
509 int messageId = SyncMessage::GetMessageId(*pSyncMessage);
511 int fd = AcquireFd();
514 _LOGE("Failed to get fd.");
515 return MESSAGEPORT_ERROR_IO_ERROR;
518 char* pData = (char*) pSyncMessage->data();
519 int remain = pSyncMessage->size();
524 written = write(fd, (char*) pData, remain);
527 _LOGE("Failed to send a request: %d, %s", errno, strerror(errno));
530 return MESSAGEPORT_ERROR_IO_ERROR;
541 pfd.events = POLLIN | POLLRDHUP;
547 char* pEndOfMessage = NULL;
553 ret = poll(&pfd, 1, -1);
561 _LOGE("Failed to poll (%d, %s).", errno, strerror(errno));
564 return MESSAGEPORT_ERROR_IO_ERROR;
567 if (pfd.revents & POLLRDHUP)
572 return MESSAGEPORT_ERROR_IO_ERROR;
575 if (pfd.revents & POLLIN)
577 readSize = read(fd, buffer, 1024);
582 message.append(buffer, readSize);
585 pEndOfMessage = (char*) IPC::Message::FindNext(message.data(), message.data() + message.size());
588 pReply = new (std::nothrow) IPC::Message(message.data(), pEndOfMessage - message.data());
591 _LOGE("The memory is insufficient.");
594 return MESSAGEPORT_ERROR_OUT_OF_MEMORY;
601 pReplyDeserializer = pSyncMessage->GetReplyDeserializer();
602 pReplyDeserializer->SerializeOutputParameters(*pReply);
605 delete pReplyDeserializer;
609 return MESSAGEPORT_ERROR_NONE;
613 IpcClient::Send(IPC::Message* pMessage)
617 if (pMessage->is_sync())
619 ret = SendSync(pMessage);
623 ret = SendAsync(pMessage);
630 IpcClient::SendRequest(IPC::Message* pMessage)
632 return Send(pMessage);
636 IpcClient::SendRequest(const IPC::Message& message)
640 if (message.is_sync())
642 ret = SendSync(const_cast<IPC::Message*>(&message));
646 ret = SendAsync(const_cast<IPC::Message*>(&message));