SIGNAL_ADD,
SIGNAL_REMOVE,
SIGNAL_CLEAR,
+ SIGNAL_GET_STATS,
/* signals */
SIGNAL_CLIENT_ADDED,
SIGNAL_CLIENT_REMOVED,
gst_multifdsink_signals[SIGNAL_CLEAR] =
g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
- NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0);
+ NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+ gst_multifdsink_signals[SIGNAL_GET_STATS] =
+ g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstMultiFdSinkClass, get_stats),
+ NULL, NULL, gst_tcp_marshal_BOXED__INT, G_TYPE_VALUE_ARRAY, 1,
+ G_TYPE_INT);
gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] =
g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
klass->add = gst_multifdsink_add;
klass->remove = gst_multifdsink_remove;
klass->clear = gst_multifdsink_clear;
+ klass->get_stats = gst_multifdsink_get_stats;
GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
}
gst_multifdsink_add (GstMultiFdSink * sink, int fd)
{
GstTCPClient *client;
+ GTimeVal now;
/* create client datastructure */
client = g_new0 (GstTCPClient, 1);
client->bufpos = -1;
client->bufoffset = 0;
client->sending = NULL;
+ client->bytes_sent = 0;
+ client->dropped_buffers = 0;
+ client->avg_queue_size = 0;
+
+ /* update start time */
+ g_get_current_time (&now);
+ client->connect_time = GST_TIMEVAL_TO_TIME (now);
g_mutex_lock (sink->clientslock);
g_mutex_unlock (sink->clientslock);
}
+GValueArray *
+gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd)
+{
+ GList *clients;
+ GValueArray *result = NULL;
+
+ g_mutex_lock (sink->clientslock);
+ /* loop over the clients to find the one with the fd */
+ for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ GstTCPClient *client;
+
+ client = (GstTCPClient *) clients->data;
+
+ if (client->fd == fd) {
+ GValue value = { 0 };
+
+ result = g_value_array_new (4);
+
+ g_value_init (&value, G_TYPE_UINT64);
+ g_value_set_uint64 (&value, client->bytes_sent);
+ result = g_value_array_append (result, &value);
+ g_value_unset (&value);
+ g_value_init (&value, G_TYPE_UINT64);
+ g_value_set_uint64 (&value, client->connect_time);
+ result = g_value_array_append (result, &value);
+ g_value_unset (&value);
+ g_value_init (&value, G_TYPE_UINT64);
+ g_value_set_uint64 (&value, client->disconnect_time);
+ result = g_value_array_append (result, &value);
+ g_value_unset (&value);
+ g_value_init (&value, G_TYPE_UINT64);
+ g_value_set_uint64 (&value, client->connect_interval);
+ result = g_value_array_append (result, &value);
+ break;
+ }
+ }
+ g_mutex_unlock (sink->clientslock);
+
+ /* python doesn't like a NULL pointer yet */
+ if (result == NULL) {
+ result = g_value_array_new (0);
+ }
+
+ return result;
+}
+
static void
gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
{
int fd = client->fd;
+ GTimeVal now;
/* FIXME: if we keep track of ip we can log it here and signal */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
sink->clients = g_list_remove (sink->clients, client);
- g_free (client);
+ g_get_current_time (&now);
+ client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
+ client->connect_interval = client->disconnect_time = client->connect_time;
g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
+
+ g_free (client);
}
/* handle a read on a client fd,
* stop sending more */
GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
client->bufoffset += wrote;
+ client->bytes_sent += wrote;
more = FALSE;
} else {
/* complete buffer was written, we can proceed to the next one */
gst_buffer_unref (head);
/* make sure we start from byte 0 for the next buffer */
client->bufoffset = 0;
+ client->bytes_sent += wrote;
}
}
} while (more);
gboolean caps_sent;
gboolean streamheader_sent;
+
+ /* stats */
+ guint64 bytes_sent;
+ guint64 connect_time;
+ guint64 disconnect_time;
+ guint64 connect_interval;
+ guint64 dropped_buffers;
+ guint64 avg_queue_size;
+
} GstTCPClient;
struct _GstMultiFdSink {
gint buffers_max; /* max buffers to queue */
gint buffers_soft_max; /* max buffers a client can lay before recoevery starts */
GstRecoverPolicy recover_policy;
+ GstClockTime timeout; /* max amount of nanoseconds to remain idle */
/* stats */
gint buffers_queued; /* number of queued buffers */
};
GstElementClass parent_class;
/* element methods */
- void (*add) (GstMultiFdSink *sink, int fd);
- void (*remove) (GstMultiFdSink *sink, int fd);
- void (*clear) (GstMultiFdSink *sink);
+ void (*add) (GstMultiFdSink *sink, int fd);
+ void (*remove) (GstMultiFdSink *sink, int fd);
+ void (*clear) (GstMultiFdSink *sink);
+ GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd);
/* vtable */
gboolean (*init) (GstMultiFdSink *sink);
void gst_multifdsink_add (GstMultiFdSink *sink, int fd);
void gst_multifdsink_remove (GstMultiFdSink *sink, int fd);
void gst_multifdsink_clear (GstMultiFdSink *sink);
+GValueArray* gst_multifdsink_get_stats (GstMultiFdSink *sink, int fd);
#ifdef __cplusplus