return TRUE;
}
+static void
+gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd)
+{
+ /* FIXME: if we keep track of ip we can log it here and signal */
+ GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
+ if (close (fd) != 0) {
+ GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
+ }
+ FD_CLR (fd, &sink->clientfds);
+ FD_CLR (fd, &sink->caps_sent);
+ FD_CLR (fd, &sink->streamheader_sent);
+ g_signal_emit (G_OBJECT (sink),
+ gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
+}
+
/* handle a read on a client fd,
- * which either indicates a close or should be ignored */
+ * which either indicates a close or should be ignored
+ * returns FALSE if the client has been closed. */
static gboolean
gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
{
ioctl (fd, FIONREAD, &nread);
if (nread == 0) {
/* client sent close, so remove it */
- GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
- if (close (fd) != 0) {
- GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL),
- ("error closing fd %d: %s", fd, g_strerror (errno)));
- return FALSE;
- }
- FD_CLR (fd, &sink->clientfds);
- FD_CLR (fd, &sink->caps_sent);
- /* FIXME: we need to keep track of IP info so we can signal it here */
- g_signal_emit (G_OBJECT (sink),
- gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
+ GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
+ gst_tcpserversink_client_remove (sink, fd);
+ return FALSE;
} else {
/* FIXME: we should probably just Read 'n' Drop */
g_warning ("Don't know what to do with %d bytes to read", nread);
return TRUE;
}
-/* handle a write on a client fd,
- * which indicates a read request from a client */
-static gboolean
-gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
- GstPad * pad, GstBuffer * buf)
+/* Write a buffer to the given fd for the given element using the given
+ * protocol.
+ * Return number of buffer bytes written.
+ */
+static gint
+gst_tcp_buffer_write (GstBuffer * buf, int fd, GstElement * element,
+ GstTCPProtocolType protocol)
{
+ gint wrote = 0;
+
/* write the buffer header if we have one */
- switch (sink->protocol) {
+ switch (protocol) {
case GST_TCP_PROTOCOL_TYPE_NONE:
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
- /* if we haven't sent caps yet, send them first */
- if (!FD_ISSET (fd, &(sink->caps_sent))) {
- const GstCaps *caps;
- gchar *string;
-
- caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
- string = gst_caps_to_string (caps);
- GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
- fd);
- /* FIXME: fix this again so that write_caps is non-fatal for multiple clients; also use a fd, host, port struct */
- if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
- "unknown", 0)) {
- g_free (string);
- return FALSE;
- }
- g_free (string);
- FD_SET (fd, &(sink->caps_sent));
- }
- GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
- if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), fd, buf, FALSE,
- "unknown", 0))
- return FALSE;
+ GST_LOG_OBJECT (element, "Sending buffer header through GDP");
+ if (!gst_tcp_gdp_write_header (element, fd, buf, FALSE, "unknown", 0))
+ return 0;
break;
default:
g_warning ("Unhandled protocol type");
}
/* serve data to client */
- GST_LOG_OBJECT (sink, "serving data buffer of size %d to client on fd %d",
+ GST_LOG_OBJECT (element, "serving data buffer of size %d to client on fd %d",
GST_BUFFER_SIZE (buf), fd);
- int wrote = 0;
-
wrote =
gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
/* FIXME: keep track of client ip and port and so on */
/*
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
- (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
+ (_("Error while sending data to \"%s:%d\"."),
+ sink->host, sink->port),
("Only %d of %d bytes written: %s",
bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno)));
*/
/* FIXME: there should be a better way to report problems, since we
want to continue for other clients and just drop this particular one */
- GST_DEBUG_OBJECT (sink, "Write failed: %d of %d bytes written", wrote,
+ GST_DEBUG_OBJECT (element, "Write failed: %d of %d bytes written", wrote,
GST_BUFFER_SIZE (buf));
+ }
+
+ return wrote;
+}
+
+/* handle a write on a client fd,
+ * which indicates a read request from a client */
+static gboolean
+gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
+ GstPad * pad, GstBuffer * buf)
+{
+ gint wrote = 0;
+
+ /* when using GDP, first check if we have sent caps yet */
+ if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
+ if (!FD_ISSET (fd, &(sink->caps_sent))) {
+ const GstCaps *caps;
+ gchar *string;
+
+ caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
+ string = gst_caps_to_string (caps);
+ GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
+ fd);
+ if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
+ "unknown", 0)) {
+ GST_DEBUG_OBJECT (sink, "Failed sending caps, removing client");
+ gst_tcpserversink_client_remove (sink, fd);
+ g_free (string);
+ return FALSE;
+ }
+ g_free (string);
+ FD_SET (fd, &(sink->caps_sent));
+ }
+ }
+ /* if we have streamheader buffers, and haven't sent them to this client
+ * yet, send them out one by one */
+ if (!FD_ISSET (fd, &(sink->streamheader_sent))) {
+ if (sink->streamheader) {
+ GList *l;
+
+ for (l = sink->streamheader; l; l = l->next) {
+ wrote = gst_tcp_buffer_write (l->data, fd, GST_ELEMENT (sink),
+ sink->protocol);
+ if (wrote < GST_BUFFER_SIZE (l->data)) {
+ GST_DEBUG_OBJECT (sink,
+ "Failed sending streamheader, removing client");
+ gst_tcpserversink_client_remove (sink, fd);
+ }
+ }
+ }
+ FD_SET (fd, &(sink->streamheader_sent));
+ }
+
+ /* now we write the data buffer */
+ wrote = gst_tcp_buffer_write (buf, fd, GST_ELEMENT (sink), sink->protocol);
+ if (wrote < GST_BUFFER_SIZE (buf)) {
+ gst_tcpserversink_client_remove (sink, fd);
/* write failed, so drop the client */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
if (close (fd) != 0) {
GST_DEBUG_OBJECT (sink, "error closing fd %d after failed write: %s",
fd, g_strerror (errno));
}
- FD_CLR (fd, &sink->clientfds);
- FD_CLR (fd, &sink->caps_sent);
- g_signal_emit (G_OBJECT (sink),
- gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
return FALSE;
}
return TRUE;
g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPSERVERSINK_OPEN));
if (GST_IS_EVENT (buf)) {
- g_warning ("FIXME: handl events");
+ g_warning ("FIXME: handle events");
return;
}
+ /* if the incoming buffer is marked as IN CAPS, then we assume for now
+ * it's a streamheader that needs to be sent to each new client, so we
+ * put it on our internal list of streamheader buffers.
+ * After that we return, since we only send these out when we get
+ * non IN_CAPS buffers so we properly keep track of clients that got
+ * streamheaders. */
+ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
+ GST_DEBUG_OBJECT (sink,
+ "appending IN_CAPS buffer with length %d to streamheader",
+ GST_BUFFER_SIZE (buf));
+ sink->streamheader = g_list_append (sink->streamheader, buf);
+ return;
+ }
/* if the incoming buffer has a duration, we can use that as the timeout
* value; otherwise, we block */
timeout.tv_sec = 0;
gst_buffer_unref (buf);
- /* FIXME: emit signal ? */
}
static void
FD_ZERO (&this->clientfds);
FD_ZERO (&this->caps_sent);
+ FD_ZERO (&this->streamheader_sent);
FD_SET (this->server_sock_fd, &this->clientfds);
GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN);
+ this->streamheader = NULL;
this->data_written = 0;
this->server_sock_fd = -1;
}
+ if (this->streamheader) {
+ GList *l;
+
+ for (l = sink->streamheader; l; l = l->next) {
+ gst_buffer_unref (l->data);
+ }
+ g_list_free (this->streamheader);
+ }
GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN);
}