static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
-static void gst_multifdsink_client_remove (GstMultiFdSink * sink,
- GstTCPClient * client);
+static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
+ GList * link);
static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
static GstElementStateReturn gst_multifdsink_change_state (GstElement *
/* create client datastructure */
client = g_new0 (GstTCPClient, 1);
client->fd = fd;
- client->bad = FALSE;
+ client->status = GST_CLIENT_STATUS_OK;
client->bufpos = -1;
client->bufoffset = 0;
client->sending = NULL;
void
gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
{
- GList *clients;
+ GList *clients, *next;
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
g_mutex_lock (sink->clientslock);
/* loop over the clients to find the one with the fd */
- for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
+ next = g_list_next (clients);
if (client->fd == fd) {
- gst_multifdsink_client_remove (sink, client);
+ client->status = GST_CLIENT_STATUS_REMOVED;
+ gst_multifdsink_remove_client_link (sink, clients);
break;
}
}
void
gst_multifdsink_clear (GstMultiFdSink * sink)
{
+ GList *clients, *next;
+
GST_DEBUG_OBJECT (sink, "clearing all clients");
g_mutex_lock (sink->clientslock);
- while (sink->clients) {
+ for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client;
- client = (GstTCPClient *) sink->clients->data;
- gst_multifdsink_client_remove (sink, client);
+ client = (GstTCPClient *) clients->data;
+ next = g_list_next (clients);
+
+ client->status = GST_CLIENT_STATUS_REMOVED;
+ gst_multifdsink_remove_client_link (sink, clients);
}
g_mutex_unlock (sink->clientslock);
}
/* should be called with the clientslock held */
static void
-gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
+gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
{
- int fd = client->fd;
+ int fd;
GTimeVal now;
+ GstTCPClient *client = (GstTCPClient *) link->data;
+
+ fd = client->fd;
/* FIXME: if we keep track of ip we can log it here and signal */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
+
+ switch (client->status) {
+ case GST_CLIENT_STATUS_OK:
+ GST_WARNING_OBJECT (sink, "removing client %p with fd %d for no reason",
+ client, client->fd);
+ break;
+ case GST_CLIENT_STATUS_CLOSED:
+ GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close",
+ client, client->fd);
+ break;
+ case GST_CLIENT_STATUS_REMOVED:
+ GST_DEBUG_OBJECT (sink,
+ "removing client %p with fd %d because the app removed it", client,
+ client->fd);
+ break;
+ case GST_CLIENT_STATUS_SLOW:
+ GST_INFO_OBJECT (sink,
+ "removing client %p with fd %d because it was too slow", client,
+ client->fd);
+ break;
+ case GST_CLIENT_STATUS_ERROR:
+ GST_WARNING_OBJECT (sink,
+ "removing client %p with fd %d because of error", client, client->fd);
+ break;
+ default:
+ GST_WARNING_OBJECT (sink,
+ "removing client %p with fd %d with invalid reason", client,
+ client->fd);
+ break;
+ }
+
FD_CLR (fd, &sink->readfds);
FD_CLR (fd, &sink->writefds);
if (close (fd) != 0) {
+ /* this is not really an error */
GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
}
SEND_COMMAND (sink, CONTROL_RESTART);
g_get_current_time (&now);
client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
+ /* unlock the mutex before signaling because the signal handler
+ * might query some properties */
g_mutex_unlock (sink->clientslock);
g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd);
+ /* lock again before we remove the client completely */
g_mutex_lock (sink->clientslock);
- sink->clients = g_list_remove (sink->clients, client);
+ sink->clients = g_list_delete_link (sink->clients, link);
g_free (client);
}
/* handle a read on a client fd,
* which either indicates a close or should be ignored
- * returns FALSE if the client has been closed. */
+ * returns FALSE if some error occured or the client closed. */
static gboolean
gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
GstTCPClient * client)
fd = client->fd;
if (ioctl (fd, FIONREAD, &avail) < 0) {
- GST_WARNING_OBJECT (sink, "ioctl failed for fd %d", fd);
+ GST_WARNING_OBJECT (sink, "ioctl failed for fd %d: %s",
+ fd, g_strerror (errno));
+ client->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
return ret;
}
if (avail == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
+ client->status = GST_CLIENT_STATUS_CLOSED;
ret = FALSE;
} else if (avail < 0) {
GST_WARNING_OBJECT (sink, "avail < 0, removing on fd %d", fd);
+ client->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
} else {
guint8 dummy[512];
if (nread < -1) {
GST_WARNING_OBJECT (sink, "could not read bytes from fd %d: %s",
fd, g_strerror (errno));
+ client->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
break;
} else if (nread == 0) {
GST_WARNING_OBJECT (sink, "fd %d: gave 0 bytes in read, removing", fd);
+ client->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
break;
}
GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d",
len, client->fd);
- client->sending = g_list_append (client->sending, buf);
+ client->sending = g_slist_append (client->sending, buf);
return TRUE;
}
}
gst_buffer_ref (buffer);
- client->sending = g_list_append (client->sending, buffer);
+ client->sending = g_slist_append (client->sending, buffer);
return TRUE;
}
*
* When the sending returns a partial buffer we stop sending more data as the next send
* operation could block.
+ *
+ * This functions returns FALSE if some error occured.
*/
static gboolean
gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
* yet, send them out one by one */
if (!client->streamheader_sent) {
if (sink->streamheader) {
- GList *l;
+ GSList *l;
for (l = sink->streamheader; l; l = l->next) {
/* queue stream headers for sending */
} else {
GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d",
fd);
+ client->status = GST_CLIENT_STATUS_ERROR;
return FALSE;
}
} else {
more = FALSE;
} else {
/* complete buffer was written, we can proceed to the next one */
- client->sending = g_list_remove (client->sending, head);
+ client->sending = g_slist_remove (client->sending, head);
gst_buffer_unref (head);
/* make sure we start from byte 0 for the next buffer */
client->bufoffset = 0;
static void
gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
{
- GList *clients;
+ GList *clients, *next;
gint queuelen;
- GList *slow = NULL;
gboolean need_signal = FALSE;
gint max_buffer_usage;
gint i;
/* then loop over the clients and update the positions */
max_buffer_usage = 0;
- for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
+ next = g_list_next (clients);
client->bufpos++;
GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
client, client->fd);
FD_CLR (client->fd, &sink->readfds);
FD_CLR (client->fd, &sink->writefds);
- slow = g_list_prepend (slow, client);
+ client->status = GST_CLIENT_STATUS_SLOW;
+ gst_multifdsink_remove_client_link (sink, clients);
/* cannot send data to this client anymore. need to signal the select thread that
* the fd_set changed */
need_signal = TRUE;
max_buffer_usage = client->bufpos;
}
}
- /* remove crap clients */
- for (clients = slow; clients; clients = g_list_next (clients)) {
- GstTCPClient *client;
-
- client = (GstTCPClient *) clients->data;
-
- gst_multifdsink_client_remove (sink, client);
- }
- g_list_free (slow);
/* nobody is referencing buffers after max_buffer_usage so we can
* remove them from the queue */
for (i = queuelen - 1; i > max_buffer_usage; i--) {
{
int result;
fd_set testreadfds, testwritefds;
- GList *clients, *closed = NULL;
+ GList *clients, *next;
gboolean try_again;
GstMultiFdSinkClass *fclass;
GST_WARNING_OBJECT (sink, "select failed: %s", g_strerror (errno));
if (errno == EBADF) {
/* ok, so one of the fds is invalid. We loop over them to find one
- * that gives an error to the F_GETFL fcntl.
- */
+ * that gives an error to the F_GETFL fcntl. */
g_mutex_lock (sink->clientslock);
- for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client;
int fd;
long flags;
int res;
client = (GstTCPClient *) clients->data;
+ next = g_list_next (clients);
+
fd = client->fd;
res = fcntl (fd, F_GETFL, &flags);
if (res == -1) {
- GST_WARNING_OBJECT (sink, "fnctl failed for %d, marking as bad: %s",
+ GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s",
fd, g_strerror (errno));
if (errno == EBADF) {
- client->bad = TRUE;
+ client->status = GST_CLIENT_STATUS_ERROR;
+ gst_multifdsink_remove_client_link (sink, clients);
}
}
}
g_mutex_unlock (sink->clientslock);
+ /* after this, go back in the select loop as the read/writefds
+ * are not valid */
+ try_again = TRUE;
} else if (errno == EINTR) {
+ /* interrupted system call, just redo the select */
try_again = TRUE;
} else {
GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
/* Check the reads */
g_mutex_lock (sink->clientslock);
- for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client;
int fd;
client = (GstTCPClient *) clients->data;
- if (client->bad) {
- closed = g_list_prepend (closed, client);
+ next = g_list_next (clients);
+
+ if (client->status != GST_CLIENT_STATUS_OK) {
+ gst_multifdsink_remove_client_link (sink, clients);
continue;
}
if (FD_ISSET (fd, &testreadfds)) {
/* handle client read */
if (!gst_multifdsink_handle_client_read (sink, client)) {
- closed = g_list_prepend (closed, client);
+ gst_multifdsink_remove_client_link (sink, clients);
continue;
}
}
if (FD_ISSET (fd, &testwritefds)) {
/* handle client write */
if (!gst_multifdsink_handle_client_write (sink, client)) {
- closed = g_list_prepend (closed, client);
+ gst_multifdsink_remove_client_link (sink, clients);
continue;
}
}
}
- /* remove crappy clients */
- for (clients = closed; clients; clients = g_list_next (clients)) {
- GstTCPClient *client;
-
- client = (GstTCPClient *) clients->data;
-
- GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close",
- client, client->fd);
- gst_multifdsink_client_remove (sink, client);
- }
- g_list_free (closed);
g_mutex_unlock (sink->clientslock);
}
GST_DEBUG_OBJECT (sink,
"appending IN_CAPS buffer with length %d to streamheader",
GST_BUFFER_SIZE (buf));
- sink->streamheader = g_list_append (sink->streamheader, buf);
+ sink->streamheader = g_slist_append (sink->streamheader, buf);
return;
}
close (WRITE_SOCKET (this));
if (this->streamheader) {
- GList *l;
+ GSList *l;
for (l = this->streamheader; l; l = l->next) {
gst_buffer_unref (l->data);
}
- g_list_free (this->streamheader);
+ g_slist_free (this->streamheader);
}
if (fclass->close)