svn update: 48958 (latest:48959)
[framework/uifw/ecore.git] / src / lib / ecore / ecore_pipe.c
1 /*
2  * vim:ts=8:sw=3:sts=8:noexpandtab:cino=>5n-3f0^-2{2
3  */
4
5 #ifdef HAVE_CONFIG_H
6 # include <config.h>
7 #endif
8
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <errno.h>
12
13 #ifdef HAVE_EVIL
14 # include <Evil.h>
15 #endif
16
17 #include "Ecore.h"
18 #include "ecore_private.h"
19
20 /* How of then we should retry to write to the pipe */
21 #define ECORE_PIPE_WRITE_RETRY 6
22
23 /*
24  * On Windows, pipe() is implemented with sockets.
25  * Contrary to Linux, Windows uses different functions
26  * for sockets and fd's: write() is for fd's and send
27  * is for sockets. So I need to put some win32 code
28  * here. I can't think of a solution where the win32
29  * code is in Evil and not here.
30  */
31
32 #ifdef _WIN32
33
34 # include <winsock2.h>
35
36 # define pipe_write(fd, buffer, size) send((fd), (char *)(buffer), size, 0)
37 # define pipe_read(fd, buffer, size)  recv((fd), (char *)(buffer), size, 0)
38 # define pipe_close(fd)               closesocket(fd)
39 # define PIPE_FD_INVALID              INVALID_SOCKET
40 # define PIPE_FD_ERROR                SOCKET_ERROR
41
42 #else
43
44 # include <unistd.h>
45 # include <fcntl.h>
46
47 # define pipe_write(fd, buffer, size) write((fd), buffer, size)
48 # define pipe_read(fd, buffer, size)  read((fd), buffer, size)
49 # define pipe_close(fd)               close(fd)
50 # define PIPE_FD_INVALID              -1
51 # define PIPE_FD_ERROR                -1
52
53 #endif /* ! _WIN32 */
54
55 struct _Ecore_Pipe
56 {
57    ECORE_MAGIC;
58    int               fd_read;
59    int               fd_write;
60    Ecore_Fd_Handler *fd_handler;
61    const void       *data;
62    void            (*handler) (void *data, void *buffer, unsigned int nbyte);
63    unsigned int      len;
64    size_t            already_read;
65    void             *passed_data;
66 };
67
68
69 static int _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler);
70
71 /**
72  * @defgroup Ecore_Pipe_Group Pipe wrapper
73  *
74  * These functions wrap the pipe / write / read functions to
75  * easily integrate a loop that is in its own thread to the ecore
76  * main loop.
77  *
78  * The ecore_pipe_add() function creates file descriptors (sockets on
79  * Windows) and attach an handle to the ecore main loop. That handle is
80  * called when data is read in the pipe. To write data in the pipe,
81  * just call ecore_pipe_write(). When you are done, just call
82  * ecore_pipe_del().
83  *
84  * Here is an example that uses the pipe wrapper with a Gstreamer
85  * pipeline. For each decoded frame in the Gstreamer thread, a handle
86  * is called in the ecore thread.
87  *
88  * @code#include <gst/gst.h>
89  * #include <Ecore.h>
90  *
91  * static int nbr = 0;
92  *
93  * static GstElement *_buid_pipeline (gchar *filename, Ecore_Pipe *pipe);
94  *
95  * static void new_decoded_pad_cb (GstElement *demuxer,
96  *                                 GstPad     *new_pad,
97  *                                 gpointer    user_data);
98  *
99  * static void handler(void *data, void *buf, unsigned int len)
100  * {
101  *   GstBuffer  *buffer = *((GstBuffer **)buf);
102  *
103  *   printf ("handler : %p\n", buffer);
104  *   printf ("frame  : %d %p %lld %p\n", nbr++, data, (long long)GST_BUFFER_DURATION(buffer), buffer);
105  *   gst_buffer_unref (buffer);
106  * }
107  *
108  *
109  * static void handoff (GstElement* object,
110  *                      GstBuffer* arg0,
111  *                      GstPad* arg1,
112  *                      gpointer user_data)
113  * {
114  *   Ecore_Pipe *pipe;
115  *
116  *   pipe = (Ecore_Pipe *)user_data;
117  *   printf ("handoff : %p\n", arg0);
118  *   gst_buffer_ref (arg0);
119  *   ecore_pipe_write(pipe, &arg0, sizeof(arg0));
120  * }
121  *
122  * int
123  * main (int argc, char *argv[])
124  * {
125  *   GstElement *pipeline;
126  *   char *filename;
127  *   Ecore_Pipe *pipe;
128  *
129  *   gst_init (&argc, &argv);
130  *
131  *   if (!ecore_init ())
132  *     {
133  *       gst_deinit ();
134  *       return 0;
135  *     }
136  *
137  *   pipe = ecore_pipe_add (handler);
138  *   if (!pipe)
139  *     {
140  *       ecore_shutdown ();
141  *       gst_deinit ();
142  *       return 0;
143  *     }
144  *
145  *   if (argc < 2) {
146  *     g_print ("usage: %s file.avi\n", argv[0]);
147  *     ecore_pipe_del (pipe);
148  *     ecore_shutdown ();
149  *     gst_deinit ();
150  *     return 0;
151  *   }
152  *   filename = argv[1];
153  *
154  *   pipeline = _buid_pipeline (filename, pipe);
155  *   if (!pipeline) {
156  *     g_print ("Error during the pipeline building\n");
157  *     ecore_pipe_del (pipe);
158  *     ecore_shutdown ();
159  *     gst_deinit ();
160  *     return -1;
161  *   }
162  *
163  *   gst_element_set_state (pipeline, GST_STATE_PLAYING);
164  *
165  *   ecore_main_loop_begin();
166  *
167  *   ecore_pipe_del (pipe);
168  *   ecore_shutdown ();
169  *   gst_deinit ();
170  *
171  *   return 0;
172  * }
173  *
174  * static void
175  * new_decoded_pad_cb (GstElement *demuxer,
176  *                     GstPad     *new_pad,
177  *                     gpointer    user_data)
178  * {
179  *   GstElement *decoder;
180  *   GstPad *pad;
181  *   GstCaps *caps;
182  *   gchar *str;
183  *
184  *   caps = gst_pad_get_caps (new_pad);
185  *   str = gst_caps_to_string (caps);
186  *
187  *   if (g_str_has_prefix (str, "video/")) {
188  *     decoder = GST_ELEMENT (user_data);
189  *
190  *     pad = gst_element_get_pad (decoder, "sink");
191  *     if (GST_PAD_LINK_FAILED (gst_pad_link (new_pad, pad))) {
192  *       g_warning ("Failed to link %s:%s to %s:%s", GST_DEBUG_PAD_NAME (new_pad),
193  *                  GST_DEBUG_PAD_NAME (pad));
194  *     }
195  *   }
196  *   g_free (str);
197  *   gst_caps_unref (caps);
198  * }
199  *
200  * static GstElement *
201  * _buid_pipeline (gchar *filename, Ecore_Pipe *pipe)
202  * {
203  *   GstElement          *pipeline;
204  *   GstElement          *filesrc;
205  *   GstElement          *demuxer;
206  *   GstElement          *decoder;
207  *   GstElement          *sink;
208   GstStateChangeReturn res;
209  *
210  *   pipeline = gst_pipeline_new ("pipeline");
211  *   if (!pipeline)
212  *     return NULL;
213  *
214  *   filesrc = gst_element_factory_make ("filesrc", "filesrc");
215  *   if (!filesrc) {
216  *     printf ("no filesrc");
217  *     goto failure;
218  *   }
219  *   g_object_set (G_OBJECT (filesrc), "location", filename, NULL);
220  *
221  *   demuxer = gst_element_factory_make ("oggdemux", "demuxer");
222  *   if (!demuxer) {
223  *     printf ("no demux");
224  *     goto failure;
225  *   }
226  *
227  *   decoder = gst_element_factory_make ("theoradec", "decoder");
228  *   if (!decoder) {
229  *     printf ("no dec");
230  *     goto failure;
231  *   }
232  *
233  *   g_signal_connect (demuxer, "pad-added",
234  *                     G_CALLBACK (new_decoded_pad_cb), decoder);
235  *
236  *   sink = gst_element_factory_make ("fakesink", "sink");
237  *   if (!sink) {
238  *     printf ("no sink");
239  *     goto failure;
240  *   }
241  *   g_object_set (G_OBJECT (sink), "sync", EINA_TRUE, NULL);
242  *   g_object_set (G_OBJECT (sink), "signal-handoffs", EINA_TRUE, NULL);
243  *   g_signal_connect (sink, "handoff",
244  *                     G_CALLBACK (handoff), pipe);
245  *
246  *   gst_bin_add_many (GST_BIN (pipeline),
247  *                     filesrc, demuxer, decoder, sink, NULL);
248  *
249  *   if (!gst_element_link (filesrc, demuxer))
250  *     goto failure;
251  *   if (!gst_element_link (decoder, sink))
252  *     goto failure;
253  *
254  *   res = gst_element_set_state (pipeline, GST_STATE_PAUSED);
255  *   if (res == GST_STATE_CHANGE_FAILURE)
256  *     goto failure;
257  *
258  *   res = gst_element_get_state( pipeline, NULL, NULL, GST_CLOCK_TIME_NONE );
259  *   if (res != GST_STATE_CHANGE_SUCCESS)
260  *     goto failure;
261  *
262  *   return pipeline;
263  *
264  *  failure:
265  *   gst_object_unref (GST_OBJECT (pipeline));
266  *   return NULL;
267  * }
268  * @endcode
269  */
270
271
272 /**
273  * Create two file descriptors (sockets on Windows). Add
274  * a callback that will be called when the file descriptor that
275  * is listened receives data. An event is also put in the event
276  * queue when data is received.
277  *
278  * @param handler The handler called when data is received.
279  * @param data    Data to pass to @p handler when it is called.
280  * @return        A newly created Ecore_Pipe object if successful.
281  *                @c NULL otherwise.
282  * @ingroup Ecore_Pipe_Group
283  */
284 EAPI Ecore_Pipe *
285 ecore_pipe_add(void (*handler) (void *data, void *buffer, unsigned int nbyte),
286                const void *data)
287 {
288    Ecore_Pipe *p;
289    int         fds[2];
290
291    if (!handler) return NULL;
292
293    p = (Ecore_Pipe *)calloc(1, sizeof(Ecore_Pipe));
294    if (!p) return NULL;
295
296    if (pipe(fds))
297      {
298         free(p);
299         return NULL;
300      }
301
302    ECORE_MAGIC_SET(p, ECORE_MAGIC_PIPE);
303    p->fd_read = fds[0];
304    p->fd_write = fds[1];
305    p->handler = handler;
306    p->data = data;
307
308    fcntl(p->fd_read, F_SETFL, O_NONBLOCK);
309    p->fd_handler = ecore_main_fd_handler_add(p->fd_read,
310                                           ECORE_FD_READ,
311                                           _ecore_pipe_read,
312                                           p,
313                                           NULL, NULL);
314    return p;
315 }
316
317 /**
318  * Free an Ecore_Pipe object created with ecore_pipe_add().
319  *
320  * @param p The Ecore_Pipe object to be freed.
321  * @return The pointer to the private data
322  * @ingroup Ecore_Pipe_Group
323  */
324 EAPI void *
325 ecore_pipe_del(Ecore_Pipe *p)
326 {
327    void *data;
328
329    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
330      {
331         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_del");
332         return NULL;
333      }
334    if (p->fd_handler != NULL) ecore_main_fd_handler_del(p->fd_handler);
335    if (p->fd_read != PIPE_FD_INVALID) pipe_close(p->fd_read);
336    if (p->fd_write != PIPE_FD_INVALID) pipe_close(p->fd_write);
337    data = (void *)p->data;
338    free(p);
339    return data;
340 }
341
342 /**
343  * Close the read end of an Ecore_Pipe object created with ecore_pipe_add().
344  *
345  * @param p The Ecore_Pipe object.
346  * @ingroup Ecore_Pipe_Group
347  */
348 EAPI void
349 ecore_pipe_read_close(Ecore_Pipe *p)
350 {
351    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
352      {
353         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_read_close");
354         return;
355      }
356    ecore_main_fd_handler_del(p->fd_handler);
357    p->fd_handler = NULL;
358    pipe_close(p->fd_read);
359    p->fd_read = PIPE_FD_INVALID;
360 }
361
362 /**
363  * Close the write end of an Ecore_Pipe object created with ecore_pipe_add().
364  *
365  * @param p The Ecore_Pipe object.
366  * @ingroup Ecore_Pipe_Group
367  */
368 EAPI void
369 ecore_pipe_write_close(Ecore_Pipe *p)
370 {
371    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
372      {
373         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write_close");
374         return;
375      }
376    pipe_close(p->fd_write);
377    p->fd_write = PIPE_FD_INVALID;
378 }
379
380 /**
381  * Write on the file descriptor the data passed as parameter.
382  *
383  * @param p      The Ecore_Pipe object.
384  * @param buffer The data to write into the pipe.
385  * @param nbytes The size of the @p buffer in bytes
386  * @return       Returns EINA_TRUE on a successful write, EINA_FALSE on an error
387  * @ingroup Ecore_Pipe_Group
388  */
389 EAPI int
390 ecore_pipe_write(Ecore_Pipe *p, const void *buffer, unsigned int nbytes)
391 {
392    ssize_t ret;
393    size_t  already_written = 0;
394    int     retry = ECORE_PIPE_WRITE_RETRY;
395
396    if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
397      {
398         ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write");
399         return EINA_FALSE;
400      }
401
402    if (p->fd_write == PIPE_FD_INVALID) return EINA_FALSE;
403
404    /* First write the len into the pipe */
405    do
406      {
407         ret = pipe_write(p->fd_write, &nbytes, sizeof(nbytes));
408         if (ret == sizeof(nbytes))
409           {
410              retry = ECORE_PIPE_WRITE_RETRY;
411              break;
412           }
413         else if (ret > 0)
414           {
415              /* XXX What should we do here? */
416              ERR("The length of the data was not written complete"
417                  " to the pipe");
418              return EINA_FALSE;
419           }
420         else if (ret == PIPE_FD_ERROR && errno == EPIPE)
421           {
422              pipe_close(p->fd_write);
423              p->fd_write = PIPE_FD_INVALID;
424              return EINA_FALSE;
425           }
426         else if (ret == PIPE_FD_ERROR && errno == EINTR)
427           /* try it again */
428           ;
429         else
430           {
431              ERR("An unhandled error (ret: %zd errno: %d)"
432                  "occured while writing to the pipe the length",
433                  ret, errno);
434           }
435      }
436    while (retry--);
437
438    if (retry != ECORE_PIPE_WRITE_RETRY) return EINA_FALSE;
439
440    /* and now pass the data to the pipe */
441    do
442      {
443         ret = pipe_write(p->fd_write,
444                          ((unsigned char *)buffer) + already_written,
445                          nbytes - already_written);
446         
447         if (ret == (ssize_t)(nbytes - already_written))
448           return EINA_TRUE;
449         else if (ret >= 0)
450           {
451              already_written -= ret;
452              continue;
453           }
454         else if (ret == PIPE_FD_ERROR && errno == EPIPE)
455           {
456              pipe_close(p->fd_write);
457              p->fd_write = PIPE_FD_INVALID;
458              return EINA_FALSE;
459           }
460         else if (ret == PIPE_FD_ERROR && errno == EINTR)
461           /* try it again */
462           ;
463         else
464           {
465              ERR("An unhandled error (ret: %zd errno: %d)"
466                  "occured while writing to the pipe the length",
467                  ret, errno);
468           }
469      }
470    while (retry--);
471
472    return EINA_FALSE;
473 }
474
475 /* Private function */
476
477 static int
478 _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler __UNUSED__)
479 {
480    Ecore_Pipe  *p;
481    double       start_time;
482
483    p = (Ecore_Pipe *)data;
484    start_time = ecore_time_get();
485
486    do
487      {
488         ssize_t       ret;
489
490         /* if we already have read some data we don't need to read the len
491          * but to finish the already started job
492          */
493         if (p->len == 0)
494           {
495              /* read the len of the passed data */
496              ret = pipe_read(p->fd_read, &p->len, sizeof(p->len));
497
498              /* catch the non error case first */
499              if (ret == sizeof(p->len))
500                ;
501              else if (ret > 0)
502                {
503                   /* XXX What should we do here? */
504                   ERR("Only read %zd bytes from the pipe, although"
505                       " we need to read %zd bytes.", ret, sizeof(p->len));
506                }
507              else if (ret == 0)
508                {
509                   p->handler((void *)p->data, NULL, 0);
510                   pipe_close(p->fd_read);
511                   p->fd_read = PIPE_FD_INVALID;
512                   p->fd_handler = NULL;
513                   return ECORE_CALLBACK_CANCEL;
514                }
515 #ifndef _WIN32
516              else if ((ret == PIPE_FD_ERROR) && ((errno == EINTR) || (errno == EAGAIN)))
517                return ECORE_CALLBACK_RENEW;
518              else
519                {
520                   ERR("An unhandled error (ret: %zd errno: %d)"
521                       "occured while reading from the pipe the length",
522                       ret, errno);
523                   return ECORE_CALLBACK_RENEW;
524                }
525 #else
526              else /* ret == PIPE_FD_ERROR is the only other case on Windows */
527                {
528                   if (WSAGetLastError() != WSAEWOULDBLOCK)
529                     {
530                        p->handler((void *)p->data, NULL, 0);
531                        pipe_close(p->fd_read);
532                        p->fd_read = PIPE_FD_INVALID;
533                        p->fd_handler = NULL;
534                        return ECORE_CALLBACK_CANCEL;
535                     }
536                }
537 #endif
538           }
539
540         if (!p->passed_data)
541           p->passed_data = malloc(p->len);
542
543         /* and read the passed data */
544         ret = pipe_read(p->fd_read,
545                         ((unsigned char *)p->passed_data) + p->already_read,
546                         p->len - p->already_read);
547         
548         /* catch the non error case first */
549         if (ret == (ssize_t)(p->len - p->already_read))
550           {
551              p->handler((void *)p->data, p->passed_data, p->len);
552              free(p->passed_data);
553              /* reset all values to 0 */
554              p->passed_data = NULL;
555              p->already_read = 0;
556              p->len = 0;
557           }
558         else if (ret >= 0)
559           {
560              p->already_read += ret;
561              return ECORE_CALLBACK_RENEW;
562           }
563         else if (ret == 0)
564           {
565              p->handler((void *)p->data, NULL, 0);
566              pipe_close(p->fd_read);
567              p->fd_read = PIPE_FD_INVALID;
568              p->fd_handler = NULL;
569              return ECORE_CALLBACK_CANCEL;
570           }
571 #ifndef _WIN32
572         else if (ret == PIPE_FD_ERROR && (errno == EINTR || errno == EAGAIN))
573           return ECORE_CALLBACK_RENEW;
574         else
575           {
576              ERR("An unhandled error (ret: %zd errno: %d)"
577                  "occured while reading from the pipe the data",
578                  ret, errno);
579              return ECORE_CALLBACK_RENEW;
580           }
581 #else
582         else /* ret == PIPE_FD_ERROR is the only other case on Windows */
583           {
584              if (WSAGetLastError() != WSAEWOULDBLOCK)
585                {
586                   p->handler((void *)p->data, NULL, 0);
587                   pipe_close(p->fd_read);
588                   p->fd_read = PIPE_FD_INVALID;
589                   p->fd_handler = NULL;
590                   return ECORE_CALLBACK_CANCEL;
591                }
592              else
593                break;
594           }
595 #endif
596      }
597    while (ecore_time_get() - start_time < ecore_animator_frametime_get());
598    
599    return ECORE_CALLBACK_RENEW;
600 }