Update to use the aul api instead of app-manager
[platform/core/appfw/message-port.git] / src / IpcClient.cpp
1 //
2 // Open Service Platform
3 // Copyright (c) 2012 Samsung Electronics Co., Ltd.
4 //
5 // Licensed under the Apache License, Version 2.0 (the License);
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17
18 /**
19  * @file        IpcClient.cpp
20  * @brief       This is the implementation file for the IpcClient class.
21  *
22  */
23
24 #include <stdio.h>
25 #include <errno.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <poll.h>
33 #include <pthread.h>
34 #include <fcntl.h>
35
36 #include <iostream>
37 #include <queue>
38 #include <map>
39
40 #include "message-port.h"
41 #include "message-port-log.h"
42
43 #include "IpcClient.h"
44 #include "IIpcClientEventListener.h"
45
46 using namespace IPC;
47 using namespace std;
48
49
50 IpcClient::IpcClient(void)
51         : __pReverseSource(NULL)
52         , __pMutex(NULL)
53         , __pListener(NULL)
54 {
55         __messageBuffer[0] = '\0';
56 }
57
58 IpcClient::~IpcClient(void)
59 {
60         int fd = 0;
61
62         if (__pReverseSource != NULL)
63         {
64                 g_source_destroy(__pReverseSource);
65                 g_source_unref(__pReverseSource);
66                 __pReverseSource = NULL;
67         }
68
69         while (__fds.size() > 0)
70         {
71                 fd = __fds.back();
72                 __fds.pop_back();
73
74                 close(fd);
75         }
76
77         pthread_mutex_destroy(__pMutex);
78 }
79
80 int
81 IpcClient::Construct(const string& serverName, const IIpcClientEventListener* pListener)
82 {
83         __name = serverName;
84         __pListener = const_cast <IIpcClientEventListener*>(pListener);
85
86         pthread_mutex_t* pMutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
87         if (pMutex == NULL)
88         {
89                 return MESSAGEPORT_ERROR_OUT_OF_MEMORY;
90         }
91
92         pthread_mutex_init(pMutex, NULL);
93
94         __pMutex = pMutex;
95
96         int ret = MakeConnection();
97         if (ret != 0)
98         {
99                 return ret;
100         }
101
102         if (__pListener)
103         {
104                 ret = MakeConnection(true);
105                 if (ret != 0)
106                 {
107                         return ret;
108                 }
109         }
110
111         return MESSAGEPORT_ERROR_NONE;
112
113 }
114
115 string
116 IpcClient::GetName(void) const
117 {
118         return __name;
119 }
120
121 struct HelloMessage
122 {
123         int reverse;
124 };
125
126 int
127 IpcClient::MakeConnection(bool forReverse)
128 {
129         int ret = 0;
130         int retry = 0;
131
132         size_t socketNameLength = 0;
133         string socketName;
134
135         socketName.append("/tmp/");
136         socketName.append(__name);
137
138         socketNameLength = socketName.size() + 1;
139
140         HelloMessage helloMessage = {0};
141
142         if (forReverse)
143         {
144                 helloMessage.reverse = 1;
145         }
146         else
147         {
148                 helloMessage.reverse = 0;
149         }
150
151         struct sockaddr_un server;
152
153         bzero(&server, sizeof(server));
154         server.sun_family = AF_UNIX;
155         strncpy(server.sun_path, socketName.c_str(), socketNameLength);
156         socklen_t serverLen = sizeof(server);
157
158         int client = socket(AF_UNIX, SOCK_STREAM, 0);
159         if (client < 0)
160         {
161                 _LOGE("Failed to create a socket : %s.", strerror(errno));
162                 return MESSAGEPORT_ERROR_IO_ERROR;
163         }
164
165         int flags = fcntl(client, F_GETFL, 0);
166         ret = fcntl(client, F_SETFL, flags | O_NONBLOCK);
167         if (ret != 0)
168         {
169                 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
170                 goto CATCH;
171         }
172
173         // Retry if the server is not ready
174         retry = 5;
175         while (retry > 0)
176         {
177                 ret = connect(client, (struct sockaddr*) &server, serverLen);
178                 if (ret < 0 && errno == ENOENT)
179                 {
180                         _LOGI("The server is not ready. %d", retry);
181
182                         usleep(1000 * 1000);
183
184                         --retry;
185                 }
186                 else
187                 {
188                         break;
189                 }
190         }
191
192         if (ret < 0)
193         {
194                 if (errno != EINPROGRESS)
195                 {
196                         _LOGE("Failed to connect to server(%s) : %d, %s", socketName.c_str(), errno, strerror(errno));
197                         goto CATCH;
198                 }
199
200                 fd_set rset;
201                 fd_set wset;
202                 struct timeval timeout;
203                 int length = 0;
204                 int error = 0;
205                 socklen_t socketLength = 0;
206
207                 FD_ZERO(&rset);
208                 FD_SET(client, &rset);
209                 wset = rset;
210                 timeout.tv_sec = 10;
211                 timeout.tv_usec = 0;
212
213                 while (true)
214                 {
215                         ret = select(client+1, &rset, &wset, NULL, &timeout);
216                         if (ret < 0)
217                         {
218                                 _LOGE("Failed to connect due to system error : %s.", strerror(errno));
219                                 if (errno != EINTR)
220                                 {
221                                         goto CATCH;
222                                 }
223
224                                 continue;
225                         }
226                         else if (ret == 0)
227                         {
228                                 _LOGE("Failed to connect due to timeout.");
229                                 goto CATCH;
230                         }
231                         else
232                         {
233                                 break;
234                         }
235                 }
236
237                 if (FD_ISSET(client, &rset) || FD_ISSET(client, &wset))
238                 {
239                         length = sizeof(error);
240                         ret = getsockopt(client, SOL_SOCKET, SO_ERROR, &error, &socketLength);
241                         if (ret < 0)
242                         {
243                                 _LOGE("Failed to connect to server(%s) : %s", socketName.c_str(), strerror(errno));
244                                 goto CATCH;
245                         }
246                 }
247                 else
248                 {
249                         _LOGE("Failed to connect due to system error.");
250                         goto CATCH;
251                 }
252         }
253
254         ret = fcntl(client, F_SETFL, flags);
255         if (ret < 0)
256         {
257                 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
258                 goto CATCH;
259         }
260
261         ret = write(client, &helloMessage, sizeof(helloMessage));
262         if (ret < 0)
263         {
264                 goto CATCH;
265         }
266
267         if (forReverse)
268         {
269                 GError* pGError = NULL;
270                 GSource* pGSource = NULL;
271
272                 GIOChannel* pChannel = g_io_channel_unix_new(client);
273                 GMainContext* pGMainContext = g_main_context_default();
274
275                 g_io_channel_set_encoding(pChannel, NULL, &pGError);
276                 g_io_channel_set_flags(pChannel, G_IO_FLAG_NONBLOCK, &pGError);
277                 g_io_channel_set_close_on_unref(pChannel, TRUE);
278
279                 pGSource = g_io_create_watch(pChannel, (GIOCondition) (G_IO_IN | G_IO_ERR | G_IO_NVAL | G_IO_HUP));
280                 g_source_set_callback(pGSource, (GSourceFunc) OnReadMessage, this, NULL);
281                 g_source_attach(pGSource, pGMainContext);
282
283                 g_io_channel_unref(pChannel);
284                 __pReverseSource = pGSource;
285         }
286         else
287         {
288                 ReleaseFd(client);
289         }
290
291         return MESSAGEPORT_ERROR_NONE;
292
293 CATCH:
294         if (client != -1)
295         {
296                 close(client);
297         }
298
299         return MESSAGEPORT_ERROR_IO_ERROR;
300
301 }
302
303 gboolean
304 IpcClient::OnReadMessage(GIOChannel* source, GIOCondition condition, gpointer data)
305 {
306         IpcClient* pIpcClient = (IpcClient*) data;
307
308         return pIpcClient->HandleReceivedMessage(source, condition);
309 }
310
311 gboolean
312 IpcClient::HandleReceivedMessage(GIOChannel* source, GIOCondition condition)
313 {
314         GError* pGError = NULL;
315         GIOStatus status;
316         IPC::Message* pMessage = NULL;
317
318         if (condition & G_IO_HUP)
319         {
320                 _LOGI("G_IO_HUP, the connection is closed.");
321
322                 g_source_destroy(__pReverseSource);
323                 g_source_unref(__pReverseSource);
324                 __pReverseSource = NULL;
325
326                 if (__pListener)
327                 {
328                         __pListener->OnIpcServerDisconnected(*this);
329                 }
330
331                 return FALSE;
332         }
333         else if (condition & G_IO_IN)
334         {
335                 gsize readSize = 0;
336                 const char* pStart = NULL;
337                 const char* pEnd = NULL;
338                 const char* pEndOfMessage = NULL;
339
340                 while (true)
341                 {
342                         pGError = NULL;
343                         status = g_io_channel_read_chars(source, (char*) __messageBuffer, __MAX_MESSAGE_BUFFER_SIZE, &readSize, &pGError);
344                         if (status == G_IO_STATUS_EOF || status == G_IO_STATUS_ERROR)
345                         {
346                                 if (status == G_IO_STATUS_EOF)
347                                 {
348                                         _LOGI("G_IO_STATUS_EOF, the connection is closed.");
349                                 }
350                                 else
351                                 {
352                                         _LOGI("G_IO_STATUS_ERROR, the connection is closed.");
353                                 }
354
355                                 pGError = NULL;
356
357                                 g_io_channel_shutdown(source, FALSE, &pGError);
358
359                                 g_source_destroy(__pReverseSource);
360                                 g_source_unref(__pReverseSource);
361                                 __pReverseSource = NULL;
362
363                                 if (__pListener)
364                                 {
365                                         __pListener->OnIpcServerDisconnected(*this);
366                                 }
367
368                                 return FALSE;
369                         }
370
371                         if (readSize == 0)
372                         {
373                                 break;
374                         }
375
376                         if (__pending.empty())
377                         {
378                                 pStart = __messageBuffer;
379                                 pEnd = pStart + readSize;
380                         }
381                         else
382                         {
383                                 __pending.append(__messageBuffer, readSize);
384                                 pStart = __pending.data();
385                                 pEnd = pStart + __pending.size();
386                         }
387
388                         while (true)
389                         {
390                                 pEndOfMessage = IPC::Message::FindNext(pStart, pEnd);
391                                 if (pEndOfMessage == NULL)
392                                 {
393                                         __pending.assign(pStart, pEnd - pStart);
394                                         break;
395                                 }
396
397                                 pMessage = new (std::nothrow) IPC::Message(pStart, pEndOfMessage - pStart);
398                                 if (pMessage == NULL)
399                                 {
400                                         _LOGE("The memory is insufficient");
401                                         return MESSAGEPORT_ERROR_OUT_OF_MEMORY;
402                                 }
403
404                                 if (__pListener)
405                                 {
406                                         __pListener->OnIpcResponseReceived(*this, *pMessage);
407                                 }
408
409                                 delete pMessage;
410
411                                 pStart = pEndOfMessage;
412                         }
413                 }
414         }
415         else
416         {
417                 // empty statement.
418         }
419
420         return TRUE;
421 }
422
423 int
424 IpcClient::AcquireFd(void)
425 {
426         int ret = 0;
427         int fd = -1;
428
429         while (fd == -1)
430         {
431                 pthread_mutex_lock(__pMutex);
432                 if (__fds.size() == 0)
433                 {
434                         pthread_mutex_unlock(__pMutex);
435                         ret = MakeConnection(false);
436                         if (ret < 0)
437                         {
438                                 _LOGE("Failed to connect to the server.");
439                                 return MESSAGEPORT_ERROR_IO_ERROR;
440                         }
441
442                         continue;
443                 }
444
445                 fd = __fds.back();
446                 __fds.pop_back();
447
448                 pthread_mutex_unlock(__pMutex);
449         }
450
451         return fd;
452 }
453
454 void
455 IpcClient::ReleaseFd(int fd)
456 {
457         pthread_mutex_lock(__pMutex);
458
459         __fds.push_back(fd);
460
461         pthread_mutex_unlock(__pMutex);
462 }
463
464 int
465 IpcClient::SendAsync(IPC::Message* pMessage)
466 {
467         char* pData = (char*) pMessage->data();
468         int remain = pMessage->size();
469         int fd = AcquireFd();
470         if (fd == -1)
471         {
472                 _LOGE("Failed to get fd.");
473                 return MESSAGEPORT_ERROR_IO_ERROR;
474         }
475
476         int written = 0;
477         while (remain > 0)
478         {
479                 written = write(fd, (char*) pData, remain);
480                 if (written < 0)
481                 {
482                         _LOGE("Failed to send a request: %d, %s", errno, strerror(errno));
483
484                         ReleaseFd(fd);
485                         return MESSAGEPORT_ERROR_IO_ERROR;
486                 }
487
488                 remain -= written;
489                 pData += written;
490         }
491
492         ReleaseFd(fd);
493
494         return MESSAGEPORT_ERROR_NONE;
495 }
496
497 int
498 IpcClient::SendSync(IPC::Message* pMessage)
499 {
500         IPC::Message* pReply = NULL;
501         MessageReplyDeserializer* pReplyDeserializer = NULL;
502         IPC::SyncMessage* pSyncMessage = dynamic_cast <IPC::SyncMessage*>(pMessage);
503         if (pSyncMessage == NULL)
504         {
505                 _LOGE("pMessage is not a sync message.");
506                 return MESSAGEPORT_ERROR_IO_ERROR;
507         }
508
509         int messageId = SyncMessage::GetMessageId(*pSyncMessage);
510
511         int fd = AcquireFd();
512         if (fd < 0)
513         {
514                 _LOGE("Failed to get fd.");
515                 return MESSAGEPORT_ERROR_IO_ERROR;
516         }
517
518         char* pData = (char*) pSyncMessage->data();
519         int remain = pSyncMessage->size();
520         int written = 0;
521
522         while (remain > 0)
523         {
524                 written = write(fd, (char*) pData, remain);
525                 if (written < 0)
526                 {
527                         _LOGE("Failed to send a request: %d, %s", errno, strerror(errno));
528
529                         ReleaseFd(fd);
530                         return MESSAGEPORT_ERROR_IO_ERROR;
531                 }
532
533                 remain -= written;
534                 pData += written;
535         }
536
537         // Wait reply
538         struct pollfd pfd;
539
540         pfd.fd = fd;
541         pfd.events = POLLIN | POLLRDHUP;
542         pfd.revents = 0;
543
544         char buffer[1024];
545         std::string message;
546         int readSize = 0;
547         char* pEndOfMessage = NULL;
548
549         int ret = 0;
550
551         while (true)
552         {
553                 ret = poll(&pfd, 1, -1);
554                 if (ret < 0)
555                 {
556                         if (errno == EINTR)
557                         {
558                                 continue;
559                         }
560
561                         _LOGE("Failed to poll (%d, %s).", errno, strerror(errno));
562
563                         ReleaseFd(fd);
564                         return MESSAGEPORT_ERROR_IO_ERROR;
565                 }
566
567                 if (pfd.revents & POLLRDHUP)
568                 {
569                         _LOGE("POLLRDHUP");
570
571                         ReleaseFd(fd);
572                         return MESSAGEPORT_ERROR_IO_ERROR;
573                 }
574
575                 if (pfd.revents & POLLIN)
576                 {
577                         readSize = read(fd, buffer, 1024);
578                 }
579
580                 if (readSize > 0)
581                 {
582                         message.append(buffer, readSize);
583                 }
584
585                 pEndOfMessage = (char*) IPC::Message::FindNext(message.data(), message.data() + message.size());
586                 if (pEndOfMessage)
587                 {
588                         pReply = new (std::nothrow) IPC::Message(message.data(), pEndOfMessage - message.data());
589                         if (pReply == NULL)
590                         {
591                                 _LOGE("The memory is insufficient.");
592
593                                 ReleaseFd(fd);
594                                 return MESSAGEPORT_ERROR_OUT_OF_MEMORY;
595                         }
596
597                         break;
598                 }
599         }
600
601         pReplyDeserializer = pSyncMessage->GetReplyDeserializer();
602         pReplyDeserializer->SerializeOutputParameters(*pReply);
603
604         delete pReply;
605         delete pReplyDeserializer;
606
607         ReleaseFd(fd);
608
609         return MESSAGEPORT_ERROR_NONE;
610 }
611
612 int
613 IpcClient::Send(IPC::Message* pMessage)
614 {
615         int ret = 0;
616
617         if (pMessage->is_sync())
618         {
619                 ret = SendSync(pMessage);
620         }
621         else
622         {
623                 ret = SendAsync(pMessage);
624         }
625
626         return ret;
627 }
628
629 int
630 IpcClient::SendRequest(IPC::Message* pMessage)
631 {
632         return Send(pMessage);
633 }
634
635 int
636 IpcClient::SendRequest(const IPC::Message& message)
637 {
638         int ret = 0;
639
640         if (message.is_sync())
641         {
642                 ret = SendSync(const_cast<IPC::Message*>(&message));
643         }
644         else
645         {
646                 ret = SendAsync(const_cast<IPC::Message*>(&message));
647         }
648
649         return ret;
650 }
651