14 #include "ecore_private.h"
16 /* How of then we should retry to write to the pipe */
17 #define ECORE_PIPE_WRITE_RETRY 6
20 * On Windows, pipe() is implemented with sockets.
21 * Contrary to Linux, Windows uses different functions
22 * for sockets and fd's: write() is for fd's and send
23 * is for sockets. So I need to put some win32 code
24 * here. I can't think of a solution where the win32
25 * code is in Evil and not here.
30 # include <winsock2.h>
32 # define pipe_write(fd, buffer, size) send((fd), (char *)(buffer), size, 0)
33 # define pipe_read(fd, buffer, size) recv((fd), (char *)(buffer), size, 0)
34 # define pipe_close(fd) closesocket(fd)
35 # define PIPE_FD_INVALID INVALID_SOCKET
36 # define PIPE_FD_ERROR SOCKET_ERROR
43 # define pipe_write(fd, buffer, size) write((fd), buffer, size)
44 # define pipe_read(fd, buffer, size) read((fd), buffer, size)
45 # define pipe_close(fd) close(fd)
46 # define PIPE_FD_INVALID -1
47 # define PIPE_FD_ERROR -1
56 Ecore_Fd_Handler *fd_handler;
58 Ecore_Pipe_Cb handler;
65 static Eina_Bool _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler);
68 * @defgroup Ecore_Pipe_Group Pipe wrapper
70 * These functions wrap the pipe / write / read functions to
71 * easily integrate a loop that is in its own thread to the ecore
74 * The ecore_pipe_add() function creates file descriptors (sockets on
75 * Windows) and attach an handle to the ecore main loop. That handle is
76 * called when data is read in the pipe. To write data in the pipe,
77 * just call ecore_pipe_write(). When you are done, just call
80 * Here is an example that uses the pipe wrapper with a Gstreamer
81 * pipeline. For each decoded frame in the Gstreamer thread, a handle
82 * is called in the ecore thread.
84 * @code#include <gst/gst.h>
89 * static GstElement *_buid_pipeline (gchar *filename, Ecore_Pipe *pipe);
91 * static void new_decoded_pad_cb (GstElement *demuxer,
93 * gpointer user_data);
95 * static void handler(void *data, void *buf, unsigned int len)
97 * GstBuffer *buffer = *((GstBuffer **)buf);
99 * printf ("handler : %p\n", buffer);
100 * printf ("frame : %d %p %lld %p\n", nbr++, data, (long long)GST_BUFFER_DURATION(buffer), buffer);
101 * gst_buffer_unref (buffer);
105 * static void handoff (GstElement* object,
108 * gpointer user_data)
112 * pipe = (Ecore_Pipe *)user_data;
113 * printf ("handoff : %p\n", arg0);
114 * gst_buffer_ref (arg0);
115 * ecore_pipe_write(pipe, &arg0, sizeof(arg0));
119 * main (int argc, char *argv[])
121 * GstElement *pipeline;
125 * gst_init (&argc, &argv);
127 * if (!ecore_init ())
133 * pipe = ecore_pipe_add (handler);
142 * g_print ("usage: %s file.avi\n", argv[0]);
143 * ecore_pipe_del (pipe);
148 * filename = argv[1];
150 * pipeline = _buid_pipeline (filename, pipe);
152 * g_print ("Error during the pipeline building\n");
153 * ecore_pipe_del (pipe);
159 * gst_element_set_state (pipeline, GST_STATE_PLAYING);
161 * ecore_main_loop_begin();
163 * ecore_pipe_del (pipe);
171 * new_decoded_pad_cb (GstElement *demuxer,
173 * gpointer user_data)
175 * GstElement *decoder;
180 * caps = gst_pad_get_caps (new_pad);
181 * str = gst_caps_to_string (caps);
183 * if (g_str_has_prefix (str, "video/")) {
184 * decoder = GST_ELEMENT (user_data);
186 * pad = gst_element_get_pad (decoder, "sink");
187 * if (GST_PAD_LINK_FAILED (gst_pad_link (new_pad, pad))) {
188 * g_warning ("Failed to link %s:%s to %s:%s", GST_DEBUG_PAD_NAME (new_pad),
189 * GST_DEBUG_PAD_NAME (pad));
193 * gst_caps_unref (caps);
196 * static GstElement *
197 * _buid_pipeline (gchar *filename, Ecore_Pipe *pipe)
199 * GstElement *pipeline;
200 * GstElement *filesrc;
201 * GstElement *demuxer;
202 * GstElement *decoder;
204 GstStateChangeReturn res;
206 * pipeline = gst_pipeline_new ("pipeline");
210 * filesrc = gst_element_factory_make ("filesrc", "filesrc");
212 * printf ("no filesrc");
215 * g_object_set (G_OBJECT (filesrc), "location", filename, NULL);
217 * demuxer = gst_element_factory_make ("oggdemux", "demuxer");
219 * printf ("no demux");
223 * decoder = gst_element_factory_make ("theoradec", "decoder");
229 * g_signal_connect (demuxer, "pad-added",
230 * G_CALLBACK (new_decoded_pad_cb), decoder);
232 * sink = gst_element_factory_make ("fakesink", "sink");
234 * printf ("no sink");
237 * g_object_set (G_OBJECT (sink), "sync", EINA_TRUE, NULL);
238 * g_object_set (G_OBJECT (sink), "signal-handoffs", EINA_TRUE, NULL);
239 * g_signal_connect (sink, "handoff",
240 * G_CALLBACK (handoff), pipe);
242 * gst_bin_add_many (GST_BIN (pipeline),
243 * filesrc, demuxer, decoder, sink, NULL);
245 * if (!gst_element_link (filesrc, demuxer))
247 * if (!gst_element_link (decoder, sink))
250 * res = gst_element_set_state (pipeline, GST_STATE_PAUSED);
251 * if (res == GST_STATE_CHANGE_FAILURE)
254 * res = gst_element_get_state( pipeline, NULL, NULL, GST_CLOCK_TIME_NONE );
255 * if (res != GST_STATE_CHANGE_SUCCESS)
261 * gst_object_unref (GST_OBJECT (pipeline));
269 * Create two file descriptors (sockets on Windows). Add
270 * a callback that will be called when the file descriptor that
271 * is listened receives data. An event is also put in the event
272 * queue when data is received.
274 * @param handler The handler called when data is received.
275 * @param data Data to pass to @p handler when it is called.
276 * @return A newly created Ecore_Pipe object if successful.
278 * @ingroup Ecore_Pipe_Group
281 ecore_pipe_add(Ecore_Pipe_Cb handler, const void *data)
286 if (!handler) return NULL;
288 p = (Ecore_Pipe *)calloc(1, sizeof(Ecore_Pipe));
297 ECORE_MAGIC_SET(p, ECORE_MAGIC_PIPE);
299 p->fd_write = fds[1];
300 p->handler = handler;
303 fcntl(p->fd_read, F_SETFL, O_NONBLOCK);
304 p->fd_handler = ecore_main_fd_handler_add(p->fd_read,
313 * Free an Ecore_Pipe object created with ecore_pipe_add().
315 * @param p The Ecore_Pipe object to be freed.
316 * @return The pointer to the private data
317 * @ingroup Ecore_Pipe_Group
320 ecore_pipe_del(Ecore_Pipe *p)
324 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
326 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_del");
329 if (p->fd_handler) ecore_main_fd_handler_del(p->fd_handler);
330 if (p->fd_read != PIPE_FD_INVALID) pipe_close(p->fd_read);
331 if (p->fd_write != PIPE_FD_INVALID) pipe_close(p->fd_write);
332 data = (void *)p->data;
338 * Close the read end of an Ecore_Pipe object created with ecore_pipe_add().
340 * @param p The Ecore_Pipe object.
341 * @ingroup Ecore_Pipe_Group
344 ecore_pipe_read_close(Ecore_Pipe *p)
346 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
348 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_read_close");
351 ecore_main_fd_handler_del(p->fd_handler);
352 p->fd_handler = NULL;
353 pipe_close(p->fd_read);
354 p->fd_read = PIPE_FD_INVALID;
358 * Close the write end of an Ecore_Pipe object created with ecore_pipe_add().
360 * @param p The Ecore_Pipe object.
361 * @ingroup Ecore_Pipe_Group
364 ecore_pipe_write_close(Ecore_Pipe *p)
366 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
368 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write_close");
371 pipe_close(p->fd_write);
372 p->fd_write = PIPE_FD_INVALID;
376 * Write on the file descriptor the data passed as parameter.
378 * @param p The Ecore_Pipe object.
379 * @param buffer The data to write into the pipe.
380 * @param nbytes The size of the @p buffer in bytes
381 * @return Returns EINA_TRUE on a successful write, EINA_FALSE on an error
382 * @ingroup Ecore_Pipe_Group
385 ecore_pipe_write(Ecore_Pipe *p, const void *buffer, unsigned int nbytes)
388 size_t already_written = 0;
389 int retry = ECORE_PIPE_WRITE_RETRY;
391 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
393 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write");
397 if (p->fd_write == PIPE_FD_INVALID) return EINA_FALSE;
399 /* First write the len into the pipe */
402 ret = pipe_write(p->fd_write, &nbytes, sizeof(nbytes));
403 if (ret == sizeof(nbytes))
405 retry = ECORE_PIPE_WRITE_RETRY;
410 /* XXX What should we do here? */
411 ERR("The length of the data was not written complete"
415 else if (ret == PIPE_FD_ERROR && errno == EPIPE)
417 pipe_close(p->fd_write);
418 p->fd_write = PIPE_FD_INVALID;
421 else if (ret == PIPE_FD_ERROR && errno == EINTR)
426 ERR("An unhandled error (ret: %zd errno: %d)"
427 "occured while writing to the pipe the length",
433 if (retry != ECORE_PIPE_WRITE_RETRY) return EINA_FALSE;
435 /* and now pass the data to the pipe */
438 ret = pipe_write(p->fd_write,
439 ((unsigned char *)buffer) + already_written,
440 nbytes - already_written);
442 if (ret == (ssize_t)(nbytes - already_written))
446 already_written -= ret;
449 else if (ret == PIPE_FD_ERROR && errno == EPIPE)
451 pipe_close(p->fd_write);
452 p->fd_write = PIPE_FD_INVALID;
455 else if (ret == PIPE_FD_ERROR && errno == EINTR)
460 ERR("An unhandled error (ret: %zd errno: %d)"
461 "occured while writing to the pipe the length",
470 /* Private function */
473 _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler __UNUSED__)
478 p = (Ecore_Pipe *)data;
479 start_time = ecore_time_get();
485 /* if we already have read some data we don't need to read the len
486 * but to finish the already started job
490 /* read the len of the passed data */
491 ret = pipe_read(p->fd_read, &p->len, sizeof(p->len));
493 /* catch the non error case first */
494 if (ret == sizeof(p->len))
498 /* XXX What should we do here? */
499 ERR("Only read %zd bytes from the pipe, although"
500 " we need to read %zd bytes.", ret, sizeof(p->len));
504 p->handler((void *)p->data, NULL, 0);
505 pipe_close(p->fd_read);
506 p->fd_read = PIPE_FD_INVALID;
507 p->fd_handler = NULL;
508 return ECORE_CALLBACK_CANCEL;
511 else if ((ret == PIPE_FD_ERROR) && ((errno == EINTR) || (errno == EAGAIN)))
512 return ECORE_CALLBACK_RENEW;
515 ERR("An unhandled error (ret: %zd errno: %d)"
516 "occured while reading from the pipe the length",
518 return ECORE_CALLBACK_RENEW;
521 else /* ret == PIPE_FD_ERROR is the only other case on Windows */
523 if (WSAGetLastError() != WSAEWOULDBLOCK)
525 p->handler((void *)p->data, NULL, 0);
526 pipe_close(p->fd_read);
527 p->fd_read = PIPE_FD_INVALID;
528 p->fd_handler = NULL;
529 return ECORE_CALLBACK_CANCEL;
536 p->passed_data = malloc(p->len);
538 /* and read the passed data */
539 ret = pipe_read(p->fd_read,
540 ((unsigned char *)p->passed_data) + p->already_read,
541 p->len - p->already_read);
543 /* catch the non error case first */
544 if (ret == (ssize_t)(p->len - p->already_read))
546 p->handler((void *)p->data, p->passed_data, p->len);
547 free(p->passed_data);
548 /* reset all values to 0 */
549 p->passed_data = NULL;
555 p->already_read += ret;
556 return ECORE_CALLBACK_RENEW;
560 p->handler((void *)p->data, NULL, 0);
561 pipe_close(p->fd_read);
562 p->fd_read = PIPE_FD_INVALID;
563 p->fd_handler = NULL;
564 return ECORE_CALLBACK_CANCEL;
567 else if (ret == PIPE_FD_ERROR && (errno == EINTR || errno == EAGAIN))
568 return ECORE_CALLBACK_RENEW;
571 ERR("An unhandled error (ret: %zd errno: %d)"
572 "occured while reading from the pipe the data",
574 return ECORE_CALLBACK_RENEW;
577 else /* ret == PIPE_FD_ERROR is the only other case on Windows */
579 if (WSAGetLastError() != WSAEWOULDBLOCK)
581 p->handler((void *)p->data, NULL, 0);
582 pipe_close(p->fd_read);
583 p->fd_read = PIPE_FD_INVALID;
584 p->fd_handler = NULL;
585 return ECORE_CALLBACK_CANCEL;
592 while (ecore_time_get() - start_time < ecore_animator_frametime_get());
594 return ECORE_CALLBACK_RENEW;