From: Wim Taymans Date: Sat, 26 Jun 2004 16:49:42 +0000 (+0000) Subject: gst/tcp/: Added multifdsink, made tcpserversink a subclass of fdsink, removed one... X-Git-Tag: 1.19.3~511^2~14040 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=2c2b65c4b0011edd8534a66c0dad8dd75e69f773;p=platform%2Fupstream%2Fgstreamer.git gst/tcp/: Added multifdsink, made tcpserversink a subclass of fdsink, removed one of the locks, added recovery policy... Original commit message from CVS: * gst/tcp/Makefile.am: * gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type), (gst_multifdsink_get_type), (gst_multifdsink_base_init), (gst_multifdsink_class_init), (gst_multifdsink_init), (gst_multifdsink_debug_fdset), (gst_multifdsink_client_remove), (gst_multifdsink_handle_client_read), (gst_multifdsink_client_queue_data), (gst_multifdsink_client_queue_caps), (gst_multifdsink_client_queue_buffer), (gst_multifdsink_handle_client_write), (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients), (gst_multifdsink_thread), (gst_multifdsink_chain), (gst_multifdsink_set_property), (gst_multifdsink_get_property), (gst_multifdsink_init_send), (gst_multifdsink_close), (gst_multifdsink_change_state): * gst/tcp/gstmultifdsink.h: * gst/tcp/gsttcpplugin.c: (plugin_init): * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_get_type), (gst_tcpserversink_class_init), (gst_tcpserversink_init), (gst_tcpserversink_handle_server_read), (gst_tcpserversink_handle_select), (gst_tcpserversink_set_property), (gst_tcpserversink_get_property), (gst_tcpserversink_init_send), (gst_tcpserversink_close): * gst/tcp/gsttcpserversink.h: Added multifdsink, made tcpserversink a subclass of fdsink, removed one of the locks, added recovery policy to multifdsink. --- diff --git a/ChangeLog b/ChangeLog index 5b5453b..99f38ca 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,32 @@ +2004-06-26 Wim Taymans + + * gst/tcp/Makefile.am: + * gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type), + (gst_multifdsink_get_type), (gst_multifdsink_base_init), + (gst_multifdsink_class_init), (gst_multifdsink_init), + (gst_multifdsink_debug_fdset), (gst_multifdsink_client_remove), + (gst_multifdsink_handle_client_read), + (gst_multifdsink_client_queue_data), + (gst_multifdsink_client_queue_caps), + (gst_multifdsink_client_queue_buffer), + (gst_multifdsink_handle_client_write), + (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), + (gst_multifdsink_handle_clients), (gst_multifdsink_thread), + (gst_multifdsink_chain), (gst_multifdsink_set_property), + (gst_multifdsink_get_property), (gst_multifdsink_init_send), + (gst_multifdsink_close), (gst_multifdsink_change_state): + * gst/tcp/gstmultifdsink.h: + * gst/tcp/gsttcpplugin.c: (plugin_init): + * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_get_type), + (gst_tcpserversink_class_init), (gst_tcpserversink_init), + (gst_tcpserversink_handle_server_read), + (gst_tcpserversink_handle_select), + (gst_tcpserversink_set_property), (gst_tcpserversink_get_property), + (gst_tcpserversink_init_send), (gst_tcpserversink_close): + * gst/tcp/gsttcpserversink.h: + Added multifdsink, made tcpserversink a subclass of fdsink, removed + one of the locks, added recovery policy to multifdsink. + 2004-06-26 Thomas Vander Stichele * gst/videorate/gstvideorate.c: (gst_videorate_chain): diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am index 98b8ca7..dad81b8 100644 --- a/gst/tcp/Makefile.am +++ b/gst/tcp/Makefile.am @@ -16,6 +16,7 @@ libgsttcp_la_SOURCES = \ gsttcpplugin.c \ gsttcpsrc.c gsttcpsink.c \ gsttcp.c \ + gstmultifdsink.c \ gsttcpclientsrc.c gsttcpclientsink.c \ gsttcpserversrc.c gsttcpserversink.c @@ -31,6 +32,7 @@ noinst_HEADERS = \ gsttcpplugin.h \ gsttcpsrc.h gsttcpsink.h \ gsttcp.h \ + gstmultifdsink.h \ gsttcpclientsrc.h gsttcpclientsink.h \ gsttcpserversrc.h gsttcpserversink.h diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c new file mode 100644 index 0000000..f65803a --- /dev/null +++ b/gst/tcp/gstmultifdsink.c @@ -0,0 +1,941 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * Copyright (C) <2004> Thomas Vander Stichele + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include + +#include + +#ifdef HAVE_FIONREAD_IN_SYS_FILIO +#include +#endif + +#include "gstmultifdsink.h" +#include "gsttcp-marshal.h" + +#define CONTROL_RESTART 'R' /* restart the select call */ +#define CONTROL_STOP 'S' /* stop the select call */ +#define CONTROL_SOCKETS(sink) sink->control_sock +#define WRITE_SOCKET(sink) sink->control_sock[1] +#define READ_SOCKET(sink) sink->control_sock[0] +#define SEND_COMMAND(sink, command) \ +G_STMT_START { \ + unsigned char c; c = command; \ + write (WRITE_SOCKET(sink), &c, 1); \ +} G_STMT_END + +#define READ_COMMAND(sink, command) \ +G_STMT_START { \ + read(READ_SOCKET(sink), &command, 1); \ +} G_STMT_END + +/* elementfactory information */ +static GstElementDetails gst_multifdsink_details = +GST_ELEMENT_DETAILS ("TCP Server sink", + "Sink/Network", + "Send data as a server over the network via TCP", + "Thomas Vander Stichele "); + +GST_DEBUG_CATEGORY (multifdsink_debug); +#define GST_CAT_DEFAULT (multifdsink_debug) + +/* MultiFdSink signals and args */ +enum +{ + SIGNAL_CLIENT_ADDED, + SIGNAL_CLIENT_REMOVED, + LAST_SIGNAL +}; + +/* this is really arbitrary choosen */ +#define DEFAULT_BUFFERS_MAX -1 +#define DEFAULT_BUFFERS_SOFT_MAX -1 + +enum +{ + ARG_0, + ARG_PROTOCOL, + ARG_BUFFERS_MAX, + ARG_BUFFERS_SOFT_MAX, + ARG_BUFFERS_QUEUED, + ARG_RECOVER_POLICY, +}; + +#define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type()) +static GType +gst_recover_policy_get_type (void) +{ + static GType recover_policy_type = 0; + static GEnumValue recover_policy[] = { + {GST_RECOVER_POLICY_NONE, "GST_RECOVER_POLICY_NONE", + "Do not try to recover"}, + {GST_RECOVER_POLICY_RESYNC_START, "GST_RECOVER_POLICY_RESYNC_START", + "Resync client to most recent buffer"}, + {GST_RECOVER_POLICY_RESYNC_SOFT, "GST_RECOVER_POLICY_RESYNC_SOFT", + "Resync client to soft limit"}, + {GST_RECOVER_POLICY_RESYNC_KEYFRAME, "GST_RECOVER_POLICY_RESYNC_KEYFRAME", + "Resync client to most recent keyframe"}, + {0, NULL, NULL}, + }; + + if (!recover_policy_type) { + recover_policy_type = + g_enum_register_static ("GstTCPRecoverPolicy", recover_policy); + } + return recover_policy_type; +} + +static void gst_multifdsink_base_init (gpointer g_class); +static void gst_multifdsink_class_init (GstMultiFdSink * klass); +static void gst_multifdsink_init (GstMultiFdSink * multifdsink); + +static void gst_multifdsink_chain (GstPad * pad, GstData * _data); +static GstElementStateReturn gst_multifdsink_change_state (GstElement * + element); + +static void gst_multifdsink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_multifdsink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + + +static GstElementClass *parent_class = NULL; + +static guint gst_multifdsink_signals[LAST_SIGNAL] = { 0 }; + +GType +gst_multifdsink_get_type (void) +{ + static GType multifdsink_type = 0; + + + if (!multifdsink_type) { + static const GTypeInfo multifdsink_info = { + sizeof (GstMultiFdSinkClass), + gst_multifdsink_base_init, + NULL, + (GClassInitFunc) gst_multifdsink_class_init, + NULL, + NULL, + sizeof (GstMultiFdSink), + 0, + (GInstanceInitFunc) gst_multifdsink_init, + NULL + }; + + multifdsink_type = + g_type_register_static (GST_TYPE_ELEMENT, "GstMultiFdSink", + &multifdsink_info, 0); + } + return multifdsink_type; +} + +static void +gst_multifdsink_base_init (gpointer g_class) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + + gst_element_class_set_details (element_class, &gst_multifdsink_details); +} + +static void +gst_multifdsink_class_init (GstMultiFdSink * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + + g_object_class_install_property (gobject_class, ARG_PROTOCOL, + g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", + GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE, + G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, + g_param_spec_int ("buffers-max", "Buffers max", + "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT, + DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX, + g_param_spec_int ("buffers-soft-max", "Buffers soft max", + "Recover client when going over this limit (-1 = no limit)", -1, + G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED, + g_param_spec_int ("buffers-queued", "Buffers queued", + "Number of buffers current queued", 0, G_MAXINT, 0, + G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY, + g_param_spec_enum ("recover-policy", "Recover Policy", + "How to recover when client reaches the soft max", + GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE)); + + + gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] = + g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added), + NULL, NULL, gst_tcp_marshal_VOID__STRING_UINT, G_TYPE_NONE, 2, + G_TYPE_STRING, G_TYPE_UINT); + gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED] = + g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, + client_removed), NULL, NULL, gst_tcp_marshal_VOID__STRING_UINT, + G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_UINT); + + gobject_class->set_property = gst_multifdsink_set_property; + gobject_class->get_property = gst_multifdsink_get_property; + + gstelement_class->change_state = gst_multifdsink_change_state; + + GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink"); +} + +static void +gst_multifdsink_init (GstMultiFdSink * this) +{ + /* create the sink pad */ + this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); + gst_element_add_pad (GST_ELEMENT (this), this->sinkpad); + gst_pad_set_chain_function (this->sinkpad, gst_multifdsink_chain); + + GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN); + + this->protocol = GST_TCP_PROTOCOL_TYPE_NONE; + + this->clients = NULL; + + this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); + this->buffers_max = DEFAULT_BUFFERS_MAX; + this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX; + + this->clientslock = g_mutex_new (); + this->recover_policy = GST_RECOVER_POLICY_NONE; +} + +static void +gst_multifdsink_debug_fdset (GstMultiFdSink * sink, fd_set * testfds) +{ + int fd; + + for (fd = 0; fd < FD_SETSIZE; fd++) { + if (FD_ISSET (fd, testfds)) { + GST_LOG_OBJECT (sink, "fd %d", fd); + } + } +} + +static void +gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) +{ + int fd = client->fd; + + /* FIXME: if we keep track of ip we can log it here and signal */ + GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); + if (close (fd) != 0) { + GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); + } + FD_CLR (fd, &sink->readfds); + FD_CLR (fd, &sink->writefds); + + sink->clients = g_list_remove (sink->clients, client); + + g_free (client); + + g_signal_emit (G_OBJECT (sink), + gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd); +} + +/* handle a read on a client fd, + * which either indicates a close or should be ignored + * returns FALSE if the client has been closed. */ +static gboolean +gst_multifdsink_handle_client_read (GstMultiFdSink * sink, + GstTCPClient * client) +{ + int nread, fd; + + fd = client->fd; + + GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd); + + ioctl (fd, FIONREAD, &nread); + if (nread == 0) { + /* client sent close, so remove it */ + GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd); + return FALSE; + } else { + /* FIXME: we should probably just Read 'n' Drop */ + g_warning ("Don't know what to do with %d bytes to read", nread); + } + return TRUE; +} + +static gboolean +gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client, + gchar * data, gint len) +{ + GstBuffer *buf; + + buf = gst_buffer_new (); + GST_BUFFER_DATA (buf) = data; + GST_BUFFER_SIZE (buf) = len; + + GST_DEBUG_OBJECT (sink, "Queueing data of length %d for fd %d", + len, client->fd); + client->sending = g_list_append (client->sending, buf); + + return TRUE; +} + +static gboolean +gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client, + const GstCaps * caps) +{ + guint8 *header; + guint8 *payload; + guint length; + gchar *string; + + string = gst_caps_to_string (caps); + GST_DEBUG_OBJECT (sink, "Queueing caps %s for fd %d through GDP", string, + client->fd); + g_free (string); + + if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) { + GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps"); + return FALSE; + } + gst_multifdsink_client_queue_data (sink, client, header, length); + + length = gst_dp_header_payload_length (header); + gst_multifdsink_client_queue_data (sink, client, payload, length); + + return TRUE; +} + +static gboolean +gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink, + GstTCPClient * client, GstBuffer * buffer) +{ + if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { + guint8 *header; + guint len; + + if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) { + GST_DEBUG_OBJECT (sink, + "could not create header, removing client on fd %d", client->fd); + return FALSE; + } + gst_multifdsink_client_queue_data (sink, client, header, len); + } + + gst_buffer_ref (buffer); + client->sending = g_list_append (client->sending, buffer); + + return TRUE; +} + + +/* handle a write on a client, + * which indicates a read request from a client. + * + * The strategy is as follows, for each client we maintain a queue of GstBuffers + * that contain the raw bytes we need to send to the client. In the case of the + * GDP protocol, we create buffers out of the header bytes so that we can only focus + * on sending buffers. + * + * We first check to see if we need to send caps (in GDP) and streamheaders. If so, + * we queue them. + * + * Then we run into the main loop that tries to send as many buffers as possible. It + * will first exhaust the client->sending queue and if the queue is empty, it will + * pick a buffer from the global queue. + * + * Sending the Buffers from the client->sending queue is basically writing the bytes + * to the socket and maintaining a count of the bytes that were sent. When the buffer + * is completely sent, it is removed from the client->sending queue and we try to pick + * a new buffer for sending. + * + * When the sending returns a partial buffer we stop sending more data as the next send + * operation could block. + */ +static gboolean +gst_multifdsink_handle_client_write (GstMultiFdSink * sink, + GstTCPClient * client) +{ + int fd = client->fd; + gboolean more; + gboolean res; + + /* when using GDP, first check if we have queued caps yet */ + if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { + if (!client->caps_sent) { + const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad)); + + /* queue caps for sending */ + res = gst_multifdsink_client_queue_caps (sink, client, caps); + if (!res) { + GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client"); + return FALSE; + } + client->caps_sent = TRUE; + } + } + /* if we have streamheader buffers, and haven't sent them to this client + * yet, send them out one by one */ + if (!client->streamheader_sent) { + if (sink->streamheader) { + GList *l; + + for (l = sink->streamheader; l; l = l->next) { + /* queue stream headers for sending */ + res = + gst_multifdsink_client_queue_buffer (sink, client, + GST_BUFFER (l->data)); + if (!res) { + GST_DEBUG_OBJECT (sink, + "Failed queueing streamheader, removing client"); + return FALSE; + } + } + } + client->streamheader_sent = TRUE; + } + + more = TRUE; + do { + gint maxsize; + + if (!client->sending) { + /* client is not working on a buffer */ + if (client->bufpos == -1) { + /* client is too fast, remove from write queue until new buffer is + * available */ + FD_CLR (fd, &sink->writefds); + return TRUE; + } else { + /* client can pick a buffer from the global queue */ + GstBuffer *buf; + + /* grab buffer and ref, we need to ref since it could be unreffed in + * another thread */ + buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); + client->bufpos--; + gst_buffer_ref (buf); + + gst_multifdsink_client_queue_buffer (sink, client, buf); + /* it is safe to unref now as queueing a buffer will ref it */ + gst_buffer_unref (buf); + /* need to start from the first byte for this new buffer */ + client->bufoffset = 0; + } + } + + /* see if we need to send something */ + if (client->sending) { + ssize_t wrote; + GstBuffer *head; + + /* pick first buffer from list */ + head = GST_BUFFER (client->sending->data); + maxsize = GST_BUFFER_SIZE (head) - client->bufoffset; + + /* try to write the complete buffer */ + wrote = + send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize, + MSG_NOSIGNAL); + if (wrote < 0) { + /* hmm error.. */ + if (errno == EAGAIN) { + /* nothing serious, resource was unavailable, try again later */ + more = FALSE; + } else { + GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d", + fd); + return FALSE; + } + } else if (wrote < maxsize) { + /* partial write means that the client cannot read more and we should + * stop sending more */ + GST_DEBUG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote); + client->bufoffset += wrote; + more = FALSE; + } else { + /* complete buffer was written, we can proceed to the next one */ + client->sending = g_list_remove (client->sending, head); + gst_buffer_unref (head); + /* make sure we start from byte 0 for the next buffer */ + client->bufoffset = 0; + } + } + } while (more); + + return TRUE; +} + +static void +gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) +{ + /* FIXME: implement recover procedure here, like moving the position to + * the next keyframe, dropping buffers back to the beginning of the queue, + * stuff like that... */ + GST_WARNING_OBJECT (sink, "client %p with fd %d is lagging", + client, client->fd); + switch (sink->recover_policy) { + case GST_RECOVER_POLICY_NONE: + /* do nothing, client will catch up or get kicked out when it reaches + * the hard max */ + break; + case GST_RECOVER_POLICY_RESYNC_START: + /* move to beginning of queue */ + client->bufpos = -1; + break; + case GST_RECOVER_POLICY_RESYNC_SOFT: + /* move to beginning of soft max */ + client->bufpos = sink->buffers_soft_max; + break; + case GST_RECOVER_POLICY_RESYNC_KEYFRAME: + /* FIXME, find keyframe in buffers */ + client->bufpos = sink->buffers_soft_max; + break; + } +} + +/* Queue a buffer on the global queue. + * + * This functions adds the buffer to the front of a GArray. It removes the + * tail buffer if the max queue size is exceeded. Unreffing the buffer that + * is queued. Note that unreffing the buffer is not a problem as clients who + * started writing out this buffer will still have a reference to it in the + * client->sending queue. + * + * After adding the buffer, we update all client positions in the queue. If + * a client moves of the soft max, we start the recovery procedure for this + * slow client. If it goes over the hard max, it is put into the slow list + * and removed. + * + * Special care is taken of clients that were waiting for a new buffer (they + * had a position of -1) because they can proceed after adding this new buffer. + * This is done by adding the client back into the write fd_set and signalling + * the select thread that the fd_set changed. + * + */ +static void +gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) +{ + GList *clients; + gint queuelen; + GList *slow = NULL; + gboolean need_signal = FALSE; + gint max_buffer_usage; + gint i; + + g_mutex_lock (sink->clientslock); + /* add buffer to queue */ + g_array_prepend_val (sink->bufqueue, buf); + queuelen = sink->bufqueue->len; + + /* then loop over the clients and update the positions */ + max_buffer_usage = 0; + for (clients = sink->clients; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + + client = (GstTCPClient *) clients->data; + + client->bufpos++; + GST_LOG_OBJECT (sink, "client %p with fd %d at position %d", + client, client->fd, client->bufpos); + /* check soft max if needed, recover client */ + if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) { + gst_multifdsink_recover_client (sink, client); + } + /* check hard max, remove client */ + if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) { + /* remove client */ + GST_WARNING_OBJECT (sink, "client %p with fd %d is too slow, removing", + client, client->fd); + FD_CLR (client->fd, &sink->readfds); + FD_CLR (client->fd, &sink->writefds); + slow = g_list_prepend (slow, client); + /* cannot send data to this client anymore. need to signal the select thread that + * the fd_set changed */ + need_signal = TRUE; + /* set client to invalid position while being removed */ + client->bufpos = -1; + } else if (client->bufpos == 0) { + /* can send data to this client now. need to signal the select thread that + * the fd_set changed */ + FD_SET (client->fd, &sink->writefds); + need_signal = TRUE; + } + /* keep track of maximum buffer usage */ + if (client->bufpos > max_buffer_usage) { + max_buffer_usage = client->bufpos; + } + } + /* remove crap clients */ + for (clients = slow; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + + client = (GstTCPClient *) slow->data; + + gst_multifdsink_client_remove (sink, client); + } + g_list_free (slow); + /* nobody is referencing buffers after max_buffer_usage so we can + * remove them from the queue */ + for (i = queuelen - 1; i > max_buffer_usage; i--) { + GstBuffer *old; + + /* queue exceeded max size */ + queuelen--; + old = g_array_index (sink->bufqueue, GstBuffer *, i); + sink->bufqueue = g_array_remove_index (sink->bufqueue, i); + + /* unref tail buffer */ + gst_buffer_unref (old); + } + sink->buffers_queued = max_buffer_usage; + g_print ("%d\n", max_buffer_usage); + g_mutex_unlock (sink->clientslock); + + /* and send a signal to thread if fd_set changed */ + if (need_signal) { + SEND_COMMAND (sink, CONTROL_RESTART); + } +} + +/* Handle the clients. Basically does a blocking select for one + * of the client fds to become read or writable. We also have a + * filedescriptor to receive commands on that we need to check. + * + * After going out of the select call, we read and write to all + * clients that can do so. Badly behaving clients are put on a + * garbage list and removed. + */ +static void +gst_multifdsink_handle_clients (GstMultiFdSink * sink) +{ + int result; + fd_set testreadfds, testwritefds; + GList *clients, *error = NULL; + gboolean try_again; + GstMultiFdSinkClass *fclass; + + fclass = GST_MULTIFDSINK_GET_CLASS (sink); + + do { + try_again = FALSE; + + /* check for: + * - server socket input (ie, new client connections) + * - client socket input (ie, clients saying goodbye) + * - client socket output (ie, client reads) */ + testwritefds = sink->writefds; + testreadfds = sink->readfds; + + GST_LOG_OBJECT (sink, "doing select on server + client fds for reads"); + gst_multifdsink_debug_fdset (sink, &testreadfds); + GST_LOG_OBJECT (sink, "doing select on client fds for writes"); + gst_multifdsink_debug_fdset (sink, &testwritefds); + + result = + select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0, NULL); + + /* < 0 is an error, 0 just means a timeout happened */ + if (result < 0) { + GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), + ("select failed: %s", g_strerror (errno))); + return; + } + + GST_LOG_OBJECT (sink, "%d sockets had action", result); + GST_LOG_OBJECT (sink, "done select on server/client fds for reads"); + gst_multifdsink_debug_fdset (sink, &testreadfds); + GST_LOG_OBJECT (sink, "done select on client fds for writes"); + gst_multifdsink_debug_fdset (sink, &testwritefds); + + if (FD_ISSET (READ_SOCKET (sink), &testreadfds)) { + gchar command; + + READ_COMMAND (sink, command); + + switch (command) { + case CONTROL_RESTART: + /* need to restart the select call as the fd_set changed */ + try_again = TRUE; + break; + case CONTROL_STOP: + /* stop this function */ + return; + default: + g_warning ("multifdsink: unknown control message received"); + break; + } + } + } while (try_again); + + if (fclass->select) + fclass->select (sink, &testreadfds, &testwritefds); + + /* Check the reads */ + g_mutex_lock (sink->clientslock); + for (clients = sink->clients; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + int fd; + + client = (GstTCPClient *) clients->data; + fd = client->fd; + + if (FD_ISSET (fd, &testreadfds)) { + /* handle client read */ + if (!gst_multifdsink_handle_client_read (sink, client)) { + error = g_list_prepend (error, client); + continue; + } + } + if (FD_ISSET (fd, &testwritefds)) { + /* handle client write */ + if (!gst_multifdsink_handle_client_write (sink, client)) { + error = g_list_prepend (error, client); + continue; + } + } + } + /* remove crappy clients */ + for (clients = error; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + + client = (GstTCPClient *) error->data; + + GST_LOG_OBJECT (sink, "removing client %p with fd %d with errors", client, + client->fd); + gst_multifdsink_client_remove (sink, client); + } + g_list_free (error); + g_mutex_unlock (sink->clientslock); +} + +static gpointer +gst_multifdsink_thread (GstMultiFdSink * sink) +{ + while (sink->running) { + gst_multifdsink_handle_clients (sink); + } + return NULL; +} + +static void +gst_multifdsink_chain (GstPad * pad, GstData * _data) +{ + GstBuffer *buf = GST_BUFFER (_data); + GstMultiFdSink *sink; + + g_return_if_fail (pad != NULL); + g_return_if_fail (GST_IS_PAD (pad)); + g_return_if_fail (buf != NULL); + sink = GST_MULTIFDSINK (GST_OBJECT_PARENT (pad)); + g_return_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)); + + if (GST_IS_EVENT (buf)) { + g_warning ("FIXME: handle events"); + return; + } + + /* if the incoming buffer is marked as IN CAPS, then we assume for now + * it's a streamheader that needs to be sent to each new client, so we + * put it on our internal list of streamheader buffers. + * After that we return, since we only send these out when we get + * non IN_CAPS buffers so we properly keep track of clients that got + * streamheaders. */ + if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) { + GST_DEBUG_OBJECT (sink, + "appending IN_CAPS buffer with length %d to streamheader", + GST_BUFFER_SIZE (buf)); + sink->streamheader = g_list_append (sink->streamheader, buf); + return; + } + + /* queue the buffer */ + gst_multifdsink_queue_buffer (sink, buf); + + sink->data_written += GST_BUFFER_SIZE (buf); +} + +static void +gst_multifdsink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstMultiFdSink *multifdsink; + + g_return_if_fail (GST_IS_MULTIFDSINK (object)); + multifdsink = GST_MULTIFDSINK (object); + + switch (prop_id) { + case ARG_PROTOCOL: + multifdsink->protocol = g_value_get_enum (value); + break; + case ARG_BUFFERS_MAX: + multifdsink->buffers_max = g_value_get_int (value); + break; + case ARG_BUFFERS_SOFT_MAX: + multifdsink->buffers_soft_max = g_value_get_int (value); + break; + case ARG_RECOVER_POLICY: + multifdsink->recover_policy = g_value_get_enum (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value, + GParamSpec * pspec) +{ + GstMultiFdSink *multifdsink; + + g_return_if_fail (GST_IS_MULTIFDSINK (object)); + multifdsink = GST_MULTIFDSINK (object); + + switch (prop_id) { + case ARG_PROTOCOL: + g_value_set_enum (value, multifdsink->protocol); + break; + case ARG_BUFFERS_MAX: + g_value_set_int (value, multifdsink->buffers_max); + break; + case ARG_BUFFERS_SOFT_MAX: + g_value_set_int (value, multifdsink->buffers_soft_max); + break; + case ARG_BUFFERS_QUEUED: + g_value_set_int (value, multifdsink->buffers_queued); + break; + case ARG_RECOVER_POLICY: + g_value_set_enum (value, multifdsink->recover_policy); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + + +/* create a socket for sending to remote machine */ +static gboolean +gst_multifdsink_init_send (GstMultiFdSink * this) +{ + GstMultiFdSinkClass *fclass; + + fclass = GST_MULTIFDSINK_GET_CLASS (this); + + FD_ZERO (&this->readfds); + FD_ZERO (&this->writefds); + + if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (this)) < 0) { + perror ("creating socket pair"); + } + FD_SET (READ_SOCKET (this), &this->readfds); + + this->streamheader = NULL; + this->data_written = 0; + + if (fclass->init) { + fclass->init (this); + } + + this->running = TRUE; + this->thread = g_thread_create ((GThreadFunc) gst_multifdsink_thread, + this, TRUE, NULL); + + return TRUE; +} + +static void +gst_multifdsink_close (GstMultiFdSink * this) +{ + GstMultiFdSinkClass *fclass; + + fclass = GST_MULTIFDSINK_GET_CLASS (this); + + this->running = FALSE; + + SEND_COMMAND (this, CONTROL_STOP); + g_thread_join (this->thread); + + close (READ_SOCKET (this)); + close (WRITE_SOCKET (this)); + + if (this->streamheader) { + GList *l; + + for (l = this->streamheader; l; l = l->next) { + gst_buffer_unref (l->data); + } + g_list_free (this->streamheader); + } + + if (fclass->close) + fclass->close (this); +} + +static GstElementStateReturn +gst_multifdsink_change_state (GstElement * element) +{ + GstMultiFdSink *sink; + + g_return_val_if_fail (GST_IS_MULTIFDSINK (element), GST_STATE_FAILURE); + sink = GST_MULTIFDSINK (element); + + switch (GST_STATE_TRANSITION (element)) { + case GST_STATE_NULL_TO_READY: + if (!GST_FLAG_IS_SET (element, GST_MULTIFDSINK_OPEN)) { + if (!gst_multifdsink_init_send (sink)) + return GST_STATE_FAILURE; + GST_FLAG_SET (sink, GST_MULTIFDSINK_OPEN); + } + break; + case GST_STATE_READY_TO_PAUSED: + break; + case GST_STATE_PAUSED_TO_PLAYING: + break; + case GST_STATE_PLAYING_TO_PAUSED: + break; + case GST_STATE_PAUSED_TO_READY: + break; + case GST_STATE_READY_TO_NULL: + if (GST_FLAG_IS_SET (element, GST_MULTIFDSINK_OPEN)) { + gst_multifdsink_close (GST_MULTIFDSINK (element)); + GST_FLAG_UNSET (sink, GST_MULTIFDSINK_OPEN); + } + break; + + } + + if (GST_ELEMENT_CLASS (parent_class)->change_state) + return GST_ELEMENT_CLASS (parent_class)->change_state (element); + + return GST_STATE_SUCCESS; +} diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h new file mode 100644 index 0000000..5eeecfe --- /dev/null +++ b/gst/tcp/gstmultifdsink.h @@ -0,0 +1,145 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * Copyright (C) <2004> Thomas Vander Stichele + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_MULTIFDSINK_H__ +#define __GST_MULTIFDSINK_H__ + + +#include + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "gsttcp.h" + +#define GST_TYPE_MULTIFDSINK \ + (gst_multifdsink_get_type()) +#define GST_MULTIFDSINK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTIFDSINK,GstMultiFdSink)) +#define GST_MULTIFDSINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTIFDSINK,GstMultiFdSink)) +#define GST_IS_MULTIFDSINK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTIFDSINK)) +#define GST_IS_MULTIFDSINK_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTIFDSINK)) +#define GST_MULTIFDSINK_GET_CLASS(obj) \ + (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_MULTIFDSINK, GstMultiFdSinkClass)) + + +typedef struct _GstMultiFdSink GstMultiFdSink; +typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; + +typedef enum { + GST_MULTIFDSINK_OPEN = GST_ELEMENT_FLAG_LAST, + + GST_MULTIFDSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2, +} GstMultiFdSinkFlags; + +typedef enum +{ + GST_RECOVER_POLICY_NONE, + GST_RECOVER_POLICY_RESYNC_START, + GST_RECOVER_POLICY_RESYNC_SOFT, + GST_RECOVER_POLICY_RESYNC_KEYFRAME, +} GstRecoverPolicy; + +/* structure for a client + * */ +typedef struct { + int fd; + gint bufpos; /* position of this client in the global queue */ + + GList *sending; /* the buffers we need to send */ + gint bufoffset; /* offset in the first buffer */ + + GstTCPProtocolType protocol; + + gboolean caps_sent; + gboolean streamheader_sent; +} GstTCPClient; + +struct _GstMultiFdSink { + GstElement element; + + /* pad */ + GstPad *sinkpad; + + size_t data_written; /* how much bytes have we written ? */ + + GMutex *clientslock; /* lock to protect the clients list */ + GList *clients; /* list of clients we are serving */ + + fd_set readfds; /* all the client file descriptors that we can read from */ + fd_set writefds; /* all the client file descriptors that we can write to */ + + int control_sock[2]; /* sockets for controlling the select call */ + + GList *streamheader; /* GList of GstBuffers to use as streamheader */ + GstTCPProtocolType protocol; + guint mtu; + + GArray *bufqueue; /* global queue of buffers */ + + gboolean running; /* the thread state */ + GThread *thread; /* the sender thread */ + + gint buffers_max; /* max buffers to queue */ + gint buffers_soft_max; /* max buffers a client can lay before recoevery starts */ + GstRecoverPolicy recover_policy; + /* stats */ + gint buffers_queued; /* number of queued buffers */ +}; + +struct _GstMultiFdSinkClass { + GstElementClass parent_class; + + gboolean (*init) (GstMultiFdSink *sink); + gboolean (*select) (GstMultiFdSink *sink, fd_set *readfds, fd_set *writefds); + gboolean (*close) (GstMultiFdSink *sink); + + /* signals */ + void (*client_added) (GstElement *element, gchar *host, gint fd); + void (*client_removed) (GstElement *element, gchar *host, gint fd); +}; + +GType gst_multifdsink_get_type (void); + + +#ifdef __cplusplus +} +#endif /* __cplusplus */ + + +#endif /* __GST_MULTIFDSINK_H__ */ diff --git a/gst/tcp/gsttcpplugin.c b/gst/tcp/gsttcpplugin.c index 69ae7a2..2ca7b9e 100644 --- a/gst/tcp/gsttcpplugin.c +++ b/gst/tcp/gsttcpplugin.c @@ -27,6 +27,7 @@ #include "gsttcpclientsink.h" #include "gsttcpserversrc.h" #include "gsttcpserversink.h" +#include "gstmultifdsink.h" static gboolean plugin_init (GstPlugin * plugin) @@ -53,6 +54,9 @@ plugin_init (GstPlugin * plugin) if (!gst_element_register (plugin, "tcpserversrc", GST_RANK_NONE, GST_TYPE_TCPSERVERSRC)) return FALSE; + if (!gst_element_register (plugin, "multifdsink", GST_RANK_NONE, + GST_TYPE_MULTIFDSINK)) + return FALSE; return TRUE; } diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index 3c11d13..591f953 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -36,19 +36,6 @@ #define TCP_DEFAULT_PORT 4953 #define TCP_BACKLOG 5 -#define CONTROL_RESTART 'R' /* restart the select call */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define SEND_COMMAND(sink, command) \ -G_STMT_START { \ - unsigned char c; c = command; \ - write (sink->control_sock[1], &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(sink, command) \ -G_STMT_START { \ - read(sink->control_sock[0], &command, 1); \ -} G_STMT_END - /* elementfactory information */ static GstElementDetails gst_tcpserversink_details = GST_ELEMENT_DETAILS ("TCP Server sink", @@ -59,50 +46,21 @@ GST_ELEMENT_DETAILS ("TCP Server sink", GST_DEBUG_CATEGORY (tcpserversink_debug); #define GST_CAT_DEFAULT (tcpserversink_debug) -typedef struct -{ - int fd; - int bufpos; /* position of this client in the global queue */ - - GList *sending; /* the buffers we need to send */ - int bufoffset; /* offset in the first buffer */ - - gboolean caps_sent; - gboolean streamheader_sent; -} -GstTCPClient; - -/* TCPServerSink signals and args */ -enum -{ - SIGNAL_CLIENT_ADDED, - SIGNAL_CLIENT_REMOVED, - LAST_SIGNAL -}; - -#define DEFAULT_BUFFERS_MAX 25 -#define DEFAULT_BUFFERS_SOFT_MAX 20 - enum { ARG_0, ARG_HOST, ARG_PORT, - ARG_PROTOCOL, - ARG_BUFFERS_MAX, - ARG_BUFFERS_SOFT_MAX, }; static void gst_tcpserversink_base_init (gpointer g_class); static void gst_tcpserversink_class_init (GstTCPServerSink * klass); static void gst_tcpserversink_init (GstTCPServerSink * tcpserversink); -static void gst_tcpserversink_set_clock (GstElement * element, - GstClock * clock); - -static void gst_tcpserversink_chain (GstPad * pad, GstData * _data); -static GstElementStateReturn gst_tcpserversink_change_state (GstElement * - element); +static gboolean gst_tcpserversink_handle_select (GstMultiFdSink * sink, + fd_set * readfds, fd_set * writefds); +static gboolean gst_tcpserversink_init_send (GstMultiFdSink * this); +static gboolean gst_tcpserversink_close (GstMultiFdSink * this); static void gst_tcpserversink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -110,9 +68,9 @@ static void gst_tcpserversink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstElementClass *parent_class = NULL; +static GstMultiFdSinkClass *parent_class = NULL; -static guint gst_tcpserversink_signals[LAST_SIGNAL] = { 0 }; +//static guint gst_tcpserversink_signals[LAST_SIGNAL] = { 0 }; GType gst_tcpserversink_get_type (void) @@ -135,7 +93,7 @@ gst_tcpserversink_get_type (void) }; tcpserversink_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSink", + g_type_register_static (GST_TYPE_MULTIFDSINK, "GstTCPServerSink", &tcpserversink_info, 0); } return tcpserversink_type; @@ -154,11 +112,13 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; + GstMultiFdSinkClass *gstmultifdsink_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; + gstmultifdsink_class = (GstMultiFdSinkClass *) klass; - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + parent_class = g_type_class_ref (GST_TYPE_MULTIFDSINK); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST, g_param_spec_string ("host", "host", "The host/IP to send the packets to", @@ -166,89 +126,25 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass) g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, g_param_spec_int ("port", "port", "The port to send the packets to", 0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_PROTOCOL, - g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", - GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE, - G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, - g_param_spec_int ("buffers-max", "Buffers max", - "max number of buffers to queue (0 = no limit)", 0, G_MAXINT, - DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX, - g_param_spec_int ("buffers-soft-max", "Buffers soft max", - "Recover client when going over this limit (0 = no limit)", 0, - G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); - - gst_tcpserversink_signals[SIGNAL_CLIENT_ADDED] = - g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstTCPServerSinkClass, client_added), - NULL, NULL, gst_tcp_marshal_VOID__STRING_UINT, G_TYPE_NONE, 2, - G_TYPE_STRING, G_TYPE_UINT); - gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED] = - g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstTCPServerSinkClass, - client_removed), NULL, NULL, gst_tcp_marshal_VOID__STRING_UINT, - G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_UINT); gobject_class->set_property = gst_tcpserversink_set_property; gobject_class->get_property = gst_tcpserversink_get_property; - gstelement_class->change_state = gst_tcpserversink_change_state; - gstelement_class->set_clock = gst_tcpserversink_set_clock; + gstmultifdsink_class->init = gst_tcpserversink_init_send; + gstmultifdsink_class->select = gst_tcpserversink_handle_select; + gstmultifdsink_class->close = gst_tcpserversink_close; GST_DEBUG_CATEGORY_INIT (tcpserversink_debug, "tcpserversink", 0, "TCP sink"); } static void -gst_tcpserversink_set_clock (GstElement * element, GstClock * clock) -{ - GstTCPServerSink *tcpserversink; - - tcpserversink = GST_TCPSERVERSINK (element); - - tcpserversink->clock = clock; -} - -static void gst_tcpserversink_init (GstTCPServerSink * this) { - /* create the sink pad */ - this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); - gst_element_add_pad (GST_ELEMENT (this), this->sinkpad); - gst_pad_set_chain_function (this->sinkpad, gst_tcpserversink_chain); - this->server_port = TCP_DEFAULT_PORT; /* should support as minimum 576 for IPV4 and 1500 for IPV6 */ /* this->mtu = 1500; */ this->server_sock_fd = -1; - GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN); - - this->protocol = GST_TCP_PROTOCOL_TYPE_NONE; - this->clock = NULL; - - this->clients = NULL; - - this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); - this->queuelock = g_mutex_new (); - this->queuecond = g_cond_new (); - this->buffers_max = DEFAULT_BUFFERS_MAX; - this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX; - - this->clientslock = g_mutex_new (); - -} - -static void -gst_tcpserversink_debug_fdset (GstTCPServerSink * sink, fd_set * testfds) -{ - int fd; - - for (fd = 0; fd < FD_SETSIZE; fd++) { - if (FD_ISSET (fd, testfds)) { - GST_LOG_OBJECT (sink, "fd %d", fd); - } - } } /* handle a read request on the server, @@ -261,6 +157,7 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) struct sockaddr_in client_address; int client_address_len; GstTCPClient *client; + GstMultiFdSink *parent = GST_MULTIFDSINK (sink); client_sock_fd = accept (sink->server_sock_fd, (struct sockaddr *) &client_address, @@ -278,559 +175,55 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) client->bufoffset = 0; client->sending = NULL; - g_mutex_lock (sink->clientslock); - sink->clients = g_list_prepend (sink->clients, client); - g_mutex_unlock (sink->clientslock); + g_mutex_lock (parent->clientslock); + parent->clients = g_list_prepend (parent->clients, client); + g_mutex_unlock (parent->clientslock); /* we always read from a client */ - FD_SET (client_sock_fd, &sink->readfds); + FD_SET (client_sock_fd, &parent->readfds); /* set the socket to non blocking */ fcntl (client_sock_fd, F_SETFL, O_NONBLOCK); GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d", inet_ntoa (client_address.sin_addr), client_sock_fd); - g_signal_emit (G_OBJECT (sink), - gst_tcpserversink_signals[SIGNAL_CLIENT_ADDED], 0, - inet_ntoa (client_address.sin_addr), client_sock_fd); return TRUE; } -static void -gst_tcpserversink_client_remove (GstTCPServerSink * sink, GstTCPClient * client) -{ - int fd = client->fd; - - /* FIXME: if we keep track of ip we can log it here and signal */ - GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); - if (close (fd) != 0) { - GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); - } - FD_CLR (fd, &sink->readfds); - FD_CLR (fd, &sink->writefds); - - sink->clients = g_list_remove (sink->clients, client); - - g_free (client); - - g_signal_emit (G_OBJECT (sink), - gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd); -} - -/* handle a read on a client fd, - * which either indicates a close or should be ignored - * returns FALSE if the client has been closed. */ static gboolean -gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, - GstTCPClient * client) -{ - int nread, fd; - - fd = client->fd; - - GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd); - - ioctl (fd, FIONREAD, &nread); - if (nread == 0) { - /* client sent close, so remove it */ - GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd); - return FALSE; - } else { - /* FIXME: we should probably just Read 'n' Drop */ - g_warning ("Don't know what to do with %d bytes to read", nread); - } - return TRUE; -} - -static gboolean -gst_tcpserversink_client_queue_data (GstTCPServerSink * sink, - GstTCPClient * client, gchar * data, gint len) -{ - GstBuffer *buf; - - buf = gst_buffer_new (); - GST_BUFFER_DATA (buf) = data; - GST_BUFFER_SIZE (buf) = len; - - GST_DEBUG_OBJECT (sink, "Queueing data of length %d for fd %d", - len, client->fd); - client->sending = g_list_append (client->sending, buf); - - return TRUE; -} - -static gboolean -gst_tcpserversink_client_queue_caps (GstTCPServerSink * sink, - GstTCPClient * client, const GstCaps * caps) -{ - guint8 *header; - guint8 *payload; - guint length; - gchar *string; - - string = gst_caps_to_string (caps); - GST_DEBUG_OBJECT (sink, "Queueing caps %s for fd %d through GDP", string, - client->fd); - g_free (string); - - if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) { - GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps"); - return FALSE; - } - gst_tcpserversink_client_queue_data (sink, client, header, length); - - length = gst_dp_header_payload_length (header); - gst_tcpserversink_client_queue_data (sink, client, payload, length); - - return TRUE; -} - -static gboolean -gst_tcpserversink_client_queue_buffer (GstTCPServerSink * sink, - GstTCPClient * client, GstBuffer * buffer) -{ - if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { - guint8 *header; - guint len; - - if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) { - GST_DEBUG_OBJECT (sink, - "could not create header, removing client on fd %d", client->fd); - return FALSE; - } - gst_tcpserversink_client_queue_data (sink, client, header, len); - } - - gst_buffer_ref (buffer); - client->sending = g_list_append (client->sending, buffer); - - return TRUE; -} - - -/* handle a write on a client, - * which indicates a read request from a client. - * - * The strategy is as follows, for each client we maintain a queue of GstBuffers - * that contain the raw bytes we need to send to the client. In the case of the - * GDP protocol, we create buffers out of the header bytes so that we can only focus - * on sending buffers. - * - * We first check to see if we need to send caps (in GDP) and streamheaders. If so, - * we queue them. - * - * Then we run into the main loop that tries to send as many buffers as possible. It - * will first exhaust the client->sending queue and if the queue is empty, it will - * pick a buffer from the global queue. - * - * Sending the Buffers from the client->sending queue is basically writing the bytes - * to the socket and maintaining a count of the bytes that were sent. When the buffer - * is completely sent, it is removed from the client->sending queue and we try to pick - * a new buffer for sending. - * - * When the sending returns a partial buffer we stop sending more data as the next send - * operation could block. - */ -static gboolean -gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, - GstTCPClient * client) -{ - int fd = client->fd; - gboolean more; - gboolean res; - - /* when using GDP, first check if we have queued caps yet */ - if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { - if (!client->caps_sent) { - const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad)); - - /* queue caps for sending */ - res = gst_tcpserversink_client_queue_caps (sink, client, caps); - if (!res) { - GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client"); - return FALSE; - } - client->caps_sent = TRUE; - } - } - /* if we have streamheader buffers, and haven't sent them to this client - * yet, send them out one by one */ - if (!client->streamheader_sent) { - if (sink->streamheader) { - GList *l; - - for (l = sink->streamheader; l; l = l->next) { - /* queue stream headers for sending */ - res = - gst_tcpserversink_client_queue_buffer (sink, client, - GST_BUFFER (l->data)); - if (!res) { - GST_DEBUG_OBJECT (sink, - "Failed queueing streamheader, removing client"); - return FALSE; - } - } - } - client->streamheader_sent = TRUE; - } - - more = TRUE; - do { - gint maxsize; - - if (!client->sending) { - /* client is not working on a buffer */ - if (client->bufpos == -1) { - /* client is too fast, remove from write queue until new buffer is - * available */ - FD_CLR (fd, &sink->writefds); - return TRUE; - } else { - /* client can pick a buffer from the global queue */ - GstBuffer *buf; - - /* grab buffer and ref, we need to ref since it could be unreffed in - * another thread when we unlock the queuelock */ - g_mutex_lock (sink->queuelock); - buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); - client->bufpos--; - gst_buffer_ref (buf); - g_mutex_unlock (sink->queuelock); - - gst_tcpserversink_client_queue_buffer (sink, client, buf); - /* it is safe to unref now as queueing a buffer will ref it */ - gst_buffer_unref (buf); - /* need to start from the first byte for this new buffer */ - client->bufoffset = 0; - } - } - - /* see if we need to send something */ - if (client->sending) { - ssize_t wrote; - GstBuffer *head; - - /* pick first buffer from list */ - head = GST_BUFFER (client->sending->data); - maxsize = GST_BUFFER_SIZE (head) - client->bufoffset; - - /* try to write the complete buffer */ - wrote = - send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize, - MSG_NOSIGNAL); - if (wrote < 0) { - /* hmm error.. */ - if (errno == EAGAIN) { - /* nothing serious, resource was unavailable, try again later */ - more = FALSE; - } else { - GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d", - fd); - return FALSE; - } - } else if (wrote < maxsize) { - /* partial write means that the client cannot read more and we should - * stop sending more */ - GST_DEBUG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote); - client->bufoffset += wrote; - more = FALSE; - } else { - /* complete buffer was written, we can proceed to the next one */ - client->sending = g_list_remove (client->sending, head); - gst_buffer_unref (head); - /* make sure we start from byte 0 for the next buffer */ - client->bufoffset = 0; - } - } - } while (more); - - return TRUE; -} - -/* Queue a buffer on the global queue. - * - * This functions adds the buffer to the front of a GArray. It removes the - * tail buffer if the max queue size is exceeded. Unreffing the buffer that - * is queued. Note that unreffing the buffer is not a problem as clients who - * started writing out this buffer will still have a reference to it in the - * client->sending queue. - * - * After adding the buffer, we update all client positions in the queue. If - * a client moves of the soft max, we start the recovery procedure for this - * slow client. If it goes over the hard max, it is put into the slow list - * and removed. - * - * Special care is taken of clients that were waiting for a new buffer (they - * had a position of -1) because they can proceed after adding this new buffer. - * This is done by adding the client back into the write fd_set and signalling - * the select thread that the fd_set changed. - * - */ -static void -gst_tcpserversink_queue_buffer (GstTCPServerSink * sink, GstBuffer * buf) +gst_tcpserversink_handle_select (GstMultiFdSink * sink, + fd_set * readfds, fd_set * writefds) { - GList *clients; - gint queuelen; - GList *slow = NULL; - gboolean need_signal = FALSE; - - g_mutex_lock (sink->queuelock); - /* add buffer to queue */ - g_array_prepend_val (sink->bufqueue, buf); - queuelen = sink->bufqueue->len; - if (queuelen > sink->buffers_max) { - GstBuffer *old; - - /* queue exceeded max size */ - queuelen--; - old = g_array_index (sink->bufqueue, GstBuffer *, queuelen); - sink->bufqueue = g_array_remove_index (sink->bufqueue, queuelen); - - /* unref tail buffer */ - gst_buffer_unref (old); - } - g_mutex_unlock (sink->queuelock); - - /* then loop over the clients and update the positions */ - g_mutex_lock (sink->clientslock); - for (clients = sink->clients; clients; clients = g_list_next (clients)) { - GstTCPClient *client; - - client = (GstTCPClient *) clients->data; - - client->bufpos++; - GST_LOG_OBJECT (sink, "client %p with fd %d at position %d", - client, client->fd, client->bufpos); - if (client->bufpos >= sink->buffers_soft_max) { - if (client->bufpos == sink->buffers_soft_max) { - g_warning ("client %p with fd %d is lagging...", client, client->fd); - } - GST_LOG_OBJECT (sink, "client %p with fd %d is lagging", - client, client->fd); - } - if (client->bufpos >= queuelen) { - /* remove client */ - GST_LOG_OBJECT (sink, "client %p with fd %d is too slow, removing", - client, client->fd); - g_warning ("client %p with fd %d too slow, removing", client, client->fd); - FD_CLR (client->fd, &sink->readfds); - FD_CLR (client->fd, &sink->writefds); - slow = g_list_prepend (slow, client); - /* cannot send data to this client anymore. need to signal the select thread that - * the fd_set changed */ - need_signal = TRUE; - } else if (client->bufpos == 0) { - /* can send data to this client now. need to signal the select thread that - * the fd_set changed */ - FD_SET (client->fd, &sink->writefds); - need_signal = TRUE; - } - } - /* remove crap clients */ - for (clients = slow; clients; clients = g_list_next (clients)) { - GstTCPClient *client; - - client = (GstTCPClient *) slow->data; - - gst_tcpserversink_client_remove (sink, client); - } - g_list_free (slow); - g_mutex_unlock (sink->clientslock); + GstTCPServerSink *this = GST_TCPSERVERSINK (sink); - /* and send a signal to thread if fd_set changed */ - if (need_signal) { - SEND_COMMAND (sink, CONTROL_RESTART); - } -} - -/* Handle the clients. Basically does a blocking select for one - * of the client fds to become read or writable. We also have a - * filedescriptor to receive commands on that we need to check. - * - * After going out of the select call, we read and write to all - * clients that can do so. Badly behaving clients are put on a - * garbage list and removed. - */ -static void -gst_tcpserversink_handle_clients (GstTCPServerSink * sink) -{ - int result; - fd_set testreadfds, testwritefds; - GList *clients, *error = NULL; - gboolean try_again; - - do { - try_again = FALSE; - - /* check for: - * - server socket input (ie, new client connections) - * - client socket input (ie, clients saying goodbye) - * - client socket output (ie, client reads) */ - testwritefds = sink->writefds; - testreadfds = sink->readfds; - FD_SET (sink->server_sock_fd, &testreadfds); - FD_SET (sink->control_sock[0], &testreadfds); - - GST_LOG_OBJECT (sink, "doing select on server + client fds for reads"); - gst_tcpserversink_debug_fdset (sink, &testreadfds); - GST_LOG_OBJECT (sink, "doing select on client fds for writes"); - gst_tcpserversink_debug_fdset (sink, &testwritefds); - - result = - select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0, NULL); - - /* < 0 is an error, 0 just means a timeout happened */ - if (result < 0) { - GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), - ("select failed: %s", g_strerror (errno))); - return; - } - - GST_LOG_OBJECT (sink, "%d sockets had action", result); - GST_LOG_OBJECT (sink, "done select on server/client fds for reads"); - gst_tcpserversink_debug_fdset (sink, &testreadfds); - GST_LOG_OBJECT (sink, "done select on client fds for writes"); - gst_tcpserversink_debug_fdset (sink, &testwritefds); - - if (FD_ISSET (sink->control_sock[0], &testreadfds)) { - gchar command; - - READ_COMMAND (sink, command); - - switch (command) { - case CONTROL_RESTART: - /* need to restart the select call as the fd_set changed */ - try_again = TRUE; - break; - case CONTROL_STOP: - /* stop this function */ - return; - default: - g_warning ("tcpserversink: unknown control message received"); - break; - } - } - } while (try_again); - - if (FD_ISSET (sink->server_sock_fd, &testreadfds)) { + if (FD_ISSET (this->server_sock_fd, readfds)) { /* handle new client connection on server socket */ - if (!gst_tcpserversink_handle_server_read (sink)) { - GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), + if (!gst_tcpserversink_handle_server_read (this)) { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("client connection failed: %s", g_strerror (errno))); - return; - } - } - - /* Check the reads */ - g_mutex_lock (sink->clientslock); - for (clients = sink->clients; clients; clients = g_list_next (clients)) { - GstTCPClient *client; - int fd; - - client = (GstTCPClient *) clients->data; - fd = client->fd; - - if (FD_ISSET (fd, &testreadfds)) { - /* handle client read */ - if (!gst_tcpserversink_handle_client_read (sink, client)) { - error = g_list_prepend (error, client); - continue; - } - } - if (FD_ISSET (fd, &testwritefds)) { - /* handle client write */ - if (!gst_tcpserversink_handle_client_write (sink, client)) { - error = g_list_prepend (error, client); - continue; - } + return FALSE; } } - /* remove crappy clients */ - for (clients = error; clients; clients = g_list_next (clients)) { - GstTCPClient *client; - - client = (GstTCPClient *) error->data; - - GST_LOG_OBJECT (sink, "removing client %p with fd %d with errors", client, - client->fd); - gst_tcpserversink_client_remove (sink, client); - } - g_list_free (error); - g_mutex_unlock (sink->clientslock); -} - -static gpointer -gst_tcpserversink_thread (GstTCPServerSink * sink) -{ - while (sink->running) { - gst_tcpserversink_handle_clients (sink); - } - return NULL; -} - -static void -gst_tcpserversink_chain (GstPad * pad, GstData * _data) -{ - GstBuffer *buf = GST_BUFFER (_data); - GstTCPServerSink *sink; - - g_return_if_fail (pad != NULL); - g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (buf != NULL); - sink = GST_TCPSERVERSINK (GST_OBJECT_PARENT (pad)); - g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPSERVERSINK_OPEN)); - - if (GST_IS_EVENT (buf)) { - g_warning ("FIXME: handle events"); - return; - } - - /* if the incoming buffer is marked as IN CAPS, then we assume for now - * it's a streamheader that needs to be sent to each new client, so we - * put it on our internal list of streamheader buffers. - * After that we return, since we only send these out when we get - * non IN_CAPS buffers so we properly keep track of clients that got - * streamheaders. */ - if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) { - GST_DEBUG_OBJECT (sink, - "appending IN_CAPS buffer with length %d to streamheader", - GST_BUFFER_SIZE (buf)); - sink->streamheader = g_list_append (sink->streamheader, buf); - return; - } - - /* queue the buffer */ - gst_tcpserversink_queue_buffer (sink, buf); - - sink->data_written += GST_BUFFER_SIZE (buf); + return TRUE; } static void gst_tcpserversink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstTCPServerSink *tcpserversink; + GstTCPServerSink *sink; g_return_if_fail (GST_IS_TCPSERVERSINK (object)); - tcpserversink = GST_TCPSERVERSINK (object); + sink = GST_TCPSERVERSINK (object); switch (prop_id) { case ARG_HOST: - g_free (tcpserversink->host); - tcpserversink->host = g_strdup (g_value_get_string (value)); + g_free (sink->host); + sink->host = g_strdup (g_value_get_string (value)); break; case ARG_PORT: - tcpserversink->server_port = g_value_get_int (value); - break; - case ARG_PROTOCOL: - tcpserversink->protocol = g_value_get_enum (value); - break; - case ARG_BUFFERS_MAX: - tcpserversink->buffers_max = g_value_get_int (value); - break; - case ARG_BUFFERS_SOFT_MAX: - tcpserversink->buffers_soft_max = g_value_get_int (value); + sink->server_port = g_value_get_int (value); break; default: @@ -843,26 +236,17 @@ static void gst_tcpserversink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { - GstTCPServerSink *tcpserversink; + GstTCPServerSink *sink; g_return_if_fail (GST_IS_TCPSERVERSINK (object)); - tcpserversink = GST_TCPSERVERSINK (object); + sink = GST_TCPSERVERSINK (object); switch (prop_id) { case ARG_HOST: - g_value_set_string (value, tcpserversink->host); + g_value_set_string (value, sink->host); break; case ARG_PORT: - g_value_set_int (value, tcpserversink->server_port); - break; - case ARG_PROTOCOL: - g_value_set_enum (value, tcpserversink->protocol); - break; - case ARG_BUFFERS_MAX: - g_value_set_int (value, tcpserversink->buffers_max); - break; - case ARG_BUFFERS_SOFT_MAX: - g_value_set_int (value, tcpserversink->buffers_soft_max); + g_value_set_int (value, sink->server_port); break; default: @@ -874,9 +258,10 @@ gst_tcpserversink_get_property (GObject * object, guint prop_id, GValue * value, /* create a socket for sending to remote machine */ static gboolean -gst_tcpserversink_init_send (GstTCPServerSink * this) +gst_tcpserversink_init_send (GstMultiFdSink * parent) { int ret; + GstTCPServerSink *this = GST_TCPSERVERSINK (parent); /* create sending server socket */ if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { @@ -936,71 +321,19 @@ gst_tcpserversink_init_send (GstTCPServerSink * this) "listened on server socket %d, returning from connection setup", this->server_sock_fd); - FD_ZERO (&this->readfds); - FD_ZERO (&this->writefds); - FD_SET (this->server_sock_fd, &this->readfds); - - if (socketpair (PF_UNIX, SOCK_STREAM, 0, this->control_sock) < 0) { - perror ("creating socket pair"); - } - - this->running = TRUE; - this->thread = g_thread_create ((GThreadFunc) gst_tcpserversink_thread, - this, TRUE, NULL); - - GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN); - this->streamheader = NULL; - - this->data_written = 0; + FD_SET (this->server_sock_fd, &parent->readfds); return TRUE; } -static void -gst_tcpserversink_close (GstTCPServerSink * this) +static gboolean +gst_tcpserversink_close (GstMultiFdSink * parent) { - this->running = FALSE; - - SEND_COMMAND (this, CONTROL_STOP); - - g_thread_join (this->thread); - - close (this->control_sock[0]); - close (this->control_sock[1]); + GstTCPServerSink *this = GST_TCPSERVERSINK (parent); if (this->server_sock_fd != -1) { close (this->server_sock_fd); this->server_sock_fd = -1; } - - if (this->streamheader) { - GList *l; - - for (l = this->streamheader; l; l = l->next) { - gst_buffer_unref (l->data); - } - g_list_free (this->streamheader); - } - GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN); -} - -static GstElementStateReturn -gst_tcpserversink_change_state (GstElement * element) -{ - g_return_val_if_fail (GST_IS_TCPSERVERSINK (element), GST_STATE_FAILURE); - - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_TCPSERVERSINK_OPEN)) - gst_tcpserversink_close (GST_TCPSERVERSINK (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSINK_OPEN)) { - if (!gst_tcpserversink_init_send (GST_TCPSERVERSINK (element))) - return GST_STATE_FAILURE; - } - } - - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); - - return GST_STATE_SUCCESS; + return TRUE; } diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h index 903b62c..92a1847 100644 --- a/gst/tcp/gsttcpserversink.h +++ b/gst/tcp/gsttcpserversink.h @@ -42,7 +42,7 @@ extern "C" { #include #include #include -#include "gsttcp.h" +#include "gstmultifdsink.h" #define GST_TYPE_TCPSERVERSINK \ (gst_tcpserversink_get_type()) @@ -65,10 +65,7 @@ typedef enum { } GstTCPServerSinkFlags; struct _GstTCPServerSink { - GstElement element; - - /* pad */ - GstPad *sinkpad; + GstMultiFdSink element; /* server information */ int server_port; @@ -77,39 +74,10 @@ struct _GstTCPServerSink { /* socket */ int server_sock_fd; - - size_t data_written; /* how much bytes have we written ? */ - - GMutex *clientslock; - GList *clients; /* list of clients we are serving */ - - fd_set readfds; /* all the client file descriptors that we can read from */ - fd_set writefds; /* all the client file descriptors that we can write to */ - - int control_sock[2]; /* sockets for controlling the select call */ - - GList *streamheader; /* GList of GstBuffers to use as streamheader */ - GstTCPProtocolType protocol; - guint mtu; - GstClock *clock; - - GArray *bufqueue; - GMutex *queuelock; - GCond *queuecond; - - gboolean running; - GThread *thread; - - gint buffers_max; - gint buffers_soft_max; }; struct _GstTCPServerSinkClass { - GstElementClass parent_class; - - /* signals */ - void (*client_added) (GstElement *element, gchar *host, gint fd); - void (*client_removed) (GstElement *element, gchar *host, gint fd); + GstMultiFdSinkClass parent_class; }; GType gst_tcpserversink_get_type (void);