#define TCP_DEFAULT_PORT 4953
#define TCP_BACKLOG 5
+#define CONTROL_RESTART 'R' /* restart the select call */
+#define CONTROL_STOP 'S' /* stop the select call */
+#define SEND_COMMAND(sink, command) \
+G_STMT_START { \
+ unsigned char c; c = command; \
+ write (sink->control_sock[1], &c, 1); \
+} G_STMT_END
+
+#define READ_COMMAND(sink, command) \
+G_STMT_START { \
+ read(sink->control_sock[0], &command, 1); \
+} G_STMT_END
+
/* elementfactory information */
static GstElementDetails gst_tcpserversink_details =
GST_ELEMENT_DETAILS ("TCP Server sink",
GST_DEBUG_CATEGORY (tcpserversink_debug);
#define GST_CAT_DEFAULT (tcpserversink_debug)
+typedef struct
+{
+ int fd;
+ int bufpos; /* position of this client in the global queue */
+
+ GList *sending; /* the buffers we need to send */
+ int bufoffset; /* offset in the first buffer */
+
+ gboolean caps_sent;
+ gboolean streamheader_sent;
+}
+GstTCPClient;
+
/* TCPServerSink signals and args */
enum
{
LAST_SIGNAL
};
+#define DEFAULT_BUFFERS_MAX 25
+#define DEFAULT_BUFFERS_SOFT_MAX 20
+
enum
{
ARG_0,
ARG_HOST,
ARG_PORT,
- ARG_PROTOCOL
+ ARG_PROTOCOL,
+ ARG_BUFFERS_MAX,
+ ARG_BUFFERS_SOFT_MAX,
};
static void gst_tcpserversink_base_init (gpointer g_class);
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
+ g_param_spec_int ("buffers-max", "Buffers max",
+ "max number of buffers to queue (0 = no limit)", 0, G_MAXINT,
+ DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX,
+ g_param_spec_int ("buffers-soft-max", "Buffers soft max",
+ "Recover client when going over this limit (0 = no limit)", 0,
+ G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
gst_tcpserversink_signals[SIGNAL_CLIENT_ADDED] =
g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
this->clock = NULL;
+
+ this->clients = NULL;
+
+ this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
+ this->queuelock = g_mutex_new ();
+ this->queuecond = g_cond_new ();
+ this->buffers_max = DEFAULT_BUFFERS_MAX;
+ this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX;
+
+ this->clientslock = g_mutex_new ();
+
}
static void
int client_sock_fd;
struct sockaddr_in client_address;
int client_address_len;
+ GstTCPClient *client;
client_sock_fd =
accept (sink->server_sock_fd, (struct sockaddr *) &client_address,
("Could not accept client on server socket: %s", g_strerror (errno)));
return FALSE;
}
- FD_SET (client_sock_fd, &(sink->clientfds));
+
+ /* create client datastructure */
+ client = g_new0 (GstTCPClient, 1);
+ client->fd = client_sock_fd;
+ client->bufpos = -1;
+ client->bufoffset = 0;
+ client->sending = NULL;
+
+ g_mutex_lock (sink->clientslock);
+ sink->clients = g_list_prepend (sink->clients, client);
+ g_mutex_unlock (sink->clientslock);
+
+ /* we always read from a client */
+ FD_SET (client_sock_fd, &sink->readfds);
+
+ /* set the socket to non blocking */
+ fcntl (client_sock_fd, F_SETFL, O_NONBLOCK);
+
GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d",
inet_ntoa (client_address.sin_addr), client_sock_fd);
g_signal_emit (G_OBJECT (sink),
}
static void
-gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd)
+gst_tcpserversink_client_remove (GstTCPServerSink * sink, GstTCPClient * client)
{
+ int fd = client->fd;
+
/* FIXME: if we keep track of ip we can log it here and signal */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
if (close (fd) != 0) {
GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
}
- FD_CLR (fd, &sink->clientfds);
- FD_CLR (fd, &sink->caps_sent);
- FD_CLR (fd, &sink->streamheader_sent);
+ FD_CLR (fd, &sink->readfds);
+ FD_CLR (fd, &sink->writefds);
+
+ sink->clients = g_list_remove (sink->clients, client);
+
+ g_free (client);
+
g_signal_emit (G_OBJECT (sink),
gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
}
* which either indicates a close or should be ignored
* returns FALSE if the client has been closed. */
static gboolean
-gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
+gst_tcpserversink_handle_client_read (GstTCPServerSink * sink,
+ GstTCPClient * client)
{
- int nread;
+ int nread, fd;
+
+ fd = client->fd;
GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd);
if (nread == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
- gst_tcpserversink_client_remove (sink, fd);
return FALSE;
} else {
/* FIXME: we should probably just Read 'n' Drop */
return TRUE;
}
-/* Write a buffer to the given fd for the given element using the given
- * protocol.
- * Return number of buffer bytes written.
- */
-static gint
-gst_tcp_buffer_write (GstBuffer * buf, int fd, GstElement * element,
- GstTCPProtocolType protocol)
+static gboolean
+gst_tcpserversink_client_queue_data (GstTCPServerSink * sink,
+ GstTCPClient * client, gchar * data, gint len)
{
- gint wrote = 0;
+ GstBuffer *buf;
- /* write the buffer header if we have one */
- switch (protocol) {
- case GST_TCP_PROTOCOL_TYPE_NONE:
- break;
+ buf = gst_buffer_new ();
+ GST_BUFFER_DATA (buf) = data;
+ GST_BUFFER_SIZE (buf) = len;
- case GST_TCP_PROTOCOL_TYPE_GDP:
- GST_LOG_OBJECT (element, "Sending buffer header through GDP");
- if (!gst_tcp_gdp_write_header (element, fd, buf, FALSE, "unknown", 0))
- return 0;
- break;
- default:
- g_warning ("Unhandled protocol type");
- break;
+ GST_DEBUG_OBJECT (sink, "Queueing data of length %d for fd %d",
+ len, client->fd);
+ client->sending = g_list_append (client->sending, buf);
+
+ return TRUE;
+}
+
+static gboolean
+gst_tcpserversink_client_queue_caps (GstTCPServerSink * sink,
+ GstTCPClient * client, const GstCaps * caps)
+{
+ guint8 *header;
+ guint8 *payload;
+ guint length;
+ gchar *string;
+
+ string = gst_caps_to_string (caps);
+ GST_DEBUG_OBJECT (sink, "Queueing caps %s for fd %d through GDP", string,
+ client->fd);
+ g_free (string);
+
+ if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
+ GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
+ return FALSE;
}
+ gst_tcpserversink_client_queue_data (sink, client, header, length);
- /* serve data to client */
- GST_LOG_OBJECT (element, "serving data buffer of size %d to client on fd %d",
- GST_BUFFER_SIZE (buf), fd);
-
- wrote =
- gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
-
- if (wrote < GST_BUFFER_SIZE (buf)) {
-/* FIXME: keep track of client ip and port and so on */
-/*
- GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
- (_("Error while sending data to \"%s:%d\"."),
- sink->host, sink->port),
- ("Only %d of %d bytes written: %s",
- bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno)));
-*/
- /* FIXME: there should be a better way to report problems, since we
- want to continue for other clients and just drop this particular one */
- GST_DEBUG_OBJECT (element, "Write failed: %d of %d bytes written", wrote,
- GST_BUFFER_SIZE (buf));
+ length = gst_dp_header_payload_length (header);
+ gst_tcpserversink_client_queue_data (sink, client, payload, length);
+
+ return TRUE;
+}
+
+static gboolean
+gst_tcpserversink_client_queue_buffer (GstTCPServerSink * sink,
+ GstTCPClient * client, GstBuffer * buffer)
+{
+ if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
+ guint8 *header;
+ guint len;
+
+ if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
+ GST_DEBUG_OBJECT (sink,
+ "could not create header, removing client on fd %d", client->fd);
+ return FALSE;
+ }
+ gst_tcpserversink_client_queue_data (sink, client, header, len);
}
- return wrote;
+ gst_buffer_ref (buffer);
+ client->sending = g_list_append (client->sending, buffer);
+
+ return TRUE;
}
-/* handle a write on a client fd,
- * which indicates a read request from a client */
+
+/* handle a write on a client,
+ * which indicates a read request from a client.
+ *
+ * The strategy is as follows, for each client we maintain a queue of GstBuffers
+ * that contain the raw bytes we need to send to the client. In the case of the
+ * GDP protocol, we create buffers out of the header bytes so that we can only focus
+ * on sending buffers.
+ *
+ * We first check to see if we need to send caps (in GDP) and streamheaders. If so,
+ * we queue them.
+ *
+ * Then we run into the main loop that tries to send as many buffers as possible. It
+ * will first exhaust the client->sending queue and if the queue is empty, it will
+ * pick a buffer from the global queue.
+ *
+ * Sending the Buffers from the client->sending queue is basically writing the bytes
+ * to the socket and maintaining a count of the bytes that were sent. When the buffer
+ * is completely sent, it is removed from the client->sending queue and we try to pick
+ * a new buffer for sending.
+ *
+ * When the sending returns a partial buffer we stop sending more data as the next send
+ * operation could block.
+ */
static gboolean
-gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
- GstPad * pad, GstBuffer * buf)
+gst_tcpserversink_handle_client_write (GstTCPServerSink * sink,
+ GstTCPClient * client)
{
- gint wrote = 0;
+ int fd = client->fd;
+ gboolean more;
+ gboolean res;
- /* when using GDP, first check if we have sent caps yet */
+ /* when using GDP, first check if we have queued caps yet */
if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
- if (!FD_ISSET (fd, &(sink->caps_sent))) {
- const GstCaps *caps;
- gchar *string;
-
- caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
- string = gst_caps_to_string (caps);
- GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
- fd);
- if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
- "unknown", 0)) {
- GST_DEBUG_OBJECT (sink, "Failed sending caps, removing client");
- gst_tcpserversink_client_remove (sink, fd);
- g_free (string);
+ if (!client->caps_sent) {
+ const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad));
+
+ /* queue caps for sending */
+ res = gst_tcpserversink_client_queue_caps (sink, client, caps);
+ if (!res) {
+ GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client");
return FALSE;
}
- g_free (string);
- FD_SET (fd, &(sink->caps_sent));
+ client->caps_sent = TRUE;
}
}
/* if we have streamheader buffers, and haven't sent them to this client
* yet, send them out one by one */
- if (!FD_ISSET (fd, &(sink->streamheader_sent))) {
+ if (!client->streamheader_sent) {
if (sink->streamheader) {
GList *l;
for (l = sink->streamheader; l; l = l->next) {
- wrote = gst_tcp_buffer_write (l->data, fd, GST_ELEMENT (sink),
- sink->protocol);
- if (wrote < GST_BUFFER_SIZE (l->data)) {
+ /* queue stream headers for sending */
+ res =
+ gst_tcpserversink_client_queue_buffer (sink, client,
+ GST_BUFFER (l->data));
+ if (!res) {
GST_DEBUG_OBJECT (sink,
- "Failed sending streamheader, removing client");
- gst_tcpserversink_client_remove (sink, fd);
+ "Failed queueing streamheader, removing client");
+ return FALSE;
}
}
}
- FD_SET (fd, &(sink->streamheader_sent));
+ client->streamheader_sent = TRUE;
}
- /* now we write the data buffer */
- wrote = gst_tcp_buffer_write (buf, fd, GST_ELEMENT (sink), sink->protocol);
- if (wrote < GST_BUFFER_SIZE (buf)) {
- gst_tcpserversink_client_remove (sink, fd);
- /* write failed, so drop the client */
- GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
- if (close (fd) != 0) {
- GST_DEBUG_OBJECT (sink, "error closing fd %d after failed write: %s",
- fd, g_strerror (errno));
+ more = TRUE;
+ do {
+ gint maxsize;
+
+ if (!client->sending) {
+ /* client is not working on a buffer */
+ if (client->bufpos == -1) {
+ /* client is too fast, remove from write queue until new buffer is
+ * available */
+ FD_CLR (fd, &sink->writefds);
+ return TRUE;
+ } else {
+ /* client can pick a buffer from the global queue */
+ GstBuffer *buf;
+
+ /* grab buffer and ref, we need to ref since it could be unreffed in
+ * another thread when we unlock the queuelock */
+ g_mutex_lock (sink->queuelock);
+ buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
+ client->bufpos--;
+ gst_buffer_ref (buf);
+ g_mutex_unlock (sink->queuelock);
+
+ gst_tcpserversink_client_queue_buffer (sink, client, buf);
+ /* it is safe to unref now as queueing a buffer will ref it */
+ gst_buffer_unref (buf);
+ /* need to start from the first byte for this new buffer */
+ client->bufoffset = 0;
+ }
}
- return FALSE;
- }
+
+ /* see if we need to send something */
+ if (client->sending) {
+ ssize_t wrote;
+ GstBuffer *head;
+
+ /* pick first buffer from list */
+ head = GST_BUFFER (client->sending->data);
+ maxsize = GST_BUFFER_SIZE (head) - client->bufoffset;
+
+ /* try to write the complete buffer */
+ wrote =
+ send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
+ MSG_NOSIGNAL);
+ if (wrote < 0) {
+ /* hmm error.. */
+ if (errno == EAGAIN) {
+ /* nothing serious, resource was unavailable, try again later */
+ more = FALSE;
+ } else {
+ GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d",
+ fd);
+ return FALSE;
+ }
+ } else if (wrote < maxsize) {
+ /* partial write means that the client cannot read more and we should
+ * stop sending more */
+ GST_DEBUG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
+ client->bufoffset += wrote;
+ more = FALSE;
+ } else {
+ /* complete buffer was written, we can proceed to the next one */
+ client->sending = g_list_remove (client->sending, head);
+ gst_buffer_unref (head);
+ /* make sure we start from byte 0 for the next buffer */
+ client->bufoffset = 0;
+ }
+ }
+ } while (more);
+
return TRUE;
}
+/* Queue a buffer on the global queue.
+ *
+ * This functions adds the buffer to the front of a GArray. It removes the
+ * tail buffer if the max queue size is exceeded. Unreffing the buffer that
+ * is queued. Note that unreffing the buffer is not a problem as clients who
+ * started writing out this buffer will still have a reference to it in the
+ * client->sending queue.
+ *
+ * After adding the buffer, we update all client positions in the queue. If
+ * a client moves of the soft max, we start the recovery procedure for this
+ * slow client. If it goes over the hard max, it is put into the slow list
+ * and removed.
+ *
+ * Special care is taken of clients that were waiting for a new buffer (they
+ * had a position of -1) because they can proceed after adding this new buffer.
+ * This is done by adding the client back into the write fd_set and signalling
+ * the select thread that the fd_set changed.
+ *
+ */
static void
-gst_tcpserversink_chain (GstPad * pad, GstData * _data)
+gst_tcpserversink_queue_buffer (GstTCPServerSink * sink, GstBuffer * buf)
+{
+ GList *clients;
+ gint queuelen;
+ GList *slow = NULL;
+ gboolean need_signal = FALSE;
+
+ g_mutex_lock (sink->queuelock);
+ /* add buffer to queue */
+ g_array_prepend_val (sink->bufqueue, buf);
+ queuelen = sink->bufqueue->len;
+ if (queuelen > sink->buffers_max) {
+ GstBuffer *old;
+
+ /* queue exceeded max size */
+ queuelen--;
+ old = g_array_index (sink->bufqueue, GstBuffer *, queuelen);
+ sink->bufqueue = g_array_remove_index (sink->bufqueue, queuelen);
+
+ /* unref tail buffer */
+ gst_buffer_unref (old);
+ }
+ g_mutex_unlock (sink->queuelock);
+
+ /* then loop over the clients and update the positions */
+ g_mutex_lock (sink->clientslock);
+ for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ GstTCPClient *client;
+
+ client = (GstTCPClient *) clients->data;
+
+ client->bufpos++;
+ GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
+ client, client->fd, client->bufpos);
+ if (client->bufpos >= sink->buffers_soft_max) {
+ if (client->bufpos == sink->buffers_soft_max) {
+ g_warning ("client %p with fd %d is lagging...", client, client->fd);
+ }
+ GST_LOG_OBJECT (sink, "client %p with fd %d is lagging",
+ client, client->fd);
+ }
+ if (client->bufpos >= queuelen) {
+ /* remove client */
+ GST_LOG_OBJECT (sink, "client %p with fd %d is too slow, removing",
+ client, client->fd);
+ g_warning ("client %p with fd %d too slow, removing", client, client->fd);
+ FD_CLR (client->fd, &sink->readfds);
+ FD_CLR (client->fd, &sink->writefds);
+ slow = g_list_prepend (slow, client);
+ /* cannot send data to this client anymore. need to signal the select thread that
+ * the fd_set changed */
+ need_signal = TRUE;
+ } else if (client->bufpos == 0) {
+ /* can send data to this client now. need to signal the select thread that
+ * the fd_set changed */
+ FD_SET (client->fd, &sink->writefds);
+ need_signal = TRUE;
+ }
+ }
+ /* remove crap clients */
+ for (clients = slow; clients; clients = g_list_next (clients)) {
+ GstTCPClient *client;
+
+ client = (GstTCPClient *) slow->data;
+
+ gst_tcpserversink_client_remove (sink, client);
+ }
+ g_list_free (slow);
+ g_mutex_unlock (sink->clientslock);
+
+ /* and send a signal to thread if fd_set changed */
+ if (need_signal) {
+ SEND_COMMAND (sink, CONTROL_RESTART);
+ }
+}
+
+/* Handle the clients. Basically does a blocking select for one
+ * of the client fds to become read or writable. We also have a
+ * filedescriptor to receive commands on that we need to check.
+ *
+ * After going out of the select call, we read and write to all
+ * clients that can do so. Badly behaving clients are put on a
+ * garbage list and removed.
+ */
+static void
+gst_tcpserversink_handle_clients (GstTCPServerSink * sink)
{
int result;
- int fd;
fd_set testreadfds, testwritefds;
- struct timeval timeout;
- struct timeval *timeoutp;
+ GList *clients, *error = NULL;
+ gboolean try_again;
+
+ do {
+ try_again = FALSE;
+
+ /* check for:
+ * - server socket input (ie, new client connections)
+ * - client socket input (ie, clients saying goodbye)
+ * - client socket output (ie, client reads) */
+ testwritefds = sink->writefds;
+ testreadfds = sink->readfds;
+ FD_SET (sink->server_sock_fd, &testreadfds);
+ FD_SET (sink->control_sock[0], &testreadfds);
+
+ GST_LOG_OBJECT (sink, "doing select on server + client fds for reads");
+ gst_tcpserversink_debug_fdset (sink, &testreadfds);
+ GST_LOG_OBJECT (sink, "doing select on client fds for writes");
+ gst_tcpserversink_debug_fdset (sink, &testwritefds);
+
+ result =
+ select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0, NULL);
+
+ /* < 0 is an error, 0 just means a timeout happened */
+ if (result < 0) {
+ GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
+ ("select failed: %s", g_strerror (errno)));
+ return;
+ }
+
+ GST_LOG_OBJECT (sink, "%d sockets had action", result);
+ GST_LOG_OBJECT (sink, "done select on server/client fds for reads");
+ gst_tcpserversink_debug_fdset (sink, &testreadfds);
+ GST_LOG_OBJECT (sink, "done select on client fds for writes");
+ gst_tcpserversink_debug_fdset (sink, &testwritefds);
+ if (FD_ISSET (sink->control_sock[0], &testreadfds)) {
+ gchar command;
+
+ READ_COMMAND (sink, command);
+
+ switch (command) {
+ case CONTROL_RESTART:
+ /* need to restart the select call as the fd_set changed */
+ try_again = TRUE;
+ break;
+ case CONTROL_STOP:
+ /* stop this function */
+ return;
+ default:
+ g_warning ("tcpserversink: unknown control message received");
+ break;
+ }
+ }
+ } while (try_again);
+
+ if (FD_ISSET (sink->server_sock_fd, &testreadfds)) {
+ /* handle new client connection on server socket */
+ if (!gst_tcpserversink_handle_server_read (sink)) {
+ GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
+ ("client connection failed: %s", g_strerror (errno)));
+ return;
+ }
+ }
+
+ /* Check the reads */
+ g_mutex_lock (sink->clientslock);
+ for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ GstTCPClient *client;
+ int fd;
+
+ client = (GstTCPClient *) clients->data;
+ fd = client->fd;
+
+ if (FD_ISSET (fd, &testreadfds)) {
+ /* handle client read */
+ if (!gst_tcpserversink_handle_client_read (sink, client)) {
+ error = g_list_prepend (error, client);
+ continue;
+ }
+ }
+ if (FD_ISSET (fd, &testwritefds)) {
+ /* handle client write */
+ if (!gst_tcpserversink_handle_client_write (sink, client)) {
+ error = g_list_prepend (error, client);
+ continue;
+ }
+ }
+ }
+ /* remove crappy clients */
+ for (clients = error; clients; clients = g_list_next (clients)) {
+ GstTCPClient *client;
+
+ client = (GstTCPClient *) error->data;
+
+ GST_LOG_OBJECT (sink, "removing client %p with fd %d with errors", client,
+ client->fd);
+ gst_tcpserversink_client_remove (sink, client);
+ }
+ g_list_free (error);
+ g_mutex_unlock (sink->clientslock);
+}
+
+static gpointer
+gst_tcpserversink_thread (GstTCPServerSink * sink)
+{
+ while (sink->running) {
+ gst_tcpserversink_handle_clients (sink);
+ }
+ return NULL;
+}
+
+static void
+gst_tcpserversink_chain (GstPad * pad, GstData * _data)
+{
GstBuffer *buf = GST_BUFFER (_data);
GstTCPServerSink *sink;
sink->streamheader = g_list_append (sink->streamheader, buf);
return;
}
- /* if the incoming buffer has a duration, we can use that as the timeout
- * value; otherwise, we block */
- timeout.tv_sec = 0;
- timeout.tv_usec = 0;
- timeoutp = NULL;
- GST_LOG_OBJECT (sink, "incoming buffer duration: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
- if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DURATION (buf))) {
- GST_TIME_TO_TIMEVAL (GST_BUFFER_DURATION (buf), timeout);
- timeoutp = &timeout;
- GST_LOG_OBJECT (sink, "select will be with timeout %" GST_TIME_FORMAT,
- GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
- GST_LOG_OBJECT (sink, "select will be with timeout %d.%d",
- timeout.tv_sec, timeout.tv_usec);
- }
- /* check for:
- * - server socket input (ie, new client connections)
- * - client socket input (ie, clients saying goodbye)
- * - client socket output (ie, client reads) */
- testwritefds = sink->clientfds;
- testreadfds = sink->clientfds;
- FD_SET (sink->server_sock_fd, &testreadfds);
-
- GST_LOG_OBJECT (sink, "doing select on server + client fds for reads");
- gst_tcpserversink_debug_fdset (sink, &testreadfds);
- GST_LOG_OBJECT (sink, "doing select on client fds for writes");
- gst_tcpserversink_debug_fdset (sink, &testwritefds);
-
- result = select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0,
- timeoutp);
- /* < 0 is an error, 0 just means a timeout happened */
- if (result < 0) {
- GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
- ("select failed: %s", g_strerror (errno)));
- return;
- }
- GST_LOG_OBJECT (sink, "%d sockets had action", result);
- GST_LOG_OBJECT (sink, "done select on server/client fds for reads");
- gst_tcpserversink_debug_fdset (sink, &testreadfds);
- GST_LOG_OBJECT (sink, "done select on client fds for writes");
- gst_tcpserversink_debug_fdset (sink, &testwritefds);
- /* Check the reads */
- for (fd = 0; fd < FD_SETSIZE; fd++) {
- if (FD_ISSET (fd, &testreadfds)) {
- if (fd == sink->server_sock_fd) {
- /* handle new client connection on server socket */
- if (!gst_tcpserversink_handle_server_read (sink))
- return;
- } else {
- /* handle client read */
- if (!gst_tcpserversink_handle_client_read (sink, fd))
- return;
- }
- }
- }
+ /* queue the buffer */
+ gst_tcpserversink_queue_buffer (sink, buf);
- /* Check the writes */
- for (fd = 0; fd < FD_SETSIZE; fd++) {
- if (FD_ISSET (fd, &testwritefds)) {
- if (!gst_tcpserversink_handle_client_write (sink, fd, pad, buf)) {
- gst_buffer_unref (buf);
- return;
- }
- }
- }
sink->data_written += GST_BUFFER_SIZE (buf);
-
- gst_buffer_unref (buf);
-
}
static void
case ARG_PROTOCOL:
tcpserversink->protocol = g_value_get_enum (value);
break;
+ case ARG_BUFFERS_MAX:
+ tcpserversink->buffers_max = g_value_get_int (value);
+ break;
+ case ARG_BUFFERS_SOFT_MAX:
+ tcpserversink->buffers_soft_max = g_value_get_int (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
case ARG_PROTOCOL:
g_value_set_enum (value, tcpserversink->protocol);
break;
+ case ARG_BUFFERS_MAX:
+ g_value_set_int (value, tcpserversink->buffers_max);
+ break;
+ case ARG_BUFFERS_SOFT_MAX:
+ g_value_set_int (value, tcpserversink->buffers_soft_max);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
"listened on server socket %d, returning from connection setup",
this->server_sock_fd);
- FD_ZERO (&this->clientfds);
- FD_ZERO (&this->caps_sent);
- FD_ZERO (&this->streamheader_sent);
- FD_SET (this->server_sock_fd, &this->clientfds);
+ FD_ZERO (&this->readfds);
+ FD_ZERO (&this->writefds);
+ FD_SET (this->server_sock_fd, &this->readfds);
+
+ if (socketpair (PF_UNIX, SOCK_STREAM, 0, this->control_sock) < 0) {
+ perror ("creating socket pair");
+ }
+
+ this->running = TRUE;
+ this->thread = g_thread_create ((GThreadFunc) gst_tcpserversink_thread,
+ this, TRUE, NULL);
+
GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN);
this->streamheader = NULL;
static void
gst_tcpserversink_close (GstTCPServerSink * this)
{
+ this->running = FALSE;
+
+ SEND_COMMAND (this, CONTROL_STOP);
+
+ g_thread_join (this->thread);
+
+ close (this->control_sock[0]);
+ close (this->control_sock[1]);
+
if (this->server_sock_fd != -1) {
close (this->server_sock_fd);
this->server_sock_fd = -1;