9c26133efdfa896150117059e235410695471b3d
[profile/ivi/ecore.git] / src / lib / ecore / ecore_pipe.c
1 #ifdef HAVE_CONFIG_H
2 # include <config.h>
3 #endif
4
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <errno.h>
8
9 #ifdef HAVE_EVIL
10 # include <Evil.h>
11 #endif
12
13 #include "Ecore.h"
14 #include "ecore_private.h"
15
16 /* How of then we should retry to write to the pipe */
17 #define ECORE_PIPE_WRITE_RETRY 6
18
19 /*
20  * On Windows, pipe() is implemented with sockets.
21  * Contrary to Linux, Windows uses different functions
22  * for sockets and fd's: write() is for fd's and send
23  * is for sockets. So I need to put some win32 code
24  * here. I can't think of a solution where the win32
25  * code is in Evil and not here.
26  */
27
28 #ifdef _WIN32
29
30 # include <winsock2.h>
31
32 # define pipe_write(fd, buffer, size) send((fd), (char *)(buffer), size, 0)
33 # define pipe_read(fd, buffer, size)  recv((fd), (char *)(buffer), size, 0)
34 # define pipe_close(fd)               closesocket(fd)
35 # define PIPE_FD_INVALID              INVALID_SOCKET
36 # define PIPE_FD_ERROR                SOCKET_ERROR
37
38 #else
39
40 # include <unistd.h>
41 # include <fcntl.h>
42
43 # define pipe_write(fd, buffer, size) write((fd), buffer, size)
44 # define pipe_read(fd, buffer, size)  read((fd), buffer, size)
45 # define pipe_close(fd)               close(fd)
46 # define PIPE_FD_INVALID              -1
47 # define PIPE_FD_ERROR                -1
48
49 #endif /* ! _WIN32 */
50
51 struct _Ecore_Pipe
52 {
53    ECORE_MAGIC;
54    int               fd_read;
55    int               fd_write;
56    Ecore_Fd_Handler *fd_handler;
57    const void       *data;
58    Ecore_Pipe_Cb     handler;
59    unsigned int      len;
60    size_t            already_read;
61    void             *passed_data;
62 };
63
64
65 static Eina_Bool _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler);
66
67 /**
68  * @defgroup Ecore_Pipe_Group Pipe wrapper
69  *
70  * These functions wrap the pipe / write / read functions to
71  * easily integrate a loop that is in its own thread to the ecore
72  * main loop.
73  *
74  * The ecore_pipe_add() function creates file descriptors (sockets on
75  * Windows) and attach an handle to the ecore main loop. That handle is
76  * called when data is read in the pipe. To write data in the pipe,
77  * just call ecore_pipe_write(). When you are done, just call
78  * ecore_pipe_del().
79  *
80  * Here is an example that uses the pipe wrapper with a Gstreamer
81  * pipeline. For each decoded frame in the Gstreamer thread, a handle
82  * is called in the ecore thread.
83  *
84  * @code#include <gst/gst.h>
85  * #include <Ecore.h>
86  *
87  * static int nbr = 0;
88  *
89  * static GstElement *_buid_pipeline (gchar *filename, Ecore_Pipe *pipe);
90  *
91  * static void new_decoded_pad_cb (GstElement *demuxer,
92  *                                 GstPad     *new_pad,
93  *                                 gpointer    user_data);
94  *
95  * static void handler(void *data, void *buf, unsigned int len)
96  * {
97  *   GstBuffer  *buffer = *((GstBuffer **)buf);
98  *
99  *   printf ("handler : %p\n", buffer);
100  *   printf ("frame  : %d %p %lld %p\n", nbr++, data, (long long)GST_BUFFER_DURATION(buffer), buffer);
101  *   gst_buffer_unref (buffer);
102  * }
103  *
104  *
105  * static void handoff (GstElement* object,
106  *                      GstBuffer* arg0,
107  *                      GstPad* arg1,
108  *                      gpointer user_data)
109  * {
110  *   Ecore_Pipe *pipe;
111  *
112  *   pipe = (Ecore_Pipe *)user_data;
113  *   printf ("handoff : %p\n", arg0);
114  *   gst_buffer_ref (arg0);
115  *   ecore_pipe_write(pipe, &arg0, sizeof(arg0));
116  * }
117  *
118  * int
119  * main (int argc, char *argv[])
120  * {
121  *   GstElement *pipeline;
122  *   char *filename;
123  *   Ecore_Pipe *pipe;
124  *
125  *   gst_init (&argc, &argv);
126  *
127  *   if (!ecore_init ())
128  *     {
129  *       gst_deinit ();
130  *       return 0;
131  *     }
132  *
133  *   pipe = ecore_pipe_add (handler);
134  *   if (!pipe)
135  *     {
136  *       ecore_shutdown ();
137  *       gst_deinit ();
138  *       return 0;
139  *     }
140  *
141  *   if (argc < 2) {
142  *     g_print ("usage: %s file.avi\n", argv[0]);
143  *     ecore_pipe_del (pipe);
144  *     ecore_shutdown ();
145  *     gst_deinit ();
146  *     return 0;
147  *   }
148  *   filename = argv[1];
149  *
150  *   pipeline = _buid_pipeline (filename, pipe);
151  *   if (!pipeline) {
152  *     g_print ("Error during the pipeline building\n");
153  *     ecore_pipe_del (pipe);
154  *     ecore_shutdown ();
155  *     gst_deinit ();
156  *     return -1;
157  *   }
158  *
159  *   gst_element_set_state (pipeline, GST_STATE_PLAYING);
160  *
161  *   ecore_main_loop_begin();
162  *
163  *   ecore_pipe_del (pipe);
164  *   ecore_shutdown ();
165  *   gst_deinit ();
166  *
167  *   return 0;
168  * }
169  *
170  * static void
171  * new_decoded_pad_cb (GstElement *demuxer,
172  *                     GstPad     *new_pad,
173  *                     gpointer    user_data)
174  * {
175  *   GstElement *decoder;
176  *   GstPad *pad;
177  *   GstCaps *caps;
178  *   gchar *str;
179  *
180  *   caps = gst_pad_get_caps (new_pad);
181  *   str = gst_caps_to_string (caps);
182  *
183  *   if (g_str_has_prefix (str, "video/")) {
184  *     decoder = GST_ELEMENT (user_data);
185  *
186  *     pad = gst_element_get_pad (decoder, "sink");
187  *     if (GST_PAD_LINK_FAILED (gst_pad_link (new_pad, pad))) {
188  *       g_warning ("Failed to link %s:%s to %s:%s", GST_DEBUG_PAD_NAME (new_pad),
189  *                  GST_DEBUG_PAD_NAME (pad));
190  *     }
191  *   }
192  *   g_free (str);
193  *   gst_caps_unref (caps);
194  * }
195  *
196  * static GstElement *
197  * _buid_pipeline (gchar *filename, Ecore_Pipe *pipe)
198  * {
199  *   GstElement          *pipeline;
200  *   GstElement          *filesrc;
201  *   GstElement          *demuxer;
202  *   GstElement          *decoder;
203  *   GstElement          *sink;
204   GstStateChangeReturn res;
205  *
206  *   pipeline = gst_pipeline_new ("pipeline");
207  *   if (!pipeline)
208  *     return NULL;
209  *
210  *   filesrc = gst_element_factory_make ("filesrc", "filesrc");
211  *   if (!filesrc) {
212  *     printf ("no filesrc");
213  *     goto failure;
214  *   }
215  *   g_object_set (G_OBJECT (filesrc), "location", filename, NULL);
216  *
217  *   demuxer = gst_element_factory_make ("oggdemux", "demuxer");
218  *   if (!demuxer) {
219  *     printf ("no demux");
220  *     goto failure;
221  *   }
222  *
223  *   decoder = gst_element_factory_make ("theoradec", "decoder");
224  *   if (!decoder) {
225  *     printf ("no dec");
226  *     goto failure;
227  *   }
228  *
229  *   g_signal_connect (demuxer, "pad-added",
230  *                     G_CALLBACK (new_decoded_pad_cb), decoder);
231  *
232  *   sink = gst_element_factory_make ("fakesink", "sink");
233  *   if (!sink) {
234  *     printf ("no sink");
235  *     goto failure;
236  *   }
237  *   g_object_set (G_OBJECT (sink), "sync", EINA_TRUE, NULL);
238  *   g_object_set (G_OBJECT (sink), "signal-handoffs", EINA_TRUE, NULL);
239  *   g_signal_connect (sink, "handoff",
240  *                     G_CALLBACK (handoff), pipe);
241  *
242  *   gst_bin_add_many (GST_BIN (pipeline),
243  *                     filesrc, demuxer, decoder, sink, NULL);
244  *
245  *   if (!gst_element_link (filesrc, demuxer))
246  *     goto failure;
247  *   if (!gst_element_link (decoder, sink))
248  *     goto failure;
249  *
250  *   res = gst_element_set_state (pipeline, GST_STATE_PAUSED);
251  *   if (res == GST_STATE_CHANGE_FAILURE)
252  *     goto failure;
253  *
254  *   res = gst_element_get_state( pipeline, NULL, NULL, GST_CLOCK_TIME_NONE );
255  *   if (res != GST_STATE_CHANGE_SUCCESS)
256  *     goto failure;
257  *
258  *   return pipeline;
259  *
260  *  failure:
261  *   gst_object_unref (GST_OBJECT (pipeline));
262  *   return NULL;
263  * }
264  * @endcode
265  */
266
267
268 /**
269  * Create two file descriptors (sockets on Windows). Add
270  * a callback that will be called when the file descriptor that
271  * is listened receives data. An event is also put in the event
272  * queue when data is received.
273  *
274  * @param handler The handler called when data is received.
275  * @param data    Data to pass to @p handler when it is called.
276  * @return        A newly created Ecore_Pipe object if successful.
277  *                @c NULL otherwise.
278  * @ingroup Ecore_Pipe_Group
279  */
280 EAPI Ecore_Pipe *
281 ecore_pipe_add(Ecore_Pipe_Cb handler, const void *data)
282 {
283    Ecore_Pipe *p;
284    int         fds[2];
285
286    if (!handler) return NULL;
287
288    p = (Ecore_Pipe *)calloc(1, sizeof(Ecore_Pipe));
289    if (!p) return NULL;
290
291    if (pipe(fds))
292      {
293         free(p);
294         return NULL;
295      }
296
297    ECORE_MAGIC_SET(p, ECORE_MAGIC_PIPE);
298    p->fd_read = fds[0];
299    p->fd_write = fds[1];
300    p->handler = handler;
301    p->data = data;
302
303    fcntl(p->fd_read, F_SETFL, O_NONBLOCK);
304    p->fd_handler = ecore_main_fd_handler_add(p->fd_read,
305                                           ECORE_FD_READ,
306                                           _ecore_pipe_read,
307                                           p,
308                                           NULL, NULL);
309    return p;
310 }
311
312 /**
313  * Free an Ecore_Pipe object created with ecore_pipe_add().
314  *
315  * @param p The Ecore_Pipe object to be freed.
316  * @return The pointer to the private data
317  * @ingroup Ecore_Pipe_Group
318  */
319 EAPI void *
320 ecore_pipe_del(Ecore_Pipe *p)
321 {
322    void *data;
323
324    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
325      {
326         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_del");
327         return NULL;
328      }
329    if (p->fd_handler) ecore_main_fd_handler_del(p->fd_handler);
330    if (p->fd_read != PIPE_FD_INVALID) pipe_close(p->fd_read);
331    if (p->fd_write != PIPE_FD_INVALID) pipe_close(p->fd_write);
332    data = (void *)p->data;
333    free(p);
334    return data;
335 }
336
337 /**
338  * Close the read end of an Ecore_Pipe object created with ecore_pipe_add().
339  *
340  * @param p The Ecore_Pipe object.
341  * @ingroup Ecore_Pipe_Group
342  */
343 EAPI void
344 ecore_pipe_read_close(Ecore_Pipe *p)
345 {
346    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
347      {
348         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_read_close");
349         return;
350      }
351    ecore_main_fd_handler_del(p->fd_handler);
352    p->fd_handler = NULL;
353    pipe_close(p->fd_read);
354    p->fd_read = PIPE_FD_INVALID;
355 }
356
357 /**
358  * Close the write end of an Ecore_Pipe object created with ecore_pipe_add().
359  *
360  * @param p The Ecore_Pipe object.
361  * @ingroup Ecore_Pipe_Group
362  */
363 EAPI void
364 ecore_pipe_write_close(Ecore_Pipe *p)
365 {
366    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
367      {
368         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write_close");
369         return;
370      }
371    pipe_close(p->fd_write);
372    p->fd_write = PIPE_FD_INVALID;
373 }
374
375 /**
376  * Write on the file descriptor the data passed as parameter.
377  *
378  * @param p      The Ecore_Pipe object.
379  * @param buffer The data to write into the pipe.
380  * @param nbytes The size of the @p buffer in bytes
381  * @return       Returns EINA_TRUE on a successful write, EINA_FALSE on an error
382  * @ingroup Ecore_Pipe_Group
383  */
384 EAPI Eina_Bool
385 ecore_pipe_write(Ecore_Pipe *p, const void *buffer, unsigned int nbytes)
386 {
387    ssize_t ret;
388    size_t  already_written = 0;
389    int     retry = ECORE_PIPE_WRITE_RETRY;
390
391    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
392      {
393         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write");
394         return EINA_FALSE;
395      }
396
397    if (p->fd_write == PIPE_FD_INVALID) return EINA_FALSE;
398
399    /* First write the len into the pipe */
400    do
401      {
402         ret = pipe_write(p->fd_write, &nbytes, sizeof(nbytes));
403         if (ret == sizeof(nbytes))
404           {
405              retry = ECORE_PIPE_WRITE_RETRY;
406              break;
407           }
408         else if (ret > 0)
409           {
410              /* XXX What should we do here? */
411              ERR("The length of the data was not written complete"
412                  " to the pipe");
413              return EINA_FALSE;
414           }
415         else if (ret == PIPE_FD_ERROR && errno == EPIPE)
416           {
417              pipe_close(p->fd_write);
418              p->fd_write = PIPE_FD_INVALID;
419              return EINA_FALSE;
420           }
421         else if (ret == PIPE_FD_ERROR && errno == EINTR)
422           /* try it again */
423           ;
424         else
425           {
426              ERR("An unhandled error (ret: %zd errno: %d)"
427                  "occured while writing to the pipe the length",
428                  ret, errno);
429           }
430      }
431    while (retry--);
432
433    if (retry != ECORE_PIPE_WRITE_RETRY) return EINA_FALSE;
434
435    /* and now pass the data to the pipe */
436    do
437      {
438         ret = pipe_write(p->fd_write,
439                          ((unsigned char *)buffer) + already_written,
440                          nbytes - already_written);
441         
442         if (ret == (ssize_t)(nbytes - already_written))
443           return EINA_TRUE;
444         else if (ret >= 0)
445           {
446              already_written -= ret;
447              continue;
448           }
449         else if (ret == PIPE_FD_ERROR && errno == EPIPE)
450           {
451              pipe_close(p->fd_write);
452              p->fd_write = PIPE_FD_INVALID;
453              return EINA_FALSE;
454           }
455         else if (ret == PIPE_FD_ERROR && errno == EINTR)
456           /* try it again */
457           ;
458         else
459           {
460              ERR("An unhandled error (ret: %zd errno: %d)"
461                  "occured while writing to the pipe the length",
462                  ret, errno);
463           }
464      }
465    while (retry--);
466
467    return EINA_FALSE;
468 }
469
470 /* Private function */
471
472 static Eina_Bool
473 _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler __UNUSED__)
474 {
475    Ecore_Pipe  *p;
476    double       start_time;
477
478    p = (Ecore_Pipe *)data;
479    start_time = ecore_time_get();
480
481    do
482      {
483         ssize_t       ret;
484
485         /* if we already have read some data we don't need to read the len
486          * but to finish the already started job
487          */
488         if (p->len == 0)
489           {
490              /* read the len of the passed data */
491              ret = pipe_read(p->fd_read, &p->len, sizeof(p->len));
492
493              /* catch the non error case first */
494              if (ret == sizeof(p->len))
495                ;
496              else if (ret > 0)
497                {
498                   /* XXX What should we do here? */
499                   ERR("Only read %zd bytes from the pipe, although"
500                       " we need to read %zd bytes.", ret, sizeof(p->len));
501                }
502              else if (ret == 0)
503                {
504                   p->handler((void *)p->data, NULL, 0);
505                   pipe_close(p->fd_read);
506                   p->fd_read = PIPE_FD_INVALID;
507                   p->fd_handler = NULL;
508                   return ECORE_CALLBACK_CANCEL;
509                }
510 #ifndef _WIN32
511              else if ((ret == PIPE_FD_ERROR) && ((errno == EINTR) || (errno == EAGAIN)))
512                return ECORE_CALLBACK_RENEW;
513              else
514                {
515                   ERR("An unhandled error (ret: %zd errno: %d)"
516                       "occured while reading from the pipe the length",
517                       ret, errno);
518                   return ECORE_CALLBACK_RENEW;
519                }
520 #else
521              else /* ret == PIPE_FD_ERROR is the only other case on Windows */
522                {
523                   if (WSAGetLastError() != WSAEWOULDBLOCK)
524                     {
525                        p->handler((void *)p->data, NULL, 0);
526                        pipe_close(p->fd_read);
527                        p->fd_read = PIPE_FD_INVALID;
528                        p->fd_handler = NULL;
529                        return ECORE_CALLBACK_CANCEL;
530                     }
531                }
532 #endif
533           }
534
535         if (!p->passed_data)
536           p->passed_data = malloc(p->len);
537
538         /* and read the passed data */
539         ret = pipe_read(p->fd_read,
540                         ((unsigned char *)p->passed_data) + p->already_read,
541                         p->len - p->already_read);
542         
543         /* catch the non error case first */
544         if (ret == (ssize_t)(p->len - p->already_read))
545           {
546              p->handler((void *)p->data, p->passed_data, p->len);
547              free(p->passed_data);
548              /* reset all values to 0 */
549              p->passed_data = NULL;
550              p->already_read = 0;
551              p->len = 0;
552           }
553         else if (ret >= 0)
554           {
555              p->already_read += ret;
556              return ECORE_CALLBACK_RENEW;
557           }
558         else if (ret == 0)
559           {
560              p->handler((void *)p->data, NULL, 0);
561              pipe_close(p->fd_read);
562              p->fd_read = PIPE_FD_INVALID;
563              p->fd_handler = NULL;
564              return ECORE_CALLBACK_CANCEL;
565           }
566 #ifndef _WIN32
567         else if (ret == PIPE_FD_ERROR && (errno == EINTR || errno == EAGAIN))
568           return ECORE_CALLBACK_RENEW;
569         else
570           {
571              ERR("An unhandled error (ret: %zd errno: %d)"
572                  "occured while reading from the pipe the data",
573                  ret, errno);
574              return ECORE_CALLBACK_RENEW;
575           }
576 #else
577         else /* ret == PIPE_FD_ERROR is the only other case on Windows */
578           {
579              if (WSAGetLastError() != WSAEWOULDBLOCK)
580                {
581                   p->handler((void *)p->data, NULL, 0);
582                   pipe_close(p->fd_read);
583                   p->fd_read = PIPE_FD_INVALID;
584                   p->fd_handler = NULL;
585                   return ECORE_CALLBACK_CANCEL;
586                }
587              else
588                break;
589           }
590 #endif
591      }
592    while (ecore_time_get() - start_time < ecore_animator_frametime_get());
593    
594    return ECORE_CALLBACK_RENEW;
595 }