#include "gstmultifdsink.h"
#include "gsttcp-marshal.h"
+/* the select call is also performed on the control sockets, that way
+ * we can send special commands to unblock or restart the select call */
#define CONTROL_RESTART 'R' /* restart the select call */
#define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(sink) sink->control_sock
/* elementfactory information */
static GstElementDetails gst_multifdsink_details =
-GST_ELEMENT_DETAILS ("TCP Server sink",
+GST_ELEMENT_DETAILS ("MultiFd sink",
"Sink/Network",
- "Send data as a server over the network via TCP",
- "Thomas Vander Stichele <thomas at apestaart dot org>");
+ "Send data to multiple filedescriptors",
+ "Thomas Vander Stichele <thomas at apestaart dot org>, "
+ "Wim Taymans <wim@fluendo.com>");
GST_DEBUG_CATEGORY (multifdsink_debug);
#define GST_CAT_DEFAULT (multifdsink_debug)
SIGNAL_REMOVE,
SIGNAL_CLEAR,
SIGNAL_GET_STATS,
+
/* signals */
SIGNAL_CLIENT_ADDED,
SIGNAL_CLIENT_REMOVED,
+
LAST_SIGNAL
};
/* this is really arbitrary choosen */
+#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
+#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
+#define DEFAULT_TIMEOUT 0
enum
{
ARG_BUFFERS_SOFT_MAX,
ARG_BUFFERS_QUEUED,
ARG_RECOVER_POLICY,
+ ARG_TIMEOUT,
+ ARG_BYTES_TO_SERVE,
+ ARG_BYTES_SERVED,
};
#define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
- GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
- G_PARAM_READWRITE));
+ GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
g_param_spec_int ("buffers-max", "Buffers max",
"max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
g_param_spec_int ("buffers-queued", "Buffers queued",
- "Number of buffers current queued", 0, G_MAXINT, 0,
+ "Number of buffers currently queued", 0, G_MAXINT, 0,
G_PARAM_READABLE));
g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
g_param_spec_enum ("recover-policy", "Recover Policy",
"How to recover when client reaches the soft max",
- GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE));
+ GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
+ g_param_spec_uint64 ("timeout", "Timeout",
+ "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
+ 0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READABLE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
+ g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
+ "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
+ G_PARAM_READABLE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
+ g_param_spec_uint64 ("bytes-served", "Bytes served",
+ "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
+ G_PARAM_READABLE));
gst_multifdsink_signals[SIGNAL_ADD] =
g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
- this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
+ this->protocol = DEFAULT_PROTOCOL;
+ this->clientslock = g_mutex_new ();
this->clients = NULL;
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
this->buffers_max = DEFAULT_BUFFERS_MAX;
this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX;
+ this->recover_policy = DEFAULT_RECOVER_POLICY;
- this->clientslock = g_mutex_new ();
- this->recover_policy = GST_RECOVER_POLICY_NONE;
+ this->timeout = DEFAULT_TIMEOUT;
}
static void
/* update start time */
g_get_current_time (&now);
client->connect_time = GST_TIMEVAL_TO_TIME (now);
+ /* send last activity time to connect time */
+ client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
g_mutex_lock (sink->clientslock);
}
SEND_COMMAND (sink, CONTROL_RESTART);
- sink->clients = g_list_remove (sink->clients, client);
-
g_get_current_time (&now);
client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
client->connect_interval = client->disconnect_time = client->connect_time;
g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
+ sink->clients = g_list_remove (sink->clients, client);
+
g_free (client);
}
int fd = client->fd;
gboolean more;
gboolean res;
+ GstClockTime now;
+ GTimeVal nowtv;
+
+ g_get_current_time (&nowtv);
+ now = GST_TIMEVAL_TO_TIME (nowtv);
/* when using GDP, first check if we have queued caps yet */
if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
fd);
return FALSE;
}
- } else if (wrote < maxsize) {
- /* partial write means that the client cannot read more and we should
- * stop sending more */
- GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
- client->bufoffset += wrote;
- client->bytes_sent += wrote;
- more = FALSE;
} else {
- /* complete buffer was written, we can proceed to the next one */
- client->sending = g_list_remove (client->sending, head);
- gst_buffer_unref (head);
- /* make sure we start from byte 0 for the next buffer */
- client->bufoffset = 0;
+ if (wrote < maxsize) {
+ /* partial write means that the client cannot read more and we should
+ * stop sending more */
+ GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
+ client->bufoffset += wrote;
+ more = FALSE;
+ } else {
+ /* complete buffer was written, we can proceed to the next one */
+ client->sending = g_list_remove (client->sending, head);
+ gst_buffer_unref (head);
+ /* make sure we start from byte 0 for the next buffer */
+ client->bufoffset = 0;
+ }
+ /* update stats */
client->bytes_sent += wrote;
+ client->last_activity_time = now;
+ sink->bytes_served += wrote;
}
}
} while (more);
gboolean need_signal = FALSE;
gint max_buffer_usage;
gint i;
+ GTimeVal nowtv;
+ GstClockTime now;
+
+ g_get_current_time (&nowtv);
+ now = GST_TIMEVAL_TO_TIME (nowtv);
g_mutex_lock (sink->clientslock);
/* add buffer to queue */
"client %p with fd %d not recovering position", client, client->fd);
}
}
- /* check hard max, remove client */
- if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) {
+ /* check hard max and timeout, remove client */
+ if ((sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) ||
+ (sink->timeout > 0
+ && now - client->last_activity_time > sink->timeout)) {
/* remove client */
GST_WARNING_OBJECT (sink, "client %p with fd %d is too slow, removing",
client, client->fd);
g_mutex_unlock (sink->clientslock);
}
+/* we handle the client communication in another thread so that we do not block
+ * the gstreamer thread while we select() on the client fds */
static gpointer
gst_multifdsink_thread (GstMultiFdSink * sink)
{
/* queue the buffer */
gst_multifdsink_queue_buffer (sink, buf);
- sink->data_written += GST_BUFFER_SIZE (buf);
+ sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
}
static void
case ARG_RECOVER_POLICY:
multifdsink->recover_policy = g_value_get_enum (value);
break;
+ case ARG_TIMEOUT:
+ multifdsink->timeout = g_value_get_uint64 (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
case ARG_RECOVER_POLICY:
g_value_set_enum (value, multifdsink->recover_policy);
break;
+ case ARG_TIMEOUT:
+ g_value_set_uint64 (value, multifdsink->timeout);
+ break;
+ case ARG_BYTES_TO_SERVE:
+ g_value_set_uint64 (value, multifdsink->bytes_to_serve);
+ break;
+ case ARG_BYTES_SERVED:
+ g_value_set_uint64 (value, multifdsink->bytes_served);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK);
this->streamheader = NULL;
- this->data_written = 0;
+ this->bytes_to_serve = 0;
+ this->bytes_served = 0;
if (fclass->init) {
fclass->init (this);