2 * vim:ts=8:sw=3:sts=8:noexpandtab:cino=>5n-3f0^-2{2
18 #include "ecore_private.h"
20 /* How of then we should retry to write to the pipe */
21 #define ECORE_PIPE_WRITE_RETRY 6
24 * On Windows, pipe() is implemented with sockets.
25 * Contrary to Linux, Windows uses different functions
26 * for sockets and fd's: write() is for fd's and send
27 * is for sockets. So I need to put some win32 code
28 * here. I can't think of a solution where the win32
29 * code is in Evil and not here.
34 # include <winsock2.h>
36 # define pipe_write(fd, buffer, size) send((fd), (char *)(buffer), size, 0)
37 # define pipe_read(fd, buffer, size) recv((fd), (char *)(buffer), size, 0)
38 # define pipe_close(fd) closesocket(fd)
39 # define PIPE_FD_INVALID INVALID_SOCKET
40 # define PIPE_FD_ERROR SOCKET_ERROR
47 # define pipe_write(fd, buffer, size) write((fd), buffer, size)
48 # define pipe_read(fd, buffer, size) read((fd), buffer, size)
49 # define pipe_close(fd) close(fd)
50 # define PIPE_FD_INVALID -1
51 # define PIPE_FD_ERROR -1
60 Ecore_Fd_Handler *fd_handler;
62 void (*handler) (void *data, void *buffer, unsigned int nbyte);
69 static int _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler);
72 * @defgroup Ecore_Pipe_Group Pipe wrapper
74 * These functions wrap the pipe / write / read functions to
75 * easily integrate a loop that is in its own thread to the ecore
78 * The ecore_pipe_add() function creates file descriptors (sockets on
79 * Windows) and attach an handle to the ecore main loop. That handle is
80 * called when data is read in the pipe. To write data in the pipe,
81 * just call ecore_pipe_write(). When you are done, just call
84 * Here is an example that uses the pipe wrapper with a Gstreamer
85 * pipeline. For each decoded frame in the Gstreamer thread, a handle
86 * is called in the ecore thread.
88 * @code#include <gst/gst.h>
93 * static GstElement *_buid_pipeline (gchar *filename, Ecore_Pipe *pipe);
95 * static void new_decoded_pad_cb (GstElement *demuxer,
97 * gpointer user_data);
99 * static void handler(void *data, void *buf, unsigned int len)
101 * GstBuffer *buffer = *((GstBuffer **)buf);
103 * printf ("handler : %p\n", buffer);
104 * printf ("frame : %d %p %lld %p\n", nbr++, data, (long long)GST_BUFFER_DURATION(buffer), buffer);
105 * gst_buffer_unref (buffer);
109 * static void handoff (GstElement* object,
112 * gpointer user_data)
116 * pipe = (Ecore_Pipe *)user_data;
117 * printf ("handoff : %p\n", arg0);
118 * gst_buffer_ref (arg0);
119 * ecore_pipe_write(pipe, &arg0, sizeof(arg0));
123 * main (int argc, char *argv[])
125 * GstElement *pipeline;
129 * gst_init (&argc, &argv);
131 * if (!ecore_init ())
137 * pipe = ecore_pipe_add (handler);
146 * g_print ("usage: %s file.avi\n", argv[0]);
147 * ecore_pipe_del (pipe);
152 * filename = argv[1];
154 * pipeline = _buid_pipeline (filename, pipe);
156 * g_print ("Error during the pipeline building\n");
157 * ecore_pipe_del (pipe);
163 * gst_element_set_state (pipeline, GST_STATE_PLAYING);
165 * ecore_main_loop_begin();
167 * ecore_pipe_del (pipe);
175 * new_decoded_pad_cb (GstElement *demuxer,
177 * gpointer user_data)
179 * GstElement *decoder;
184 * caps = gst_pad_get_caps (new_pad);
185 * str = gst_caps_to_string (caps);
187 * if (g_str_has_prefix (str, "video/")) {
188 * decoder = GST_ELEMENT (user_data);
190 * pad = gst_element_get_pad (decoder, "sink");
191 * if (GST_PAD_LINK_FAILED (gst_pad_link (new_pad, pad))) {
192 * g_warning ("Failed to link %s:%s to %s:%s", GST_DEBUG_PAD_NAME (new_pad),
193 * GST_DEBUG_PAD_NAME (pad));
197 * gst_caps_unref (caps);
200 * static GstElement *
201 * _buid_pipeline (gchar *filename, Ecore_Pipe *pipe)
203 * GstElement *pipeline;
204 * GstElement *filesrc;
205 * GstElement *demuxer;
206 * GstElement *decoder;
208 GstStateChangeReturn res;
210 * pipeline = gst_pipeline_new ("pipeline");
214 * filesrc = gst_element_factory_make ("filesrc", "filesrc");
216 * printf ("no filesrc");
219 * g_object_set (G_OBJECT (filesrc), "location", filename, NULL);
221 * demuxer = gst_element_factory_make ("oggdemux", "demuxer");
223 * printf ("no demux");
227 * decoder = gst_element_factory_make ("theoradec", "decoder");
233 * g_signal_connect (demuxer, "pad-added",
234 * G_CALLBACK (new_decoded_pad_cb), decoder);
236 * sink = gst_element_factory_make ("fakesink", "sink");
238 * printf ("no sink");
241 * g_object_set (G_OBJECT (sink), "sync", EINA_TRUE, NULL);
242 * g_object_set (G_OBJECT (sink), "signal-handoffs", EINA_TRUE, NULL);
243 * g_signal_connect (sink, "handoff",
244 * G_CALLBACK (handoff), pipe);
246 * gst_bin_add_many (GST_BIN (pipeline),
247 * filesrc, demuxer, decoder, sink, NULL);
249 * if (!gst_element_link (filesrc, demuxer))
251 * if (!gst_element_link (decoder, sink))
254 * res = gst_element_set_state (pipeline, GST_STATE_PAUSED);
255 * if (res == GST_STATE_CHANGE_FAILURE)
258 * res = gst_element_get_state( pipeline, NULL, NULL, GST_CLOCK_TIME_NONE );
259 * if (res != GST_STATE_CHANGE_SUCCESS)
265 * gst_object_unref (GST_OBJECT (pipeline));
273 * Create two file descriptors (sockets on Windows). Add
274 * a callback that will be called when the file descriptor that
275 * is listened receives data. An event is also put in the event
276 * queue when data is received.
278 * @param handler The handler called when data is received.
279 * @param data Data to pass to @p handler when it is called.
280 * @return A newly created Ecore_Pipe object if successful.
282 * @ingroup Ecore_Pipe_Group
285 ecore_pipe_add(void (*handler) (void *data, void *buffer, unsigned int nbyte),
291 if (!handler) return NULL;
293 p = (Ecore_Pipe *)calloc(1, sizeof(Ecore_Pipe));
302 ECORE_MAGIC_SET(p, ECORE_MAGIC_PIPE);
304 p->fd_write = fds[1];
305 p->handler = handler;
308 fcntl(p->fd_read, F_SETFL, O_NONBLOCK);
309 p->fd_handler = ecore_main_fd_handler_add(p->fd_read,
318 * Free an Ecore_Pipe object created with ecore_pipe_add().
320 * @param p The Ecore_Pipe object to be freed.
321 * @return The pointer to the private data
322 * @ingroup Ecore_Pipe_Group
325 ecore_pipe_del(Ecore_Pipe *p)
329 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
331 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_del");
334 if (p->fd_handler != NULL) ecore_main_fd_handler_del(p->fd_handler);
335 if (p->fd_read != PIPE_FD_INVALID) pipe_close(p->fd_read);
336 if (p->fd_write != PIPE_FD_INVALID) pipe_close(p->fd_write);
337 data = (void *)p->data;
343 * Close the read end of an Ecore_Pipe object created with ecore_pipe_add().
345 * @param p The Ecore_Pipe object.
346 * @ingroup Ecore_Pipe_Group
349 ecore_pipe_read_close(Ecore_Pipe *p)
351 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
353 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_read_close");
356 ecore_main_fd_handler_del(p->fd_handler);
357 p->fd_handler = NULL;
358 pipe_close(p->fd_read);
359 p->fd_read = PIPE_FD_INVALID;
363 * Close the write end of an Ecore_Pipe object created with ecore_pipe_add().
365 * @param p The Ecore_Pipe object.
366 * @ingroup Ecore_Pipe_Group
369 ecore_pipe_write_close(Ecore_Pipe *p)
371 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
373 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write_close");
376 pipe_close(p->fd_write);
377 p->fd_write = PIPE_FD_INVALID;
381 * Write on the file descriptor the data passed as parameter.
383 * @param p The Ecore_Pipe object.
384 * @param buffer The data to write into the pipe.
385 * @param nbytes The size of the @p buffer in bytes
386 * @return Returns EINA_TRUE on a successful write, EINA_FALSE on an error
387 * @ingroup Ecore_Pipe_Group
390 ecore_pipe_write(Ecore_Pipe *p, const void *buffer, unsigned int nbytes)
393 size_t already_written = 0;
394 int retry = ECORE_PIPE_WRITE_RETRY;
396 if (!ECORE_MAGIC_CHECK(p, ECORE_MAGIC_PIPE))
398 ECORE_MAGIC_FAIL(p, ECORE_MAGIC_PIPE, "ecore_pipe_write");
402 if (p->fd_write == PIPE_FD_INVALID) return EINA_FALSE;
404 /* First write the len into the pipe */
407 ret = pipe_write(p->fd_write, &nbytes, sizeof(nbytes));
408 if (ret == sizeof(nbytes))
410 retry = ECORE_PIPE_WRITE_RETRY;
415 /* XXX What should we do here? */
416 ERR("The length of the data was not written complete"
420 else if (ret == PIPE_FD_ERROR && errno == EPIPE)
422 pipe_close(p->fd_write);
423 p->fd_write = PIPE_FD_INVALID;
426 else if (ret == PIPE_FD_ERROR && errno == EINTR)
431 ERR("An unhandled error (ret: %zd errno: %d)"
432 "occured while writing to the pipe the length",
438 if (retry != ECORE_PIPE_WRITE_RETRY) return EINA_FALSE;
440 /* and now pass the data to the pipe */
443 ret = pipe_write(p->fd_write,
444 ((unsigned char *)buffer) + already_written,
445 nbytes - already_written);
447 if (ret == (ssize_t)(nbytes - already_written))
451 already_written -= ret;
454 else if (ret == PIPE_FD_ERROR && errno == EPIPE)
456 pipe_close(p->fd_write);
457 p->fd_write = PIPE_FD_INVALID;
460 else if (ret == PIPE_FD_ERROR && errno == EINTR)
465 ERR("An unhandled error (ret: %zd errno: %d)"
466 "occured while writing to the pipe the length",
475 /* Private function */
478 _ecore_pipe_read(void *data, Ecore_Fd_Handler *fd_handler __UNUSED__)
483 p = (Ecore_Pipe *)data;
484 start_time = ecore_time_get();
490 /* if we already have read some data we don't need to read the len
491 * but to finish the already started job
495 /* read the len of the passed data */
496 ret = pipe_read(p->fd_read, &p->len, sizeof(p->len));
498 /* catch the non error case first */
499 if (ret == sizeof(p->len))
503 /* XXX What should we do here? */
504 ERR("Only read %zd bytes from the pipe, although"
505 " we need to read %zd bytes.", ret, sizeof(p->len));
509 p->handler((void *)p->data, NULL, 0);
510 pipe_close(p->fd_read);
511 p->fd_read = PIPE_FD_INVALID;
512 p->fd_handler = NULL;
513 return ECORE_CALLBACK_CANCEL;
516 else if ((ret == PIPE_FD_ERROR) && ((errno == EINTR) || (errno == EAGAIN)))
517 return ECORE_CALLBACK_RENEW;
520 ERR("An unhandled error (ret: %zd errno: %d)"
521 "occured while reading from the pipe the length",
523 return ECORE_CALLBACK_RENEW;
526 else /* ret == PIPE_FD_ERROR is the only other case on Windows */
528 if (WSAGetLastError() != WSAEWOULDBLOCK)
530 p->handler((void *)p->data, NULL, 0);
531 pipe_close(p->fd_read);
532 p->fd_read = PIPE_FD_INVALID;
533 p->fd_handler = NULL;
534 return ECORE_CALLBACK_CANCEL;
541 p->passed_data = malloc(p->len);
543 /* and read the passed data */
544 ret = pipe_read(p->fd_read,
545 ((unsigned char *)p->passed_data) + p->already_read,
546 p->len - p->already_read);
548 /* catch the non error case first */
549 if (ret == (ssize_t)(p->len - p->already_read))
551 p->handler((void *)p->data, p->passed_data, p->len);
552 free(p->passed_data);
553 /* reset all values to 0 */
554 p->passed_data = NULL;
560 p->already_read += ret;
561 return ECORE_CALLBACK_RENEW;
565 p->handler((void *)p->data, NULL, 0);
566 pipe_close(p->fd_read);
567 p->fd_read = PIPE_FD_INVALID;
568 p->fd_handler = NULL;
569 return ECORE_CALLBACK_CANCEL;
572 else if (ret == PIPE_FD_ERROR && (errno == EINTR || errno == EAGAIN))
573 return ECORE_CALLBACK_RENEW;
576 ERR("An unhandled error (ret: %zd errno: %d)"
577 "occured while reading from the pipe the data",
579 return ECORE_CALLBACK_RENEW;
582 else /* ret == PIPE_FD_ERROR is the only other case on Windows */
584 if (WSAGetLastError() != WSAEWOULDBLOCK)
586 p->handler((void *)p->data, NULL, 0);
587 pipe_close(p->fd_read);
588 p->fd_read = PIPE_FD_INVALID;
589 p->fd_handler = NULL;
590 return ECORE_CALLBACK_CANCEL;
597 while (ecore_time_get() - start_time < ecore_animator_frametime_get());
599 return ECORE_CALLBACK_RENEW;