+2005-01-13 Thomas Vander Stichele <thomas at apestaart dot org>
+
+ * ext/ogg/gstoggmux.c:
+ eos/bos debugging
+ * gst/tcp/gstmultifdsink.c:
+ * gst/tcp/gstmultifdsink.h:
+ * gst/tcp/gsttcp.c:
+ * gst/tcp/gsttcp.h:
+ * gst/tcp/gsttcpclientsink.c:
+ * gst/tcp/gsttcpclientsrc.c:
+ * gst/tcp/gsttcpserversink.c:
+ * gst/tcp/gsttcpserversrc.c:
+ improve reusability of elements after state changes and errors
+ make multifdsink throw away streamheaders when receiving new ones
+
2005-01-13 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* ext/alsa/gstalsa.c: (gst_alsa_rates_probe):
pad->prev_delta = delta_unit;
/* swap the packet in */
+ if (packet.e_o_s == 1)
+ GST_DEBUG_OBJECT (pad, "swapping in EOS packet");
+ if (packet.b_o_s == 1)
+ GST_DEBUG_OBJECT (pad, "swapping in BOS packet");
+
ogg_stream_packetin (&pad->stream, &packet);
/* don't need the old buffer anymore */
LAST_SIGNAL
};
-/* this is really arbitrary choosen */
+/* this is really arbitrarily chosen */
#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE
#define DEFAULT_MODE GST_FDSET_MODE_POLL
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
-#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS
+#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS
#define DEFAULT_UNITS_MAX -1
#define DEFAULT_UNITS_SOFT_MAX -1
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
/* if we have streamheader buffers, and haven't sent them to this client
* yet, send them out one by one */
if (!client->streamheader_sent) {
+ GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
+ g_slist_length (sink->streamheader));
if (sink->streamheader) {
GSList *l;
return;
}
+ GST_LOG_OBJECT (sink, "received buffer %p", buf);
+ /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
+ * it means we're getting new streamheader buffers, and we should clear
+ * the old ones */
+ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS) &&
+ sink->previous_buffer_in_caps == FALSE) {
+ GST_DEBUG_OBJECT (sink,
+ "receiving new IN_CAPS buffers, clearing old streamheader");
+ g_slist_foreach (sink->streamheader, (GFunc) gst_data_unref, NULL);
+ g_slist_free (sink->streamheader);
+ sink->streamheader = NULL;
+ }
/* if the incoming buffer is marked as IN CAPS, then we assume for now
* it's a streamheader that needs to be sent to each new client, so we
* put it on our internal list of streamheader buffers.
* non IN_CAPS buffers so we properly keep track of clients that got
* streamheaders. */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
+ sink->previous_buffer_in_caps = TRUE;
GST_DEBUG_OBJECT (sink,
"appending IN_CAPS buffer with length %d to streamheader",
GST_BUFFER_SIZE (buf));
return;
}
+ sink->previous_buffer_in_caps = FALSE;
/* queue the buffer */
gst_multifdsink_queue_buffer (sink, buf);
guint64 last_activity_time;
guint64 dropped_buffers;
guint64 avg_queue_size;
-
+
} GstTCPClient;
struct _GstMultiFdSink {
GMutex *clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */
GHashTable *fd_hash; /* index on fd to client */
-
+
GstFDSetMode mode;
GstFDSet *fdset;
GstFD control_sock[2];/* sockets for controlling the select call */
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
+ gboolean previous_buffer_in_caps;
+
GstTCPProtocolType protocol;
guint mtu;
/* resolve host to IP address, throwing errors if it fails */
/* host can already be an IP address */
-/* returns a newly allocated gchar * with the dotted ip address */
+/* returns a newly allocated gchar * with the dotted ip address,
+ or NULL, in which case it already fired an error. */
gchar *
gst_tcp_host_to_ip (GstElement * element, const gchar * host)
{
return bytes_read;
}
+/* close the socket and reset the fd. Used to clean up after errors. */
+void
+gst_tcp_socket_close (int *socket)
+{
+ close (*socket);
+ *socket = -1;
+}
+
/* read the gdp buffer header from the given socket
- * returns a GstData,
- * representing the new GstBuffer to read data into, or an EOS event
+ * returns:
+ * - a GstData representing a GstBuffer in which data should be read
+ * - a GstData representing a GstEvent
+ * - NULL, indicating a connection close or an error, to be handled with
+ * EOS
*/
GstData *
gst_tcp_gdp_read_header (GstElement * this, int socket)
ret = gst_tcp_socket_read (socket, header, readsize);
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
- GST_DEBUG ("blocking read returns 0, EOS");
+ GST_DEBUG ("blocking read returns 0, returning NULL");
g_free (header);
- gst_element_set_eos (GST_ELEMENT (this));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
+ return NULL;
}
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
gint gst_tcp_socket_read (int socket, void *buf, size_t count);
+void gst_tcp_socket_close (int *socket);
+
GstData * gst_tcp_gdp_read_header (GstElement *this, int socket);
GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket);
switch (prop_id) {
case ARG_HOST:
+ if (!g_value_get_string (value)) {
+ g_warning ("host property cannot be NULL");
+ break;
+ }
g_free (tcpclientsink->host);
tcpclientsink->host = g_strdup (g_value_get_string (value));
break;
/* look up name if we need to */
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
- if (!ip)
+ if (!ip) {
+ gst_tcp_socket_close (&this->sock_fd);
return FALSE;
+ }
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
/* connect to server */
sizeof (this->server_sin));
if (ret) {
+ gst_tcp_socket_close (&this->sock_fd);
switch (errno) {
case ECONNREFUSED:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE,
return caps;
}
+/* close the socket and associated resources
+ * unset OPEN flag
+ * used both to recover from errors and go to NULL state */
+static void
+gst_tcpclientsrc_close (GstTCPClientSrc * this)
+{
+ GST_DEBUG_OBJECT (this, "closing socket");
+ if (this->sock_fd != -1) {
+ close (this->sock_fd);
+ this->sock_fd = -1;
+ }
+ this->caps_received = FALSE;
+ if (this->caps) {
+ gst_caps_free (this->caps);
+ this->caps = NULL;
+ }
+ GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
+}
+
+/* close socket and related items and return an EOS GstData
+ * called from _get */
+static GstData *
+gst_tcpclientsrc_eos (GstTCPClientSrc * src)
+{
+ GST_DEBUG_OBJECT (src, "going to EOS");
+ gst_element_set_eos (GST_ELEMENT (src));
+ gst_tcpclientsrc_close (src);
+ return GST_DATA (gst_event_new (GST_EVENT_EOS));
+}
+
static GstData *
gst_tcpclientsrc_get (GstPad * pad)
{
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
- g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN), NULL);
+ if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) {
+ GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
+ return NULL;
+ }
+ GST_LOG_OBJECT (src, "asked for a buffer");
/* try to negotiate here */
if (!gst_pad_is_negotiated (pad)) {
if (GST_PAD_LINK_FAILED (gst_pad_renegotiate (pad))) {
GST_ELEMENT_ERROR (src, CORE, NEGOTIATION, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
+ return gst_tcpclientsrc_eos (src);
}
}
if (ret <= 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
+ return gst_tcpclientsrc_eos (src);
}
/* ask how much is available for reading on the socket */
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
+ return gst_tcpclientsrc_eos (src);
}
GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
buf = gst_buffer_new_and_alloc (readsize);
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Could not read data header through GDP"));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
+ return gst_tcpclientsrc_eos (src);
}
- if (GST_IS_EVENT (data))
+ if (GST_IS_EVENT (data)) {
+ /* if we got back an EOS event, then we should go into eos ourselves */
+ if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
+ gst_event_unref (data);
+ return gst_tcpclientsrc_eos (src);
+ }
return data;
+ }
+
buf = GST_BUFFER (data);
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
break;
}
- GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
+ GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize);
ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
+ return gst_tcpclientsrc_eos (src);
}
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
- GST_DEBUG ("blocking read returns 0, EOS");
+ GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
gst_buffer_unref (buf);
- gst_element_set_eos (GST_ELEMENT (src));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
+ return gst_tcpclientsrc_eos (src);
}
readsize = ret;
switch (prop_id) {
case ARG_HOST:
+ if (!g_value_get_string (value)) {
+ g_warning ("host property cannot be NULL");
+ break;
+ }
g_free (tcpclientsrc->host);
tcpclientsrc->host = g_strdup (g_value_get_string (value));
break;
}
GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d",
this->sock_fd);
+ GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
/* look up name if we need to */
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
- if (!ip)
+ if (!ip) {
+ gst_tcpclientsrc_close (this);
return FALSE;
+ }
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
/* connect to server */
sizeof (this->server_sin));
if (ret) {
+ gst_tcpclientsrc_close (this);
switch (errno) {
case ECONNREFUSED:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ,
this->send_discont = TRUE;
this->buffer_after_discont = NULL;
- GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
/* get the caps if we're using GDP */
if (this->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
GST_DEBUG_OBJECT (this, "getting caps through GDP");
if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (this), this->sock_fd))) {
+ gst_tcpclientsrc_close (this);
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return FALSE;
}
if (!GST_IS_CAPS (caps)) {
+ gst_tcpclientsrc_close (this);
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return FALSE;
return TRUE;
}
-static void
-gst_tcpclientsrc_close (GstTCPClientSrc * this)
-{
- if (this->sock_fd != -1) {
- close (this->sock_fd);
- this->sock_fd = -1;
- }
- this->caps_received = FALSE;
- if (this->caps) {
- gst_caps_free (this->caps);
- this->caps = NULL;
- }
- GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
-}
-
static GstElementStateReturn
gst_tcpclientsrc_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE);
- if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
- if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN))
- gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
- } else {
- if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN)) {
- if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
- return GST_STATE_FAILURE;
- }
+ /* if open and going to NULL, close it */
+ if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
+ GST_STATE_PENDING (element) == GST_STATE_NULL) {
+ gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
+ }
+ /* if closed and going to a state higher than NULL, open it */
+ if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
+ GST_STATE_PENDING (element) > GST_STATE_NULL) {
+ if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
+ return GST_STATE_FAILURE;
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
switch (prop_id) {
case ARG_HOST:
+ if (!g_value_get_string (value)) {
+ g_warning ("host property cannot be NULL");
+ break;
+ }
g_free (sink->host);
sink->host = g_strdup (g_value_get_string (value));
break;
/* make address reusable */
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, &ret,
sizeof (int)) < 0) {
+ gst_tcp_socket_close (&this->server_sock.fd);
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
/* keep connection alive; avoids SIGPIPE during write */
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, &ret,
sizeof (int)) < 0) {
+ gst_tcp_socket_close (&this->server_sock.fd);
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
sizeof (this->server_sin));
if (ret) {
+ gst_tcp_socket_close (&this->server_sock.fd);
switch (errno) {
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
this->server_sock.fd, TCP_BACKLOG);
if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) {
+ gst_tcp_socket_close (&this->server_sock.fd);
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
switch (prop_id) {
case ARG_HOST:
+ if (!g_value_get_string (value)) {
+ g_warning ("host property cannot be NULL");
+ break;
+ }
g_free (tcpserversrc->host);
tcpserversrc->host = g_strdup (g_value_get_string (value));
break;
if (this->host) {
gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
- if (!host)
+ if (!host) {
+ gst_tcp_socket_close (&this->server_sock_fd);
return FALSE;
+ }
this->server_sin.sin_addr.s_addr = inet_addr (host);
g_free (host);
sizeof (this->server_sin));
if (ret) {
+ gst_tcp_socket_close (&this->server_sock_fd);
switch (errno) {
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
this->server_sock_fd, TCP_BACKLOG);
if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
+ gst_tcp_socket_close (&this->server_sock_fd);
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin,
&this->client_sin_len);
if (this->client_sock_fd == -1) {
+ gst_tcp_socket_close (&this->server_sock_fd);
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not accept client on server socket: %s", g_strerror (errno)));
return FALSE;