#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
+#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS
+#define DEFAULT_UNITS_MAX -1
+#define DEFAULT_UNITS_SOFT_MAX -1
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
#define DEFAULT_TIMEOUT 0
{
ARG_0,
ARG_PROTOCOL,
+ ARG_BUFFERS_QUEUED,
+ ARG_BYTES_QUEUED,
+ ARG_TIME_QUEUED,
+
+ ARG_UNIT_TYPE,
+ ARG_UNITS_MAX,
+ ARG_UNITS_SOFT_MAX,
+
ARG_BUFFERS_MAX,
ARG_BUFFERS_SOFT_MAX,
- ARG_BUFFERS_QUEUED,
+
ARG_RECOVER_POLICY,
ARG_TIMEOUT,
ARG_BYTES_TO_SERVE,
return recover_policy_type;
}
+#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
+static GType
+gst_unit_type_get_type (void)
+{
+ static GType unit_type_type = 0;
+ static GEnumValue unit_type[] = {
+ {GST_UNIT_TYPE_BUFFERS, "GST_UNIT_TYPE_BUFFERS", "Buffers"},
+ {GST_UNIT_TYPE_BYTES, "GST_UNIT_TYPE_BYTES", "Bytes"},
+ {GST_UNIT_TYPE_TIME, "GST_UNIT_TYPE_TIME", "Time"},
+ {0, NULL, NULL},
+ };
+
+ if (!unit_type_type) {
+ unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
+ }
+ return unit_type_type;
+}
+
+#define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
+static GType
+gst_client_status_get_type (void)
+{
+ static GType client_status_type = 0;
+ static GEnumValue client_status[] = {
+ {GST_CLIENT_STATUS_OK, "GST_CLIENT_STATUS_OK", "OK"},
+ {GST_CLIENT_STATUS_CLOSED, "GST_CLIENT_STATUS_CLOSED", "Closed"},
+ {GST_CLIENT_STATUS_REMOVED, "GST_CLIENT_STATUS_REMOVED", "Removed"},
+ {GST_CLIENT_STATUS_SLOW, "GST_CLIENT_STATUS_SLOW", "Too slow"},
+ {GST_CLIENT_STATUS_ERROR, "GST_CLIENT_STATUS_ERROR", "Error"},
+ {0, NULL, NULL},
+ };
+
+ if (!client_status_type) {
+ client_status_type =
+ g_enum_register_static ("GstTCPClientStatus", client_status);
+ }
+ return client_status_type;
+}
+
static void gst_multifdsink_base_init (gpointer g_class);
static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
+
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
g_param_spec_int ("buffers-max", "Buffers max",
"max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
g_param_spec_int ("buffers-soft-max", "Buffers soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
+
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
+ g_param_spec_enum ("unit-type", "Units type",
+ "The unit to measure the max/soft-max/queued properties",
+ GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
+ g_param_spec_int ("units-max", "Units max",
+ "max number of units to queue (-1 = no limit)", -1, G_MAXINT,
+ DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
+ g_param_spec_int ("units-soft-max", "Units soft max",
+ "Recover client when going over this limit (-1 = no limit)", -1,
+ G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
+
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
- g_param_spec_int ("buffers-queued", "Buffers queued",
- "Number of buffers currently queued", 0, G_MAXINT, 0,
+ g_param_spec_uint ("buffers-queued", "Buffers queued",
+ "Number of buffers currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
+ g_param_spec_uint ("bytes-queued", "Bytes queued",
+ "Number of bytes currently queued", 0, G_MAXUINT, 0,
+ G_PARAM_READABLE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
+ g_param_spec_uint64 ("time-queued", "Time queued",
+ "Number of time currently queued", 0, G_MAXUINT64, 0,
+ G_PARAM_READABLE));
+
g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
g_param_spec_enum ("recover-policy", "Recover Policy",
"How to recover when client reaches the soft max",
gst_multifdsink_signals[SIGNAL_REMOVE] =
g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
- NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+ NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
gst_multifdsink_signals[SIGNAL_CLEAR] =
g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED] =
g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
- client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
- G_TYPE_NONE, 1, G_TYPE_INT);
+ client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
+ G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
gobject_class->set_property = gst_multifdsink_set_property;
gobject_class->get_property = gst_multifdsink_get_property;
this->clients = NULL;
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
- this->buffers_max = DEFAULT_BUFFERS_MAX;
- this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX;
+ this->unit_type = DEFAULT_UNIT_TYPE;
+ this->units_max = DEFAULT_UNITS_MAX;
+ this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
this->recover_policy = DEFAULT_RECOVER_POLICY;
this->timeout = DEFAULT_TIMEOUT;
g_mutex_unlock (sink->clientslock);
g_signal_emit (G_OBJECT (sink),
- gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd);
+ gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
/* lock again before we remove the client completely */
g_mutex_lock (sink->clientslock);
/* client can pick a buffer from the global queue */
GstBuffer *buf;
- /* grab buffer and ref, we need to ref since it could be unreffed in
- * another thread */
+ /* grab buffer */
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
client->bufpos--;
{
gint newbufpos;
- /* FIXME: implement recover procedure here, like moving the position to
- * the next keyframe, dropping buffers back to the beginning of the queue,
- * stuff like that... */
GST_WARNING_OBJECT (sink,
- "client %p with fd %d is lagging, recover using policy %d", client,
- client->fd, sink->recover_policy);
+ "client %p with fd %d is lagging at %d, recover using policy %d", client,
+ client->fd, client->bufpos, sink->recover_policy);
+
switch (sink->recover_policy) {
case GST_RECOVER_POLICY_NONE:
/* do nothing, client will catch up or get kicked out when it reaches
break;
case GST_RECOVER_POLICY_RESYNC_SOFT:
/* move to beginning of soft max */
- newbufpos = sink->buffers_soft_max;
+ newbufpos = sink->units_soft_max;
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* FIXME, find keyframe in buffers */
- newbufpos = sink->buffers_soft_max;
+ newbufpos = sink->units_soft_max;
break;
default:
- newbufpos = sink->buffers_soft_max;
+ /* unknown recovery procedure */
+ newbufpos = sink->units_soft_max;
break;
}
return newbufpos;
GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
client, client->fd, client->bufpos);
/* check soft max if needed, recover client */
- if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) {
+ if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) {
gint newpos;
newpos = gst_multifdsink_recover_client (sink, client);
}
}
/* check hard max and timeout, remove client */
- if ((sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) ||
+ if ((sink->units_max > 0 && client->bufpos >= sink->units_max) ||
(sink->timeout > 0
&& now - client->last_activity_time > sink->timeout)) {
/* remove client */
max_buffer_usage = client->bufpos;
}
}
- /* nobody is referencing buffers after max_buffer_usage so we can
+ /* nobody is referencing units after max_buffer_usage so we can
* remove them from the queue */
for (i = queuelen - 1; i > max_buffer_usage; i--) {
GstBuffer *old;
multifdsink->protocol = g_value_get_enum (value);
break;
case ARG_BUFFERS_MAX:
- multifdsink->buffers_max = g_value_get_int (value);
+ multifdsink->units_max = g_value_get_int (value);
break;
case ARG_BUFFERS_SOFT_MAX:
- multifdsink->buffers_soft_max = g_value_get_int (value);
+ multifdsink->units_soft_max = g_value_get_int (value);
+ break;
+ case ARG_UNIT_TYPE:
+ multifdsink->unit_type = g_value_get_enum (value);
+ break;
+ case ARG_UNITS_MAX:
+ multifdsink->units_max = g_value_get_int (value);
+ break;
+ case ARG_UNITS_SOFT_MAX:
+ multifdsink->units_soft_max = g_value_get_int (value);
break;
case ARG_RECOVER_POLICY:
multifdsink->recover_policy = g_value_get_enum (value);
g_value_set_enum (value, multifdsink->protocol);
break;
case ARG_BUFFERS_MAX:
- g_value_set_int (value, multifdsink->buffers_max);
+ g_value_set_int (value, multifdsink->units_max);
break;
case ARG_BUFFERS_SOFT_MAX:
- g_value_set_int (value, multifdsink->buffers_soft_max);
+ g_value_set_int (value, multifdsink->units_soft_max);
break;
case ARG_BUFFERS_QUEUED:
- g_value_set_int (value, multifdsink->buffers_queued);
+ g_value_set_uint (value, multifdsink->buffers_queued);
+ break;
+ case ARG_BYTES_QUEUED:
+ g_value_set_uint (value, multifdsink->bytes_queued);
+ break;
+ case ARG_TIME_QUEUED:
+ g_value_set_uint64 (value, multifdsink->time_queued);
+ break;
+ case ARG_UNIT_TYPE:
+ g_value_set_enum (value, multifdsink->unit_type);
+ break;
+ case ARG_UNITS_MAX:
+ g_value_set_int (value, multifdsink->units_max);
+ break;
+ case ARG_UNITS_SOFT_MAX:
+ g_value_set_int (value, multifdsink->units_soft_max);
break;
case ARG_RECOVER_POLICY:
g_value_set_enum (value, multifdsink->recover_policy);