+2008-02-28 Wim Taymans <wim.taymans@collabora.co.uk>
+
+ Patch by: Peter Kjellerstedt <pkj at axis dot com>
+
+ * plugins/elements/gstfdsink.c: (gst_fd_sink_render),
+ (gst_fd_sink_start), (gst_fd_sink_stop), (gst_fd_sink_unlock),
+ (gst_fd_sink_unlock_stop), (gst_fd_sink_update_fd):
+ * plugins/elements/gstfdsink.h:
+ * plugins/elements/gstfdsrc.c: (gst_fd_src_update_fd),
+ (gst_fd_src_start), (gst_fd_src_stop), (gst_fd_src_unlock),
+ (gst_fd_src_unlock_stop), (gst_fd_src_create),
+ (gst_fd_src_uri_set_uri):
+ * plugins/elements/gstfdsrc.h:
+ Port to GstPoll. See #505417.
+
2008-02-27 Jan Schmidt <jan.schmidt@sun.com>
* win32/common/libgstreamer.def:
#include "gstfdsink.h"
-/* We add a control socket as in fdsrc to make it shutdown quickly when it's blocking on the fd.
- * Select is used to determine when the fd is ready for use. When the element state is changed,
- * it happens from another thread while fdsink is select'ing on the fd. The state-change thread
- * sends a control message, so fdsink wakes up and changes state immediately otherwise
- * it would stay blocked until it receives some data. */
-
-/* the select call is also performed on the control sockets, that way
- * we can send special commands to unblock the select call */
-#define CONTROL_STOP 'S' /* stop the select call */
-#define CONTROL_SOCKETS(sink) sink->control_sock
-#define WRITE_SOCKET(sink) sink->control_sock[1]
-#define READ_SOCKET(sink) sink->control_sock[0]
-
-#define SEND_COMMAND(sink, command) \
-G_STMT_START { \
- unsigned char c; c = command; \
- write (WRITE_SOCKET(sink), &c, 1); \
-} G_STMT_END
-
-#define READ_COMMAND(sink, command, res) \
-G_STMT_START { \
- res = read(READ_SOCKET(sink), &command, 1); \
-} G_STMT_END
-
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer)
{
GstFdSink *fdsink;
+ guint8 *data;
+ guint size;
+ gint written;
#ifndef HAVE_WIN32
- fd_set readfds;
- fd_set writefds;
gint retval;
#endif
- guint8 *data;
- guint size;
- gint written;
fdsink = GST_FD_SINK (sink);
again:
#ifndef HAVE_WIN32
-
- FD_ZERO (&readfds);
- FD_SET (READ_SOCKET (fdsink), &readfds);
-
- FD_ZERO (&writefds);
- FD_SET (fdsink->fd, &writefds);
-
do {
GST_DEBUG_OBJECT (fdsink, "going into select, have %d bytes to write",
size);
- retval = select (FD_SETSIZE, &readfds, &writefds, NULL, NULL);
- } while ((retval == -1 && errno == EINTR));
-
- if (retval == -1)
- goto select_error;
-
- if (FD_ISSET (READ_SOCKET (fdsink), &readfds))
- goto stopped;
+ retval = gst_poll_wait (fdsink->fdset, GST_CLOCK_TIME_NONE);
+ } while (retval == -1 && errno == EINTR);
+
+ if (retval == -1) {
+ if (errno == EBUSY)
+ goto stopped;
+ else
+ goto select_error;
+ }
#endif
GST_DEBUG_OBJECT (fdsink, "writing %d bytes to file descriptor %d", size,
gst_fd_sink_start (GstBaseSink * basesink)
{
GstFdSink *fdsink;
- gint control_sock[2];
+ GstPollFD fd;
fdsink = GST_FD_SINK (basesink);
if (!gst_fd_sink_check_fd (fdsink, fdsink->fd))
return FALSE;
- if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
+ if ((fdsink->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
goto socket_pair;
- READ_SOCKET (fdsink) = control_sock[0];
- WRITE_SOCKET (fdsink) = control_sock[1];
-
- fcntl (READ_SOCKET (fdsink), F_SETFL, O_NONBLOCK);
- fcntl (WRITE_SOCKET (fdsink), F_SETFL, O_NONBLOCK);
+ fd.fd = fdsink->fd;
+ gst_poll_add_fd (fdsink->fdset, &fd);
+ gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE);
return TRUE;
{
GstFdSink *fdsink = GST_FD_SINK (basesink);
- close (READ_SOCKET (fdsink));
- close (WRITE_SOCKET (fdsink));
+ if (fdsink->fdset) {
+ gst_poll_free (fdsink->fdset);
+ fdsink->fdset = NULL;
+ }
return TRUE;
}
{
GstFdSink *fdsink = GST_FD_SINK (basesink);
- GST_LOG_OBJECT (fdsink, "Sending unlock command to queue");
- SEND_COMMAND (fdsink, CONTROL_STOP);
+ GST_LOG_OBJECT (fdsink, "Flushing");
+ GST_OBJECT_LOCK (fdsink);
+ gst_poll_set_flushing (fdsink->fdset, TRUE);
+ GST_OBJECT_UNLOCK (fdsink);
return TRUE;
}
{
GstFdSink *fdsink = GST_FD_SINK (basesink);
- /* read all stop commands */
- GST_LOG_OBJECT (fdsink, "Clearing unlock command queue");
-
- while (TRUE) {
- gchar command;
- int res;
-
- READ_COMMAND (fdsink, command, res);
- if (res < 0) {
- GST_LOG_OBJECT (fdsink, "no more commands");
- /* no more commands */
- break;
- }
- }
+ GST_LOG_OBJECT (fdsink, "No longer flushing");
+ GST_OBJECT_LOCK (fdsink);
+ gst_poll_set_flushing (fdsink->fdset, FALSE);
+ GST_OBJECT_UNLOCK (fdsink);
return TRUE;
}
/* assign the fd */
GST_OBJECT_LOCK (fdsink);
+ if (fdsink->fdset) {
+ GstPollFD fd = { 0 };
+
+ fd.fd = fdsink->fd;
+ gst_poll_remove_fd (fdsink->fdset, &fd);
+
+ fd.fd = new_fd;
+ gst_poll_add_fd (fdsink->fdset, &fd);
+ gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE);
+ }
fdsink->fd = new_fd;
g_free (fdsink->uri);
fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd);
+
GST_OBJECT_UNLOCK (fdsink);
return TRUE;
gchar *uri;
- gint control_sock[2];
+ GstPoll *fdset;
int fd;
guint64 bytes_written;
#include "gstfdsrc.h"
-/* the select call is also performed on the control sockets, that way
- * we can send special commands to unblock the select call */
-#define CONTROL_STOP 'S' /* stop the select call */
-#define CONTROL_SOCKETS(src) src->control_sock
-#define WRITE_SOCKET(src) src->control_sock[1]
-#define READ_SOCKET(src) src->control_sock[0]
-
-#define SEND_COMMAND(src, command) \
-G_STMT_START { \
- unsigned char c; c = command; \
- write (WRITE_SOCKET(src), &c, 1); \
-} G_STMT_END
-
-#define READ_COMMAND(src, command, res) \
-G_STMT_START { \
- res = read(READ_SOCKET(src), &command, 1); \
-} G_STMT_END
-
#define DEFAULT_BLOCKSIZE 4096
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
{
struct stat stat_results;
+ /* we need to always update the fdset since it may not have existed when
+ * gst_fd_src_update_fd() was called earlier */
+ if (src->fdset != NULL) {
+ GstPollFD fd;
+
+ if (src->fd >= 0) {
+ fd.fd = src->fd;
+ gst_poll_remove_fd (src->fdset, &fd);
+ }
+
+ fd.fd = src->new_fd;
+ gst_poll_add_fd (src->fdset, &fd);
+ gst_poll_fd_ctl_read (src->fdset, &fd, TRUE);
+ }
+
if (src->fd != src->new_fd) {
GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd);
gst_fd_src_start (GstBaseSrc * bsrc)
{
GstFdSrc *src = GST_FD_SRC (bsrc);
- gint control_sock[2];
src->curoffset = 0;
- gst_fd_src_update_fd (src);
-
- if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
+ if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
goto socket_pair;
- READ_SOCKET (src) = control_sock[0];
- WRITE_SOCKET (src) = control_sock[1];
-
- fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
- fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
+ gst_fd_src_update_fd (src);
return TRUE;
{
GstFdSrc *src = GST_FD_SRC (bsrc);
- close (READ_SOCKET (src));
- close (WRITE_SOCKET (src));
+ if (src->fdset) {
+ gst_poll_free (src->fdset);
+ src->fdset = NULL;
+ }
return TRUE;
}
{
GstFdSrc *src = GST_FD_SRC (bsrc);
- GST_LOG_OBJECT (src, "sending unlock command");
- SEND_COMMAND (src, CONTROL_STOP);
+ GST_LOG_OBJECT (src, "Flushing");
+ GST_OBJECT_LOCK (src);
+ gst_poll_set_flushing (src->fdset, TRUE);
+ GST_OBJECT_UNLOCK (src);
return TRUE;
}
{
GstFdSrc *src = GST_FD_SRC (bsrc);
- GST_LOG_OBJECT (src, "clearing unlock command queue");
-
- /* read all stop commands */
- while (TRUE) {
- gchar command;
- int res;
-
- GST_LOG_OBJECT (src, "reading command");
-
- READ_COMMAND (src, command, res);
- if (res < 0) {
- GST_LOG_OBJECT (src, "no more commands");
- /* no more commands */
- break;
- }
- }
+ GST_LOG_OBJECT (src, "No longer flushing");
+ GST_OBJECT_LOCK (src);
+ gst_poll_set_flushing (src->fdset, FALSE);
+ GST_OBJECT_UNLOCK (src);
return TRUE;
}
guint blocksize;
#ifndef HAVE_WIN32
- fd_set readfds;
gint retval;
#endif
src = GST_FD_SRC (psrc);
#ifndef HAVE_WIN32
- FD_ZERO (&readfds);
- FD_SET (src->fd, &readfds);
- FD_SET (READ_SOCKET (src), &readfds);
-
do {
- retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL);
- } while ((retval == -1 && errno == EINTR));
-
- if (retval == -1)
- goto select_error;
-
- if (FD_ISSET (READ_SOCKET (src), &readfds))
- goto stopped;
+ retval = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE);
+ } while (retval == -1 && errno == EINTR);
+
+ if (retval == -1) {
+ if (errno == EBUSY)
+ goto stopped;
+ else
+ goto select_error;
+ }
#endif
blocksize = GST_BASE_SRC (src)->blocksize;
}
g_free (protocol);
- if (sscanf (uri, "fd://%d", &fd) != 1)
+ if (sscanf (uri, "fd://%d", &fd) != 1 || fd < 0)
return FALSE;
src->new_fd = fd;
gchar *uri;
- gint control_sock[2];
+ GstPoll *fdset;
gulong curoffset; /* current offset in file */
};