2004-08-18 Wim Taymans <wim@fluendo.com>
+ * gst/tcp/gstfdset.c: (gst_fdset_free), (gst_fdset_set_mode),
+ (gst_fdset_get_mode), (gst_fdset_add_fd), (gst_fdset_remove_fd),
+ (gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read),
+ (gst_fdset_fd_has_closed), (gst_fdset_fd_has_error),
+ (gst_fdset_fd_can_read), (gst_fdset_fd_can_write),
+ (gst_fdset_wait):
+ * gst/tcp/gstfdset.h:
+ * gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
+ (gst_multifdsink_client_queue_buffer),
+ (gst_multifdsink_handle_client_write):
+ * gst/tcp/gstmultifdsink.h:
+ Some extra checks in gstfdset.
+ Only use send() when the fd is a socket. Don't try to
+ read from write only fds.
+
+2004-08-18 Wim Taymans <wim@fluendo.com>
+
* gst/tcp/gstfdset.c: (gst_fdset_add_fd), (gst_fdset_remove_fd),
(gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read),
(gst_fdset_fd_has_closed), (gst_fdset_fd_has_error),
void
gst_fdset_free (GstFDSet * set)
{
+ g_return_if_fail (set != NULL);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
break;
void
gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode)
{
+ g_return_if_fail (set != NULL);
+
g_warning ("implement me");
}
GstFDSetMode
gst_fdset_get_mode (GstFDSet * set)
{
+ g_return_val_if_fail (set != NULL, FALSE);
+
return set->mode;
}
-void
+gboolean
gst_fdset_add_fd (GstFDSet * set, GstFD * fd)
{
+ gboolean res = FALSE;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
- /* nothing */
+ res = TRUE;
break;
case GST_FDSET_MODE_POLL:
{
set->free = -1;
g_mutex_unlock (set->poll_lock);
+ res = TRUE;
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
+ return res;
}
-void
+gboolean
gst_fdset_remove_fd (GstFDSet * set, GstFD * fd)
{
+ gboolean res = FALSE;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
/* nothing */
FD_CLR (fd->fd, &set->writefds);
FD_CLR (fd->fd, &set->readfds);
+ res = TRUE;
break;
case GST_FDSET_MODE_POLL:
{
g_mutex_lock (set->poll_lock);
+ /* FIXME on some platforms poll doesn't ignore the fd
+ * when set to -1 */
set->pollfds[fd->idx].fd = -1;
set->pollfds[fd->idx].events = 0;
set->pollfds[fd->idx].revents = 0;
set->free = MIN (set->free, fd->idx);
}
g_mutex_unlock (set->poll_lock);
+ res = TRUE;
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
+ return res;
}
void
gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active)
{
+ g_return_if_fail (set != NULL);
+ g_return_if_fail (fd != NULL);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
if (active)
void
gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active)
{
+ g_return_if_fail (set != NULL);
+ g_return_if_fail (fd != NULL);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
if (active)
{
gboolean res = FALSE;
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = FALSE;
{
gboolean res = FALSE;
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = FALSE;
{
gboolean res = FALSE;
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = FD_ISSET (fd->fd, &set->testreadfds);
{
gboolean res = FALSE;
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = FD_ISSET (fd->fd, &set->testwritefds);
return res;
}
-int
+gint
gst_fdset_wait (GstFDSet * set, int timeout)
{
int res = -1;
+ g_return_val_if_fail (set != NULL, -1);
+
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
{
void gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode);
GstFDSetMode gst_fdset_get_mode (GstFDSet *set);
-void gst_fdset_add_fd (GstFDSet *set, GstFD *fd);
-void gst_fdset_remove_fd (GstFDSet *set, GstFD *fd);
+gboolean gst_fdset_add_fd (GstFDSet *set, GstFD *fd);
+gboolean gst_fdset_remove_fd (GstFDSet *set, GstFD *fd);
void gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active);
void gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active);
gboolean gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd);
gboolean gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd);
-int gst_fdset_wait (GstFDSet *set, int timeout);
+gint gst_fdset_wait (GstFDSet *set, int timeout);
G_END_DECLS
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/stat.h>
#ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h>
{
GstTCPClient *client;
GTimeVal now;
+ gint flags, res;
+ struct stat statbuf;
GST_DEBUG_OBJECT (sink, "adding client on fd %d", fd);
sink->clients = g_list_prepend (sink->clients, client);
/* set the socket to non blocking */
- fcntl (fd, F_SETFL, O_NONBLOCK);
+ res = fcntl (fd, F_SETFL, O_NONBLOCK);
/* we always read from a client */
gst_fdset_add_fd (sink->fdset, &client->fd);
- gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
+
+ /* we don't try to read from write only fds */
+ flags = fcntl (fd, F_GETFL, 0);
+ if ((flags & O_ACCMODE) != O_WRONLY) {
+ gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
+ }
+ /* figure out the mode, can't use send() for non sockets */
+ res = fstat (fd, &statbuf);
+ if (S_ISSOCK (statbuf.st_mode)) {
+ client->is_socket = TRUE;
+ }
SEND_COMMAND (sink, CONTROL_RESTART);
#else
#define FLAGS 0
#endif
- wrote =
- send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize, FLAGS);
+ if (client->is_socket) {
+ wrote =
+ send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
+ FLAGS);
+ } else {
+ wrote = write (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize);
+ }
+
if (wrote < 0) {
/* hmm error.. */
if (errno == EAGAIN) {
/* nothing serious, resource was unavailable, try again later */
more = FALSE;
} else {
- GST_WARNING_OBJECT (sink, "could not write, removing client on fd %d",
- fd);
+ GST_WARNING_OBJECT (sink,
+ "could not write, removing client on fd %d: %s", fd,
+ g_strerror (errno));
client->status = GST_CLIENT_STATUS_ERROR;
return FALSE;
}
gint bufpos; /* position of this client in the global queue */
GstClientStatus status;
+ gboolean is_socket;
GSList *sending; /* the buffers we need to send */
gint bufoffset; /* offset in the first buffer */