14 #include "ecore_private.h"
16 /* How of then we should retry to write to the pipe */
17 #define ECORE_PIPE_WRITE_RETRY 6
20 * On Windows, pipe() is implemented with sockets.
21 * Contrary to Linux, Windows uses different functions
22 * for sockets and fd's: write() is for fd's and send
23 * is for sockets. So I need to put some win32 code
24 * here. I can't think of a solution where the win32
25 * code is in Evil and not here.
30 # include <winsock2.h>
32 # define pipe_write(fd, buffer, size) send((fd), (char *)(buffer), size, 0)
33 # define pipe_read(fd, buffer, size) recv((fd), (char *)(buffer), size, 0)
34 # define pipe_close(fd) closesocket(fd)
35 # define PIPE_FD_INVALID INVALID_SOCKET
36 # define PIPE_FD_ERROR SOCKET_ERROR
43 # define pipe_write(fd, buffer, size) write((fd), buffer, size)
44 # define pipe_read(fd, buffer, size) read((fd), buffer, size)
45 # define pipe_close(fd) close(fd)
46 # define PIPE_FD_INVALID -1
47 # define PIPE_FD_ERROR -1
56 Ecore_Fd_Handler *fd_handler;
58 Ecore_Pipe_Cb handler;
63 Eina_Bool delete_me : 1;
67 static Eina_Bool _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler);
70 * @addtogroup Ecore_Group Ecore - Main Loop and Job Functions.
76 * @addtogroup Ecore_Pipe_Group Pipe wrapper
78 * These functions wrap the pipe / write / read functions to
79 * easily integrate a loop that is in its own thread to the ecore
82 * The ecore_pipe_add() function creates file descriptors (sockets on
83 * Windows) and attach an handle to the ecore main loop. That handle is
84 * called when data is read in the pipe. To write data in the pipe,
85 * just call ecore_pipe_write(). When you are done, just call
88 * Here is an example that uses the pipe wrapper with a Gstreamer
89 * pipeline. For each decoded frame in the Gstreamer thread, a handle
90 * is called in the ecore thread.
92 * @code#include <gst/gst.h>
97 * static GstElement *_buid_pipeline (gchar *filename, Ecore_Pipe *pipe);
99 * static void new_decoded_pad_cb (GstElement *demuxer,
101 * gpointer user_data);
103 * static void handler(void *data, void *buf, unsigned int len)
105 * GstBuffer *buffer = *((GstBuffer **)buf);
107 * printf ("handler : %p\n", buffer);
108 * printf ("frame : %d %p %lld %p\n", nbr++, data, (long long)GST_BUFFER_DURATION(buffer), buffer);
109 * gst_buffer_unref (buffer);
113 * static void handoff (GstElement* object,
116 * gpointer user_data)
120 * pipe = (Ecore_Pipe *)user_data;
121 * printf ("handoff : %p\n", arg0);
122 * gst_buffer_ref (arg0);
123 * ecore_pipe_write(pipe, &arg0, sizeof(arg0));
127 * main (int argc, char *argv[])
129 * GstElement *pipeline;
133 * gst_init (&argc, &argv);
135 * if (!ecore_init ())
141 * pipe = ecore_pipe_add (handler);
150 * g_print ("usage: %s file.avi\n", argv[0]);
151 * ecore_pipe_del (pipe);
156 * filename = argv[1];
158 * pipeline = _buid_pipeline (filename, pipe);
160 * g_print ("Error during the pipeline building\n");
161 * ecore_pipe_del (pipe);
167 * gst_element_set_state (pipeline, GST_STATE_PLAYING);
169 * ecore_main_loop_begin();
171 * ecore_pipe_del (pipe);
179 * new_decoded_pad_cb (GstElement *demuxer,
181 * gpointer user_data)
183 * GstElement *decoder;
188 * caps = gst_pad_get_caps (new_pad);
189 * str = gst_caps_to_string (caps);
191 * if (g_str_has_prefix (str, "video/")) {
192 * decoder = GST_ELEMENT (user_data);
194 * pad = gst_element_get_pad (decoder, "sink");
195 * if (GST_PAD_LINK_FAILED (gst_pad_link (new_pad, pad))) {
196 * g_warning ("Failed to link %s:%s to %s:%s", GST_DEBUG_PAD_NAME (new_pad),
197 * GST_DEBUG_PAD_NAME (pad));
201 * gst_caps_unref (caps);
204 * static GstElement *
205 * _buid_pipeline (gchar *filename, Ecore_Pipe *pipe)
207 * GstElement *pipeline;
208 * GstElement *filesrc;
209 * GstElement *demuxer;
210 * GstElement *decoder;
212 GstStateChangeReturn res;
214 * pipeline = gst_pipeline_new ("pipeline");
218 * filesrc = gst_element_factory_make ("filesrc", "filesrc");
220 * printf ("no filesrc");
223 * g_object_set (G_OBJECT (filesrc), "location", filename, NULL);
225 * demuxer = gst_element_factory_make ("oggdemux", "demuxer");
227 * printf ("no demux");
231 * decoder = gst_element_factory_make ("theoradec", "decoder");
237 * g_signal_connect (demuxer, "pad-added",
238 * G_CALLBACK (new_decoded_pad_cb), decoder);
240 * sink = gst_element_factory_make ("fakesink", "sink");
242 * printf ("no sink");
245 * g_object_set (G_OBJECT (sink), "sync", EINA_TRUE, NULL);
246 * g_object_set (G_OBJECT (sink), "signal-handoffs", EINA_TRUE, NULL);
247 * g_signal_connect (sink, "handoff",
248 * G_CALLBACK (handoff), pipe);
250 * gst_bin_add_many (GST_BIN (pipeline),
251 * filesrc, demuxer, decoder, sink, NULL);
253 * if (!gst_element_link (filesrc, demuxer))
255 * if (!gst_element_link (decoder, sink))
258 * res = gst_element_set_state (pipeline, GST_STATE_PAUSED);
259 * if (res == GST_STATE_CHANGE_FAILURE)
262 * res = gst_element_get_state( pipeline, NULL, NULL, GST_CLOCK_TIME_NONE );
263 * if (res != GST_STATE_CHANGE_SUCCESS)
269 * gst_object_unref (GST_OBJECT (pipeline));
277 * Create two file descriptors (sockets on Windows). Add
278 * a callback that will be called when the file descriptor that
279 * is listened receives data. An event is also put in the event
280 * queue when data is received.
282 * @param handler The handler called when data is received.
283 * @param data Data to pass to @p handler when it is called.
284 * @return A newly created Ecore_Pipe object if successful.
288 ecore_pipe_add(Ecore_Pipe_Cb handler, const void *data)
293 if (!handler) return NULL;
295 p = (Ecore_Pipe *)calloc(1, sizeof(Ecore_Pipe));
304 ECORE_MAGIC_SET(p, ECORE_MAGIC_PIPE);
306 p->fd_write = fds[1];
307 p->handler = handler;
310 fcntl(p->fd_read, F_SETFL, O_NONBLOCK);
311 p->fd_handler = ecore_main_fd_handler_add(p->fd_read,
320 * Free an Ecore_Pipe object created with ecore_pipe_add().
322 * @param p The Ecore_Pipe object to be freed.
323 * @return The pointer to the private data
326 ecore_pipe_del(Ecore_Pipe *p)
330 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
332 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_del");
335 p->delete_me = EINA_TRUE;
336 if (p->handling > 0) return (void *)p->data;
337 if (p->fd_handler) ecore_main_fd_handler_del(p->fd_handler);
338 if (p->fd_read != PIPE_FD_INVALID) pipe_close(p->fd_read);
339 if (p->fd_write != PIPE_FD_INVALID) pipe_close(p->fd_write);
340 data = (void *)p->data;
346 * Close the read end of an Ecore_Pipe object created with ecore_pipe_add().
348 * @param p The Ecore_Pipe object.
351 ecore_pipe_read_close(Ecore_Pipe *p)
353 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
355 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_read_close");
360 ecore_main_fd_handler_del(p->fd_handler);
361 p->fd_handler = NULL;
363 if (p->fd_read != PIPE_FD_INVALID)
365 pipe_close(p->fd_read);
366 p->fd_read = PIPE_FD_INVALID;
371 * Close the write end of an Ecore_Pipe object created with ecore_pipe_add().
373 * @param p The Ecore_Pipe object.
376 ecore_pipe_write_close(Ecore_Pipe *p)
378 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
380 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write_close");
383 if (p->fd_write != PIPE_FD_INVALID)
385 pipe_close(p->fd_write);
386 p->fd_write = PIPE_FD_INVALID;
391 * Write on the file descriptor the data passed as parameter.
393 * @param p The Ecore_Pipe object.
394 * @param buffer The data to write into the pipe.
395 * @param nbytes The size of the @p buffer in bytes
396 * @return Returns EINA_TRUE on a successful write, EINA_FALSE on an error
399 ecore_pipe_write(Ecore_Pipe *p, const void *buffer, unsigned int nbytes)
402 size_t already_written = 0;
403 int retry = ECORE_PIPE_WRITE_RETRY;
405 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
407 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write");
411 if (p->delete_me) return EINA_FALSE;
413 if (p->fd_write == PIPE_FD_INVALID) return EINA_FALSE;
415 /* First write the len into the pipe */
418 ret = pipe_write(p->fd_write, &nbytes, sizeof(nbytes));
419 if (ret == sizeof(nbytes))
421 retry = ECORE_PIPE_WRITE_RETRY;
426 /* XXX What should we do here? */
427 ERR("The length of the data was not written complete"
431 else if (ret == PIPE_FD_ERROR && errno == EPIPE)
433 pipe_close(p->fd_write);
434 p->fd_write = PIPE_FD_INVALID;
437 else if (ret == PIPE_FD_ERROR && errno == EINTR)
442 ERR("An unhandled error (ret: %zd errno: %d)"
443 "occurred while writing to the pipe the length",
449 if (retry != ECORE_PIPE_WRITE_RETRY) return EINA_FALSE;
451 /* and now pass the data to the pipe */
454 ret = pipe_write(p->fd_write,
455 ((unsigned char *)buffer) + already_written,
456 nbytes - already_written);
458 if (ret == (ssize_t)(nbytes - already_written))
462 already_written -= ret;
465 else if (ret == PIPE_FD_ERROR && errno == EPIPE)
467 pipe_close(p->fd_write);
468 p->fd_write = PIPE_FD_INVALID;
471 else if (ret == PIPE_FD_ERROR && errno == EINTR)
476 ERR("An unhandled error (ret: %zd errno: %d)"
477 "occurred while writing to the pipe the length",
494 /* Private function */
496 _ecore_pipe_unhandle(Ecore_Pipe *p)
506 _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler __UNUSED__)
512 p = (Ecore_Pipe *)data;
513 start_time = ecore_time_get();
516 for (i = 0; i < 16; i++)
520 /* if we already have read some data we don't need to read the len
521 * but to finish the already started job
525 /* read the len of the passed data */
526 ret = pipe_read(p->fd_read, &p->len, sizeof(p->len));
528 /* catch the non error case first */
529 if (ret == sizeof(p->len))
533 /* XXX What should we do here? */
534 ERR("Only read %zd bytes from the pipe, although"
535 " we need to read %zd bytes.", ret, sizeof(p->len));
536 _ecore_pipe_unhandle(p);
537 return ECORE_CALLBACK_CANCEL;
541 p->handler((void *)p->data, NULL, 0);
542 pipe_close(p->fd_read);
543 p->fd_read = PIPE_FD_INVALID;
544 p->fd_handler = NULL;
545 _ecore_pipe_unhandle(p);
546 return ECORE_CALLBACK_CANCEL;
549 else if ((ret == PIPE_FD_ERROR) && ((errno == EINTR) || (errno == EAGAIN)))
551 _ecore_pipe_unhandle(p);
552 return ECORE_CALLBACK_RENEW;
556 ERR("An unhandled error (ret: %zd errno: %d [%s])"
557 "occurred while reading from the pipe the length",
558 ret, errno, strerror(errno));
559 _ecore_pipe_unhandle(p);
560 return ECORE_CALLBACK_RENEW;
563 else /* ret == PIPE_FD_ERROR is the only other case on Windows */
565 if (WSAGetLastError() != WSAEWOULDBLOCK)
567 p->handler((void *)p->data, NULL, 0);
568 pipe_close(p->fd_read);
569 p->fd_read = PIPE_FD_INVALID;
570 p->fd_handler = NULL;
571 _ecore_pipe_unhandle(p);
572 return ECORE_CALLBACK_CANCEL;
579 p->passed_data = malloc(p->len);
581 /* and read the passed data */
582 ret = pipe_read(p->fd_read,
583 ((unsigned char *)p->passed_data) + p->already_read,
584 p->len - p->already_read);
586 /* catch the non error case first */
587 if (ret == (ssize_t)(p->len - p->already_read))
589 p->handler((void *)p->data, p->passed_data, p->len);
590 free(p->passed_data);
591 /* reset all values to 0 */
592 p->passed_data = NULL;
598 p->already_read += ret;
599 _ecore_pipe_unhandle(p);
600 return ECORE_CALLBACK_RENEW;
604 p->handler((void *)p->data, NULL, 0);
605 pipe_close(p->fd_read);
606 p->fd_read = PIPE_FD_INVALID;
607 p->fd_handler = NULL;
608 _ecore_pipe_unhandle(p);
609 return ECORE_CALLBACK_CANCEL;
612 else if (ret == PIPE_FD_ERROR && (errno == EINTR || errno == EAGAIN))
614 return ECORE_CALLBACK_RENEW;
618 ERR("An unhandled error (ret: %zd errno: %d)"
619 "occurred while reading from the pipe the data",
621 _ecore_pipe_unhandle(p);
622 return ECORE_CALLBACK_RENEW;
625 else /* ret == PIPE_FD_ERROR is the only other case on Windows */
627 if (WSAGetLastError() != WSAEWOULDBLOCK)
629 p->handler((void *)p->data, NULL, 0);
630 pipe_close(p->fd_read);
631 p->fd_read = PIPE_FD_INVALID;
632 p->fd_handler = NULL;
633 _ecore_pipe_unhandle(p);
634 return ECORE_CALLBACK_CANCEL;
642 _ecore_pipe_unhandle(p);
643 return ECORE_CALLBACK_RENEW;