[ecore] merged svn latest code (svn54830)
[profile/ivi/ecore.git] / src / lib / ecore / ecore_pipe.c
1 #ifdef HAVE_CONFIG_H
2 # include <config.h>
3 #endif
4
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <errno.h>
8
9 #ifdef HAVE_EVIL
10 # include <Evil.h>
11 #endif
12
13 #include "Ecore.h"
14 #include "ecore_private.h"
15
16 /* How of then we should retry to write to the pipe */
17 #define ECORE_PIPE_WRITE_RETRY 6
18
19 /*
20  * On Windows, pipe() is implemented with sockets.
21  * Contrary to Linux, Windows uses different functions
22  * for sockets and fd's: write() is for fd's and send
23  * is for sockets. So I need to put some win32 code
24  * here. I can't think of a solution where the win32
25  * code is in Evil and not here.
26  */
27
28 #ifdef _WIN32
29
30 # include <winsock2.h>
31
32 # define pipe_write(fd, buffer, size) send((fd), (char *)(buffer), size, 0)
33 # define pipe_read(fd, buffer, size)  recv((fd), (char *)(buffer), size, 0)
34 # define pipe_close(fd)               closesocket(fd)
35 # define PIPE_FD_INVALID              INVALID_SOCKET
36 # define PIPE_FD_ERROR                SOCKET_ERROR
37
38 #else
39
40 # include <unistd.h>
41 # include <fcntl.h>
42
43 # define pipe_write(fd, buffer, size) write((fd), buffer, size)
44 # define pipe_read(fd, buffer, size)  read((fd), buffer, size)
45 # define pipe_close(fd)               close(fd)
46 # define PIPE_FD_INVALID              -1
47 # define PIPE_FD_ERROR                -1
48
49 #endif /* ! _WIN32 */
50
51 struct _Ecore_Pipe
52 {
53    ECORE_MAGIC;
54    int               fd_read;
55    int               fd_write;
56    Ecore_Fd_Handler *fd_handler;
57    const void       *data;
58    Ecore_Pipe_Cb     handler;
59    unsigned int      len;
60    int               handling;
61    size_t            already_read;
62    void             *passed_data;
63    Eina_Bool         delete_me : 1;
64 };
65
66
67 static Eina_Bool _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler);
68
69 /**
70  * @addtogroup Ecore_Group Ecore - Main Loop and Job Functions.
71  *
72  * @{
73  */
74
75 /**
76  * @addtogroup Ecore_Pipe_Group Pipe wrapper
77  *
78  * These functions wrap the pipe / write / read functions to
79  * easily integrate a loop that is in its own thread to the ecore
80  * main loop.
81  *
82  * The ecore_pipe_add() function creates file descriptors (sockets on
83  * Windows) and attach an handle to the ecore main loop. That handle is
84  * called when data is read in the pipe. To write data in the pipe,
85  * just call ecore_pipe_write(). When you are done, just call
86  * ecore_pipe_del().
87  *
88  * Here is an example that uses the pipe wrapper with a Gstreamer
89  * pipeline. For each decoded frame in the Gstreamer thread, a handle
90  * is called in the ecore thread.
91  *
92  * @code#include <gst/gst.h>
93  * #include <Ecore.h>
94  *
95  * static int nbr = 0;
96  *
97  * static GstElement *_buid_pipeline (gchar *filename, Ecore_Pipe *pipe);
98  *
99  * static void new_decoded_pad_cb (GstElement *demuxer,
100  *                                 GstPad     *new_pad,
101  *                                 gpointer    user_data);
102  *
103  * static void handler(void *data, void *buf, unsigned int len)
104  * {
105  *   GstBuffer  *buffer = *((GstBuffer **)buf);
106  *
107  *   printf ("handler : %p\n", buffer);
108  *   printf ("frame  : %d %p %lld %p\n", nbr++, data, (long long)GST_BUFFER_DURATION(buffer), buffer);
109  *   gst_buffer_unref (buffer);
110  * }
111  *
112  *
113  * static void handoff (GstElement* object,
114  *                      GstBuffer* arg0,
115  *                      GstPad* arg1,
116  *                      gpointer user_data)
117  * {
118  *   Ecore_Pipe *pipe;
119  *
120  *   pipe = (Ecore_Pipe *)user_data;
121  *   printf ("handoff : %p\n", arg0);
122  *   gst_buffer_ref (arg0);
123  *   ecore_pipe_write(pipe, &arg0, sizeof(arg0));
124  * }
125  *
126  * int
127  * main (int argc, char *argv[])
128  * {
129  *   GstElement *pipeline;
130  *   char *filename;
131  *   Ecore_Pipe *pipe;
132  *
133  *   gst_init (&argc, &argv);
134  *
135  *   if (!ecore_init ())
136  *     {
137  *       gst_deinit ();
138  *       return 0;
139  *     }
140  *
141  *   pipe = ecore_pipe_add (handler);
142  *   if (!pipe)
143  *     {
144  *       ecore_shutdown ();
145  *       gst_deinit ();
146  *       return 0;
147  *     }
148  *
149  *   if (argc < 2) {
150  *     g_print ("usage: %s file.avi\n", argv[0]);
151  *     ecore_pipe_del (pipe);
152  *     ecore_shutdown ();
153  *     gst_deinit ();
154  *     return 0;
155  *   }
156  *   filename = argv[1];
157  *
158  *   pipeline = _buid_pipeline (filename, pipe);
159  *   if (!pipeline) {
160  *     g_print ("Error during the pipeline building\n");
161  *     ecore_pipe_del (pipe);
162  *     ecore_shutdown ();
163  *     gst_deinit ();
164  *     return -1;
165  *   }
166  *
167  *   gst_element_set_state (pipeline, GST_STATE_PLAYING);
168  *
169  *   ecore_main_loop_begin();
170  *
171  *   ecore_pipe_del (pipe);
172  *   ecore_shutdown ();
173  *   gst_deinit ();
174  *
175  *   return 0;
176  * }
177  *
178  * static void
179  * new_decoded_pad_cb (GstElement *demuxer,
180  *                     GstPad     *new_pad,
181  *                     gpointer    user_data)
182  * {
183  *   GstElement *decoder;
184  *   GstPad *pad;
185  *   GstCaps *caps;
186  *   gchar *str;
187  *
188  *   caps = gst_pad_get_caps (new_pad);
189  *   str = gst_caps_to_string (caps);
190  *
191  *   if (g_str_has_prefix (str, "video/")) {
192  *     decoder = GST_ELEMENT (user_data);
193  *
194  *     pad = gst_element_get_pad (decoder, "sink");
195  *     if (GST_PAD_LINK_FAILED (gst_pad_link (new_pad, pad))) {
196  *       g_warning ("Failed to link %s:%s to %s:%s", GST_DEBUG_PAD_NAME (new_pad),
197  *                  GST_DEBUG_PAD_NAME (pad));
198  *     }
199  *   }
200  *   g_free (str);
201  *   gst_caps_unref (caps);
202  * }
203  *
204  * static GstElement *
205  * _buid_pipeline (gchar *filename, Ecore_Pipe *pipe)
206  * {
207  *   GstElement          *pipeline;
208  *   GstElement          *filesrc;
209  *   GstElement          *demuxer;
210  *   GstElement          *decoder;
211  *   GstElement          *sink;
212   GstStateChangeReturn res;
213  *
214  *   pipeline = gst_pipeline_new ("pipeline");
215  *   if (!pipeline)
216  *     return NULL;
217  *
218  *   filesrc = gst_element_factory_make ("filesrc", "filesrc");
219  *   if (!filesrc) {
220  *     printf ("no filesrc");
221  *     goto failure;
222  *   }
223  *   g_object_set (G_OBJECT (filesrc), "location", filename, NULL);
224  *
225  *   demuxer = gst_element_factory_make ("oggdemux", "demuxer");
226  *   if (!demuxer) {
227  *     printf ("no demux");
228  *     goto failure;
229  *   }
230  *
231  *   decoder = gst_element_factory_make ("theoradec", "decoder");
232  *   if (!decoder) {
233  *     printf ("no dec");
234  *     goto failure;
235  *   }
236  *
237  *   g_signal_connect (demuxer, "pad-added",
238  *                     G_CALLBACK (new_decoded_pad_cb), decoder);
239  *
240  *   sink = gst_element_factory_make ("fakesink", "sink");
241  *   if (!sink) {
242  *     printf ("no sink");
243  *     goto failure;
244  *   }
245  *   g_object_set (G_OBJECT (sink), "sync", EINA_TRUE, NULL);
246  *   g_object_set (G_OBJECT (sink), "signal-handoffs", EINA_TRUE, NULL);
247  *   g_signal_connect (sink, "handoff",
248  *                     G_CALLBACK (handoff), pipe);
249  *
250  *   gst_bin_add_many (GST_BIN (pipeline),
251  *                     filesrc, demuxer, decoder, sink, NULL);
252  *
253  *   if (!gst_element_link (filesrc, demuxer))
254  *     goto failure;
255  *   if (!gst_element_link (decoder, sink))
256  *     goto failure;
257  *
258  *   res = gst_element_set_state (pipeline, GST_STATE_PAUSED);
259  *   if (res == GST_STATE_CHANGE_FAILURE)
260  *     goto failure;
261  *
262  *   res = gst_element_get_state( pipeline, NULL, NULL, GST_CLOCK_TIME_NONE );
263  *   if (res != GST_STATE_CHANGE_SUCCESS)
264  *     goto failure;
265  *
266  *   return pipeline;
267  *
268  *  failure:
269  *   gst_object_unref (GST_OBJECT (pipeline));
270  *   return NULL;
271  * }
272  * @endcode
273  */
274
275
276 /**
277  * Create two file descriptors (sockets on Windows). Add
278  * a callback that will be called when the file descriptor that
279  * is listened receives data. An event is also put in the event
280  * queue when data is received.
281  *
282  * @param handler The handler called when data is received.
283  * @param data    Data to pass to @p handler when it is called.
284  * @return        A newly created Ecore_Pipe object if successful.
285  *                @c NULL otherwise.
286  */
287 EAPI Ecore_Pipe *
288 ecore_pipe_add(Ecore_Pipe_Cb handler, const void *data)
289 {
290    Ecore_Pipe *p;
291    int         fds[2];
292
293    if (!handler) return NULL;
294
295    p = (Ecore_Pipe *)calloc(1, sizeof(Ecore_Pipe));
296    if (!p) return NULL;
297
298    if (pipe(fds))
299      {
300         free(p);
301         return NULL;
302      }
303
304    ECORE_MAGIC_SET(p, ECORE_MAGIC_PIPE);
305    p->fd_read = fds[0];
306    p->fd_write = fds[1];
307    p->handler = handler;
308    p->data = data;
309
310    fcntl(p->fd_read, F_SETFL, O_NONBLOCK);
311    p->fd_handler = ecore_main_fd_handler_add(p->fd_read,
312                                           ECORE_FD_READ,
313                                           _ecore_pipe_read,
314                                           p,
315                                           NULL, NULL);
316    return p;
317 }
318
319 /**
320  * Free an Ecore_Pipe object created with ecore_pipe_add().
321  *
322  * @param p The Ecore_Pipe object to be freed.
323  * @return The pointer to the private data
324  */
325 EAPI void *
326 ecore_pipe_del(Ecore_Pipe *p)
327 {
328    void *data;
329
330    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
331      {
332         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_del");
333         return NULL;
334      }
335    p->delete_me = EINA_TRUE;
336    if (p->handling > 0) return (void *)p->data;
337    if (p->fd_handler) ecore_main_fd_handler_del(p->fd_handler);
338    if (p->fd_read != PIPE_FD_INVALID) pipe_close(p->fd_read);
339    if (p->fd_write != PIPE_FD_INVALID) pipe_close(p->fd_write);
340    data = (void *)p->data;
341    free(p);
342    return data;
343 }
344
345 /**
346  * Close the read end of an Ecore_Pipe object created with ecore_pipe_add().
347  *
348  * @param p The Ecore_Pipe object.
349  */
350 EAPI void
351 ecore_pipe_read_close(Ecore_Pipe *p)
352 {
353    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
354      {
355         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_read_close");
356         return;
357      }
358    if (p->fd_handler)
359      {
360         ecore_main_fd_handler_del(p->fd_handler);
361         p->fd_handler = NULL;
362      }
363    if (p->fd_read != PIPE_FD_INVALID)
364      {
365         pipe_close(p->fd_read);
366         p->fd_read = PIPE_FD_INVALID;
367      }
368 }
369
370 /**
371  * Close the write end of an Ecore_Pipe object created with ecore_pipe_add().
372  *
373  * @param p The Ecore_Pipe object.
374  */
375 EAPI void
376 ecore_pipe_write_close(Ecore_Pipe *p)
377 {
378    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
379      {
380         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write_close");
381         return;
382      }
383    if (p->fd_write != PIPE_FD_INVALID)
384      {
385         pipe_close(p->fd_write);
386         p->fd_write = PIPE_FD_INVALID;
387      }
388 }
389
390 /**
391  * Write on the file descriptor the data passed as parameter.
392  *
393  * @param p      The Ecore_Pipe object.
394  * @param buffer The data to write into the pipe.
395  * @param nbytes The size of the @p buffer in bytes
396  * @return       Returns EINA_TRUE on a successful write, EINA_FALSE on an error
397  */
398 EAPI Eina_Bool
399 ecore_pipe_write(Ecore_Pipe *p, const void *buffer, unsigned int nbytes)
400 {
401    ssize_t ret;
402    size_t  already_written = 0;
403    int     retry = ECORE_PIPE_WRITE_RETRY;
404
405    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
406      {
407         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write");
408         return EINA_FALSE;
409      }
410
411    if (p->delete_me) return EINA_FALSE;
412    
413    if (p->fd_write == PIPE_FD_INVALID) return EINA_FALSE;
414
415    /* First write the len into the pipe */
416    do
417      {
418         ret = pipe_write(p->fd_write, &nbytes, sizeof(nbytes));
419         if (ret == sizeof(nbytes))
420           {
421              retry = ECORE_PIPE_WRITE_RETRY;
422              break;
423           }
424         else if (ret > 0)
425           {
426              /* XXX What should we do here? */
427              ERR("The length of the data was not written complete"
428                  " to the pipe");
429              return EINA_FALSE;
430           }
431         else if (ret == PIPE_FD_ERROR && errno == EPIPE)
432           {
433              pipe_close(p->fd_write);
434              p->fd_write = PIPE_FD_INVALID;
435              return EINA_FALSE;
436           }
437         else if (ret == PIPE_FD_ERROR && errno == EINTR)
438           /* try it again */
439           ;
440         else
441           {
442              ERR("An unhandled error (ret: %zd errno: %d)"
443                  "occurred while writing to the pipe the length",
444                  ret, errno);
445           }
446      }
447    while (retry--);
448
449    if (retry != ECORE_PIPE_WRITE_RETRY) return EINA_FALSE;
450
451    /* and now pass the data to the pipe */
452    do
453      {
454         ret = pipe_write(p->fd_write,
455                          ((unsigned char *)buffer) + already_written,
456                          nbytes - already_written);
457         
458         if (ret == (ssize_t)(nbytes - already_written))
459           return EINA_TRUE;
460         else if (ret >= 0)
461           {
462              already_written -= ret;
463              continue;
464           }
465         else if (ret == PIPE_FD_ERROR && errno == EPIPE)
466           {
467              pipe_close(p->fd_write);
468              p->fd_write = PIPE_FD_INVALID;
469              return EINA_FALSE;
470           }
471         else if (ret == PIPE_FD_ERROR && errno == EINTR)
472           /* try it again */
473           ;
474         else
475           {
476              ERR("An unhandled error (ret: %zd errno: %d)"
477                  "occurred while writing to the pipe the length",
478                  ret, errno);
479           }
480      }
481    while (retry--);
482
483    return EINA_FALSE;
484 }
485
486 /**
487  * @}
488  */
489
490 /**
491  * @}
492  */
493
494 /* Private function */
495 static void
496 _ecore_pipe_unhandle(Ecore_Pipe *p)
497 {
498    p->handling--;
499    if (p->delete_me)
500      {
501         ecore_pipe_del(p);
502      }
503 }
504
505 static Eina_Bool
506 _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler __UNUSED__)
507 {
508    Ecore_Pipe  *p;
509    double       start_time;
510    int          i;
511
512    p = (Ecore_Pipe *)data;
513    start_time = ecore_time_get();
514    
515    p->handling++;
516    for (i = 0; i < 16; i++)
517      {
518         ssize_t       ret;
519
520         /* if we already have read some data we don't need to read the len
521          * but to finish the already started job
522          */
523         if (p->len == 0)
524           {
525              /* read the len of the passed data */
526              ret = pipe_read(p->fd_read, &p->len, sizeof(p->len));
527
528              /* catch the non error case first */
529              if (ret == sizeof(p->len))
530                ;
531              else if (ret > 0)
532                {
533                   /* XXX What should we do here? */
534                   ERR("Only read %zd bytes from the pipe, although"
535                       " we need to read %zd bytes.", ret, sizeof(p->len));
536                   _ecore_pipe_unhandle(p);
537                   return ECORE_CALLBACK_CANCEL;
538                }
539              else if (ret == 0)
540                {
541                   p->handler((void *)p->data, NULL, 0);
542                   pipe_close(p->fd_read);
543                   p->fd_read = PIPE_FD_INVALID;
544                   p->fd_handler = NULL;
545                   _ecore_pipe_unhandle(p);
546                   return ECORE_CALLBACK_CANCEL;
547                }
548 #ifndef _WIN32
549              else if ((ret == PIPE_FD_ERROR) && ((errno == EINTR) || (errno == EAGAIN)))
550                {
551                   _ecore_pipe_unhandle(p);
552                   return ECORE_CALLBACK_RENEW;
553                }
554              else
555                {
556                   ERR("An unhandled error (ret: %zd errno: %d [%s])"
557                       "occurred while reading from the pipe the length",
558                       ret, errno, strerror(errno));
559                   _ecore_pipe_unhandle(p);
560                   return ECORE_CALLBACK_RENEW;
561                }
562 #else
563              else /* ret == PIPE_FD_ERROR is the only other case on Windows */
564                {
565                   if (WSAGetLastError() != WSAEWOULDBLOCK)
566                     {
567                        p->handler((void *)p->data, NULL, 0);
568                        pipe_close(p->fd_read);
569                        p->fd_read = PIPE_FD_INVALID;
570                        p->fd_handler = NULL;
571                        _ecore_pipe_unhandle(p);
572                        return ECORE_CALLBACK_CANCEL;
573                     }
574                }
575 #endif
576           }
577
578         if (!p->passed_data)
579           p->passed_data = malloc(p->len);
580
581         /* and read the passed data */
582         ret = pipe_read(p->fd_read,
583                         ((unsigned char *)p->passed_data) + p->already_read,
584                         p->len - p->already_read);
585         
586         /* catch the non error case first */
587         if (ret == (ssize_t)(p->len - p->already_read))
588           {
589              p->handler((void *)p->data, p->passed_data, p->len);
590              free(p->passed_data);
591              /* reset all values to 0 */
592              p->passed_data = NULL;
593              p->already_read = 0;
594              p->len = 0;
595           }
596         else if (ret >= 0)
597           {
598              p->already_read += ret;
599              _ecore_pipe_unhandle(p);
600              return ECORE_CALLBACK_RENEW;
601           }
602         else if (ret == 0)
603           {
604              p->handler((void *)p->data, NULL, 0);
605              pipe_close(p->fd_read);
606              p->fd_read = PIPE_FD_INVALID;
607              p->fd_handler = NULL;
608              _ecore_pipe_unhandle(p);
609              return ECORE_CALLBACK_CANCEL;
610           }
611 #ifndef _WIN32
612         else if (ret == PIPE_FD_ERROR && (errno == EINTR || errno == EAGAIN))
613           {
614              return ECORE_CALLBACK_RENEW;
615           }
616         else
617           {
618              ERR("An unhandled error (ret: %zd errno: %d)"
619                  "occurred while reading from the pipe the data",
620                  ret, errno);
621              _ecore_pipe_unhandle(p);
622              return ECORE_CALLBACK_RENEW;
623           }
624 #else
625         else /* ret == PIPE_FD_ERROR is the only other case on Windows */
626           {
627              if (WSAGetLastError() != WSAEWOULDBLOCK)
628                {
629                   p->handler((void *)p->data, NULL, 0);
630                   pipe_close(p->fd_read);
631                   p->fd_read = PIPE_FD_INVALID;
632                   p->fd_handler = NULL;
633                   _ecore_pipe_unhandle(p);
634                   return ECORE_CALLBACK_CANCEL;
635                }
636              else
637                break;
638           }
639 #endif
640      }
641    
642    _ecore_pipe_unhandle(p);
643    return ECORE_CALLBACK_RENEW;
644 }