Remove uninitialised bytes
[platform/core/appfw/message-port.git] / src / IpcClient.cpp
1 //
2 // Open Service Platform
3 // Copyright (c) 2012 Samsung Electronics Co., Ltd.
4 //
5 // Licensed under the Apache License, Version 2.0 (the License);
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17
18 /**
19  * @file        IpcClient.cpp
20  * @brief       This is the implementation file for the IpcClient class.
21  *
22  */
23
24 #include <stdio.h>
25 #include <errno.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <poll.h>
33 #include <pthread.h>
34 #include <fcntl.h>
35
36 #include <iostream>
37 #include <queue>
38 #include <map>
39
40 #include "message-port-log.h"
41
42 #include "IpcClient.h"
43 #include "IIpcClientEventListener.h"
44
45 using namespace IPC;
46 using namespace std;
47
48
49 IpcClient::IpcClient(void)
50         : __pReverseSource(NULL)
51         , __pMutex(NULL)
52         , __pListener(NULL)
53 {
54         __messageBuffer[0] = '\0';
55 }
56
57 IpcClient::~IpcClient(void)
58 {
59         int fd = 0;
60
61         if (__pReverseSource != NULL)
62         {
63                 g_source_destroy(__pReverseSource);
64                 g_source_unref(__pReverseSource);
65                 __pReverseSource = NULL;
66         }
67
68         while (__fds.size() > 0)
69         {
70                 fd = __fds.back();
71                 __fds.pop_back();
72
73                 close(fd);
74         }
75
76         pthread_mutex_destroy(__pMutex);
77 }
78
79 int
80 IpcClient::Construct(const string& serverName, const IIpcClientEventListener* pListener)
81 {
82         __name = serverName;
83         __pListener = const_cast <IIpcClientEventListener*>(pListener);
84
85         pthread_mutex_t* pMutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
86         if (pMutex == NULL)
87         {
88                 return -2;
89         }
90
91         pthread_mutex_init(pMutex, NULL);
92
93         __pMutex = pMutex;
94
95         int ret = MakeConnection();
96         if (ret != 0)
97         {
98                 return ret;
99         }
100
101         if (__pListener)
102         {
103                 ret = MakeConnection(true);
104                 if (ret != 0)
105                 {
106                         return ret;
107                 }
108         }
109
110         return 0;
111
112 }
113
114 string
115 IpcClient::GetName(void) const
116 {
117         return __name;
118 }
119
120 struct HelloMessage
121 {
122         int pid;
123         int reverse;
124 };
125
126 int
127 IpcClient::MakeConnection(bool forReverse)
128 {
129         int ret = 0;
130
131         size_t socketNameLength = 0;
132         string socketName;
133
134         socketName.append("/tmp/");
135         socketName.append(__name);
136
137         socketNameLength = socketName.size() + 1;
138
139         HelloMessage helloMessage = {0, 0};
140
141         helloMessage.pid = getpid();
142         if (forReverse)
143         {
144                 helloMessage.reverse = 1;
145         }
146         else
147         {
148                 helloMessage.reverse = 0;
149         }
150
151         struct sockaddr_un server;
152
153         bzero(&server, sizeof(server));
154         server.sun_family = AF_UNIX;
155         strncpy(server.sun_path, socketName.c_str(), socketNameLength);
156         socklen_t serverLen = sizeof(server);
157
158         int client = socket(AF_UNIX, SOCK_STREAM, 0);
159         if (client < 0)
160         {
161                 _LOGE("Failed to create a socket : %s.", strerror(errno));
162                 return -1;
163         }
164
165         int flags = fcntl(client, F_GETFL, 0);
166         ret = fcntl(client, F_SETFL, flags | O_NONBLOCK);
167         if (ret != 0)
168         {
169                 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
170                 goto CATCH;
171         }
172
173         ret = connect(client, (struct sockaddr*) &server, serverLen);
174         if (ret != 0)
175         {
176                 if (errno != EINPROGRESS)
177                 {
178                         _LOGE("Failed to connect to server(%s) : %s", socketName.c_str(), strerror(errno));
179                         goto CATCH;
180                 }
181
182                 fd_set rset;
183                 fd_set wset;
184                 struct timeval timeout;
185                 int length = 0;
186                 int error = 0;
187                 socklen_t socketLength = 0;
188
189                 FD_ZERO(&rset);
190                 FD_SET(client, &rset);
191                 wset = rset;
192                 timeout.tv_sec = 10;
193                 timeout.tv_usec = 0;
194
195                 while (true)
196                 {
197                         ret = select(client+1, &rset, &wset, NULL, &timeout);
198                         if (ret < 0)
199                         {
200                                 _LOGE("Failed to connect due to system error : %s.", strerror(errno));
201                                 if (errno != EINTR)
202                                 {
203                                         goto CATCH;
204                                 }
205
206                                 continue;
207                         }
208                         else if (ret == 0)
209                         {
210                                 _LOGE("Failed to connect due to timeout.");
211                                 goto CATCH;
212                         }
213                         else
214                         {
215                                 break;
216                         }
217                 }
218
219                 if (FD_ISSET(client, &rset) || FD_ISSET(client, &wset))
220                 {
221                         length = sizeof(error);
222                         ret = getsockopt(client, SOL_SOCKET, SO_ERROR, &error, &socketLength);
223                         if (ret < 0)
224                         {
225                                 _LOGE("Failed to connect to server(%s) : %s", socketName.c_str(), strerror(errno));
226                                 goto CATCH;
227                         }
228                 }
229                 else
230                 {
231                         _LOGE("Failed to connect due to system error.");
232                         goto CATCH;
233                 }
234         }
235
236         ret = fcntl(client, F_SETFL, flags);
237         if (ret < 0)
238         {
239                 _LOGE("Failed to set file status flags (%d, %s).", errno, strerror(errno));
240                 goto CATCH;
241         }
242
243         ret = write(client, &helloMessage, sizeof(helloMessage));
244         if (ret < 0)
245         {
246                 goto CATCH;
247         }
248
249         if (forReverse)
250         {
251                 GError* pGError = NULL;
252                 GSource* pGSource = NULL;
253
254                 GIOChannel* pChannel = g_io_channel_unix_new(client);
255                 GMainContext* pGMainContext = g_main_context_default();
256
257                 g_io_channel_set_encoding(pChannel, NULL, &pGError);
258                 g_io_channel_set_flags(pChannel, G_IO_FLAG_NONBLOCK, &pGError);
259                 g_io_channel_set_close_on_unref(pChannel, TRUE);
260
261                 pGSource = g_io_create_watch(pChannel, (GIOCondition) (G_IO_IN | G_IO_ERR | G_IO_NVAL | G_IO_HUP));
262                 g_source_set_callback(pGSource, (GSourceFunc) OnReadMessage, this, NULL);
263                 g_source_attach(pGSource, pGMainContext);
264
265                 g_io_channel_unref(pChannel);
266                 __pReverseSource = pGSource;
267         }
268         else
269         {
270                 ReleaseFd(client);
271         }
272
273         return 0;
274
275 CATCH:
276         if (client != -1)
277         {
278                 close(client);
279         }
280
281         return -1;
282
283 }
284
285 gboolean
286 IpcClient::OnReadMessage(GIOChannel* source, GIOCondition condition, gpointer data)
287 {
288         IpcClient* pIpcClient = (IpcClient*) data;
289
290         return pIpcClient->HandleReceivedMessage(source, condition);
291 }
292
293 gboolean
294 IpcClient::HandleReceivedMessage(GIOChannel* source, GIOCondition condition)
295 {
296         GError* pGError = NULL;
297         GIOStatus status;
298         IPC::Message* pMessage = NULL;
299
300         if (condition & G_IO_HUP)
301         {
302                 _LOGD("G_IO_HUP, the connection is closed.");
303
304                 g_source_destroy(__pReverseSource);
305                 g_source_unref(__pReverseSource);
306                 __pReverseSource = NULL;
307
308                 if (__pListener)
309                 {
310                         __pListener->OnIpcServerDisconnected(*this);
311                 }
312
313                 return FALSE;
314         }
315         else if (condition & G_IO_IN)
316         {
317                 gsize readSize = 0;
318                 const char* pStart = NULL;
319                 const char* pEnd = NULL;
320                 const char* pEndOfMessage = NULL;
321
322                 while (true)
323                 {
324                         pGError = NULL;
325                         status = g_io_channel_read_chars(source, (char*) __messageBuffer, __MAX_MESSAGE_BUFFER_SIZE, &readSize, &pGError);
326                         if (status == G_IO_STATUS_EOF || status == G_IO_STATUS_ERROR)
327                         {
328                                 if (status == G_IO_STATUS_EOF)
329                                 {
330                                         _LOGD("G_IO_STATUS_EOF, the connection is closed.");
331                                 }
332                                 else
333                                 {
334                                         _LOGD("G_IO_STATUS_ERROR, the connection is closed.");
335                                 }
336
337                                 pGError = NULL;
338
339                                 g_io_channel_shutdown(source, FALSE, &pGError);
340
341                                 g_source_destroy(__pReverseSource);
342                                 g_source_unref(__pReverseSource);
343                                 __pReverseSource = NULL;
344
345                                 if (__pListener)
346                                 {
347                                         __pListener->OnIpcServerDisconnected(*this);
348                                 }
349
350                                 return FALSE;
351                         }
352
353                         if (readSize == 0)
354                         {
355                                 break;
356                         }
357
358                         if (__pending.empty())
359                         {
360                                 pStart = __messageBuffer;
361                                 pEnd = pStart + readSize;
362                         }
363                         else
364                         {
365                                 __pending.append(__messageBuffer, readSize);
366                                 pStart = __pending.data();
367                                 pEnd = pStart + __pending.size();
368                         }
369
370                         while (true)
371                         {
372                                 pEndOfMessage = IPC::Message::FindNext(pStart, pEnd);
373                                 if (pEndOfMessage == NULL)
374                                 {
375                                         __pending.assign(pStart, pEnd - pStart);
376                                         break;
377                                 }
378
379                                 pMessage = new (std::nothrow) IPC::Message(pStart, pEndOfMessage - pStart);
380                                 if (pMessage == NULL)
381                                 {
382                                         _LOGE("The memory is insufficient");
383                                         return -2;
384                                 }
385
386                                 if (__pListener)
387                                 {
388                                         __pListener->OnIpcResponseReceived(*this, *pMessage);
389                                 }
390
391                                 delete pMessage;
392
393                                 pStart = pEndOfMessage;
394                         }
395                 }
396         }
397         else
398         {
399                 // empty statement.
400         }
401
402         return TRUE;
403 }
404
405 int
406 IpcClient::AcquireFd(void)
407 {
408         int ret = 0;
409         int fd = -1;
410
411         while (fd == -1)
412         {
413                 pthread_mutex_lock(__pMutex);
414                 if (__fds.size() == 0)
415                 {
416                         pthread_mutex_unlock(__pMutex);
417                         ret = MakeConnection(false);
418                         if (ret < 0)
419                         {
420                                 _LOGE("Failed to connect to the server.");
421                                 return -1;
422                         }
423
424                         continue;
425                 }
426
427                 fd = __fds.back();
428                 __fds.pop_back();
429
430                 pthread_mutex_unlock(__pMutex);
431         }
432
433         return fd;
434 }
435
436 void
437 IpcClient::ReleaseFd(int fd)
438 {
439         pthread_mutex_lock(__pMutex);
440
441         __fds.push_back(fd);
442
443         pthread_mutex_unlock(__pMutex);
444 }
445
446 int
447 IpcClient::SendAsync(IPC::Message* pMessage)
448 {
449         char* pData = (char*) pMessage->data();
450         int remain = pMessage->size();
451         int fd = AcquireFd();
452         if (fd == -1)
453         {
454                 _LOGE("Failed to get fd.");
455                 return -1;
456         }
457
458         int written = 0;
459         while (remain > 0)
460         {
461                 written = write(fd, (char*) pData, remain);
462                 remain -= written;
463                 pData += written;
464         }
465
466         ReleaseFd(fd);
467
468         return 0;
469 }
470
471 int
472 IpcClient::SendSync(IPC::Message* pMessage)
473 {
474         IPC::Message* pReply = NULL;
475         MessageReplyDeserializer* pReplyDeserializer = NULL;
476         IPC::SyncMessage* pSyncMessage = dynamic_cast <IPC::SyncMessage*>(pMessage);
477         if (pSyncMessage == NULL)
478         {
479                 _LOGE("pMessage is not a sync message.");
480                 return -1;
481         }
482
483         int messageId = SyncMessage::GetMessageId(*pSyncMessage);
484
485         int fd = AcquireFd();
486         if (fd < 0)
487         {
488                 _LOGE("Failed to get fd.");
489                 return -1;
490         }
491
492         char* pData = (char*) pSyncMessage->data();
493         int remain = pSyncMessage->size();
494         int written = 0;
495
496         while (remain > 0)
497         {
498                 written = write(fd, (char*) pData, remain);
499                 remain -= written;
500                 pData += written;
501         }
502
503         // Wait reply
504         struct pollfd pfd;
505
506         pfd.fd = fd;
507         pfd.events = POLLIN | POLLRDHUP;
508         pfd.revents = 0;
509
510         char buffer[1024];
511         std::string message;
512         int readSize = 0;
513         char* pEndOfMessage = NULL;
514
515         while (true)
516         {
517                 poll(&pfd, 1, -1);
518
519                 if (pfd.revents & POLLRDHUP)
520                 {
521                         return -1;
522                 }
523
524                 if (pfd.revents & POLLIN)
525                 {
526                         readSize = read(fd, buffer, 1024);
527                 }
528
529                 message.append(buffer, readSize);
530
531                 pEndOfMessage = (char*) IPC::Message::FindNext(message.data(), message.data() + message.size());
532                 if (pEndOfMessage)
533                 {
534                         pReply = new (std::nothrow) IPC::Message(message.data(), pEndOfMessage - message.data());
535                         if (pReply == NULL)
536                         {
537                                 _LOGE("The memory is insufficient.");
538                                 return -2;
539                         }
540
541                         break;
542                 }
543         }
544
545         pReplyDeserializer = pSyncMessage->GetReplyDeserializer();
546         pReplyDeserializer->SerializeOutputParameters(*pReply);
547
548         delete pReply;
549         delete pReplyDeserializer;
550
551         ReleaseFd(fd);
552
553         return 0;
554 }
555
556 int
557 IpcClient::Send(IPC::Message* pMessage)
558 {
559         int ret = 0;
560
561         if (pMessage->is_sync())
562         {
563                 ret = SendSync(pMessage);
564         }
565         else
566         {
567                 ret = SendAsync(pMessage);
568         }
569
570         return ret;
571 }
572
573 int
574 IpcClient::SendRequest(IPC::Message* pMessage)
575 {
576         return Send(pMessage);
577 }
578
579 int
580 IpcClient::SendRequest(const IPC::Message& message)
581 {
582         int ret = 0;
583
584         if (message.is_sync())
585         {
586                 ret = SendSync(const_cast<IPC::Message*>(&message));
587         }
588         else
589         {
590                 ret = SendAsync(const_cast<IPC::Message*>(&message));
591         }
592
593         return ret;
594 }
595