#define NOT_IMPLEMENTED 0
-static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
- GST_PAD_SINK,
- GST_PAD_ALWAYS,
- GST_STATIC_CAPS_ANY);
-
GST_DEBUG_CATEGORY_STATIC (multifdsink_debug);
#define GST_CAT_DEFAULT (multifdsink_debug)
/* this is really arbitrarily chosen */
#define DEFAULT_MODE 1
-#define DEFAULT_BUFFERS_MAX -1
-#define DEFAULT_BUFFERS_SOFT_MAX -1
-#define DEFAULT_UNIT_TYPE GST_TCP_UNIT_TYPE_BUFFERS
-#define DEFAULT_UNITS_MAX -1
-#define DEFAULT_UNITS_SOFT_MAX -1
+#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS
-#define DEFAULT_BURST_UNIT GST_TCP_UNIT_TYPE_UNDEFINED
+#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED
#define DEFAULT_BURST_VALUE 0
-#define DEFAULT_QOS_DSCP -1
#define DEFAULT_HANDLE_READ TRUE
enum
PROP_0,
PROP_MODE,
- PROP_UNIT_TYPE,
- PROP_UNITS_MAX,
- PROP_UNITS_SOFT_MAX,
-
- PROP_BUFFERS_MAX,
- PROP_BUFFERS_SOFT_MAX,
+ PROP_UNIT_FORMAT,
- PROP_BURST_UNIT,
+ PROP_BURST_FORMAT,
PROP_BURST_VALUE,
- PROP_QOS_DSCP,
-
PROP_HANDLE_READ,
PROP_NUM_FDS,
return fdset_mode_type;
}
-#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
+#define GST_TYPE_UNIT_FORMAT (gst_unit_format_get_type())
static GType
-gst_unit_type_get_type (void)
+gst_unit_format_get_type (void)
{
- static GType unit_type_type = 0;
- static const GEnumValue unit_type[] = {
- {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
- {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
- {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"},
- {GST_TCP_UNIT_TYPE_TIME, "Time", "time"},
+ static GType unit_format_type = 0;
+ static const GEnumValue unit_format[] = {
+ {GST_TCP_UNIT_FORMAT_UNDEFINED, "Undefined", "undefined"},
+ {GST_TCP_UNIT_FORMAT_BYTES, "Bytes", "bytes"},
+ {GST_TCP_UNIT_FORMAT_TIME, "Time", "time"},
+ {GST_TCP_UNIT_FORMAT_BUFFERS, "Buffers", "buffers"},
{0, NULL, NULL},
};
- if (!unit_type_type) {
- unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
+ if (!unit_format_type) {
+ unit_format_type = g_enum_register_static ("GstTCPUnitType", unit_format);
}
- return unit_type_type;
+ return unit_format_type;
}
static void gst_multi_fd_sink_finalize (GObject * object);
static void gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink);
static gboolean gst_multi_fd_sink_start_pre (GstMultiHandleSink * mhsink);
static gpointer gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink);
+static void gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink,
+ GstBuffer * buffer);
+static gboolean gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink *
+ mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
+static int gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client);
static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
-static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink,
- GstBuffer * buf);
-
static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id,
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
- GstBaseSinkClass *gstbasesink_class;
GstMultiHandleSinkClass *gstmultihandlesink_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
- gstbasesink_class = (GstBaseSinkClass *) klass;
gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
gobject_class->set_property = gst_multi_fd_sink_set_property;
GST_TYPE_FDSET_MODE, DEFAULT_MODE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
- g_param_spec_int ("buffers-max", "Buffers max",
- "max number of buffers to queue for a client (-1 = no limit)", -1,
- G_MAXINT, DEFAULT_BUFFERS_MAX,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
- g_param_spec_int ("buffers-soft-max", "Buffers soft max",
- "Recover client when going over this limit (-1 = no limit)", -1,
- G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
- g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
- g_param_spec_enum ("unit-type", "Units type",
+ g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
+ g_param_spec_enum ("unit-format", "Units format",
"The unit to measure the max/soft-max/queued properties",
- GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
- g_param_spec_int64 ("units-max", "Units max",
- "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
- DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
- g_param_spec_int64 ("units-soft-max", "Units soft max",
- "Recover client when going over this limit (-1 = no limit)", -1,
- G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
+ GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
- g_param_spec_enum ("burst-unit", "Burst unit",
+ g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
+ g_param_spec_enum ("burst-format", "Burst format",
"The format of the burst units (when sync-method is burst[[-with]-keyframe])",
- GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT,
+ GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
g_param_spec_uint64 ("burst-value", "Burst value",
"The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
- g_param_spec_int ("qos-dscp", "QoS diff srv code point",
- "Quality of Service, differentiated services code point (-1 default)",
- -1, 63, DEFAULT_QOS_DSCP,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstMultiFdSink::handle-read
*
* @gstmultifdsink: the multifdsink element to emit this signal on
* @fd: the file descriptor to add to multifdsink
* @sync: the sync method to use
- * @unit_type_min: the unit-type of @value_min
+ * @unit_format_min: the unit-format of @value_min
* @value_min: the minimum amount of data to burst expressed in
- * @unit_type_min units.
- * @unit_type_max: the unit-type of @value_max
+ * @unit_format_min units.
+ * @unit_format_max: the unit-format of @value_max
* @value_max: the maximum amount of data to burst expressed in
- * @unit_type_max units.
+ * @unit_format_max units.
*
* Hand the given open file descriptor to multifdsink to write to and
* specify the burst parameters for the new connection.
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
add_full), NULL, NULL,
gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64,
- G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE,
- G_TYPE_UINT64, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
+ G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_FORMAT,
+ G_TYPE_UINT64, GST_TYPE_UNIT_FORMAT, G_TYPE_UINT64);
/**
* GstMultiFdSink::remove:
* @gstmultifdsink: the multifdsink element to emit this signal on
client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
G_TYPE_NONE, 1, G_TYPE_INT);
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sinktemplate));
-
gst_element_class_set_details_simple (gstelement_class,
"Multi filedescriptor sink", "Sink/Network",
"Send data to multiple filedescriptors",
"Thomas Vander Stichele <thomas at apestaart dot org>, "
"Wim Taymans <wim@fluendo.com>");
- gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render);
-
gstmultihandlesink_class->clear_post =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear_post);
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_start_pre);
gstmultihandlesink_class->thread =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_thread);
+ gstmultihandlesink_class->queue_buffer =
+ GST_DEBUG_FUNCPTR (gst_multi_fd_sink_queue_buffer);
+ gstmultihandlesink_class->client_queue_buffer =
+ GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_queue_buffer);
+ gstmultihandlesink_class->client_get_fd =
+ GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_get_fd);
gstmultihandlesink_class->remove_client_link =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_client_link);
this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
- this->unit_type = DEFAULT_UNIT_TYPE;
- this->units_max = DEFAULT_UNITS_MAX;
- this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
+ this->unit_format = DEFAULT_UNIT_FORMAT;
- this->def_burst_unit = DEFAULT_BURST_UNIT;
+ this->def_burst_format = DEFAULT_BURST_FORMAT;
this->def_burst_value = DEFAULT_BURST_VALUE;
- this->qos_dscp = DEFAULT_QOS_DSCP;
this->handle_read = DEFAULT_HANDLE_READ;
-
- this->header_flags = 0;
}
static void
G_OBJECT_CLASS (parent_class)->finalize (object);
}
-static gint
-setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client)
+static int
+gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client)
{
- gint tos;
- gint ret;
- union gst_sockaddr
- {
- struct sockaddr sa;
- struct sockaddr_in6 sa_in6;
- struct sockaddr_storage sa_stor;
- } sa;
- socklen_t slen = sizeof (sa);
- gint af;
-
- /* don't touch */
- if (sink->qos_dscp < 0)
- return 0;
-
- if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) {
- GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
- return ret;
- }
-
- af = sa.sa.sa_family;
-
- /* if this is an IPv4-mapped address then do IPv4 QoS */
- if (af == AF_INET6) {
-
- GST_DEBUG_OBJECT (sink, "check IP6 socket");
- if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
- GST_DEBUG_OBJECT (sink, "mapped to IPV4");
- af = AF_INET;
- }
- }
-
- /* extract and shift 6 bits of the DSCP */
- tos = (sink->qos_dscp & 0x3f) << 2;
+ GstTCPClient *tclient = (GstTCPClient *) client;
- switch (af) {
- case AF_INET:
- ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
- break;
- case AF_INET6:
-#ifdef IPV6_TCLASS
- ret =
- setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
- sizeof (tos));
- break;
-#endif
- default:
- ret = 0;
- GST_ERROR_OBJECT (sink, "unsupported AF");
- break;
- }
- if (ret)
- GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
-
- return ret;
-}
-
-
-static void
-setup_dscp (GstMultiFdSink * sink)
-{
- GList *clients, *next;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- CLIENTS_LOCK (sink);
- for (clients = mhsink->clients; clients; clients = next) {
- GstTCPClient *client;
-
- client = (GstTCPClient *) clients->data;
- next = g_list_next (clients);
-
- setup_dscp_client (sink, client);
- }
- CLIENTS_UNLOCK (sink);
+ return tclient->fd.fd;
}
/* "add-full" signal implementation */
void
gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
- GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value,
- GstTCPUnitType max_unit, guint64 max_value)
+ GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
+ GstFormat max_format, guint64 max_value)
{
GstTCPClient *client;
GstMultiHandleClient *mhclient;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
- "min_unit %d, min_value %" G_GUINT64_FORMAT
- ", max_unit %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
- min_unit, min_value, max_unit, max_value);
+ "min_format %d, min_value %" G_GUINT64_FORMAT
+ ", max_format %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
+ min_format, min_value, max_format, max_value);
/* do limits check if we can */
- if (min_unit == max_unit) {
+ if (min_format == max_format) {
if (max_value != -1 && min_value != -1 && max_value < min_value)
goto wrong_limits;
}
g_snprintf (mhclient->debug, 30, "[fd %5d]", fd);
client->fd.fd = fd;
- client->burst_min_unit = min_unit;
+ client->burst_min_format = min_format;
client->burst_min_value = min_value;
- client->burst_max_unit = max_unit;
+ client->burst_max_format = max_format;
client->burst_max_value = max_value;
CLIENTS_LOCK (sink);
/* figure out the mode, can't use send() for non sockets */
if (fstat (fd, &statbuf) == 0 && S_ISSOCK (statbuf.st_mode)) {
client->is_socket = TRUE;
- setup_dscp_client (sink, client);
+ gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
}
gst_poll_restart (sink->fdset);
GST_WARNING_OBJECT (sink,
"[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
- min_value, max_value, min_unit);
+ min_value, max_value, min_format);
return;
}
duplicate:
mhsink = GST_MULTI_HANDLE_SINK (sink);
gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method,
- sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1);
+ sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
+ -1);
}
/* "remove" signal implementation */
/* queue the given buffer for the given client */
static gboolean
-gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
- GstTCPClient * client, GstBuffer * buffer)
+gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
+ GstMultiHandleClient * mhclient, GstBuffer * buffer)
{
GstCaps *caps;
/* TRUE: send them if the new caps have them */
gboolean send_streamheader = FALSE;
GstStructure *s;
- GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+ GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
+ GstTCPClient *client = (GstTCPClient *) mhclient;
/* before we queue the buffer, we check if we need to queue streamheader
* buffers (because it's a new client, or because they changed) */
{
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
- switch (sink->unit_type) {
- case GST_TCP_UNIT_TYPE_BUFFERS:
+ switch (sink->unit_format) {
+ case GST_TCP_UNIT_FORMAT_BUFFERS:
return max;
- case GST_TCP_UNIT_TYPE_TIME:
+ case GST_TCP_UNIT_FORMAT_TIME:
{
GstBuffer *buf;
int i;
}
return len + 1;
}
- case GST_TCP_UNIT_TYPE_BYTES:
+ case GST_TCP_UNIT_FORMAT_BYTES:
{
GstBuffer *buf;
int i;
* Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
*/
static gboolean
-assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers,
+assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
GstClockTime * time)
{
gboolean res = TRUE;
/* set only the limit of the given format to the given value */
- switch (unit) {
- case GST_TCP_UNIT_TYPE_BUFFERS:
+ switch (format) {
+ case GST_TCP_UNIT_FORMAT_BUFFERS:
*buffers = (gint) value;
break;
- case GST_TCP_UNIT_TYPE_TIME:
+ case GST_TCP_UNIT_FORMAT_TIME:
*time = value;
break;
- case GST_TCP_UNIT_TYPE_BYTES:
+ case GST_TCP_UNIT_FORMAT_BYTES:
*bytes = (gint) value;
break;
- case GST_TCP_UNIT_TYPE_UNDEFINED:
+ case GST_TCP_UNIT_FORMAT_UNDEFINED:
default:
res = FALSE;
break;
* function returns FALSE.
*/
static gboolean
-count_burst_unit (GstMultiFdSink * sink, gint * min_idx,
- GstTCPUnitType min_unit, guint64 min_value, gint * max_idx,
- GstTCPUnitType max_unit, guint64 max_value)
+count_burst_format (GstMultiFdSink * sink, gint * min_idx,
+ GstFormat min_format, guint64 min_value, gint * max_idx,
+ GstFormat max_format, guint64 max_value)
{
gint bytes_min = -1, buffers_min = -1;
gint bytes_max = -1, buffers_max = -1;
GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
- assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min);
- assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max);
+ assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
+ assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
max_idx, bytes_max, buffers_max, time_max);
* is not enough data, we just send what we have (which is in result).
* We use the max value to limit the search
*/
- ok = count_burst_unit (sink, &result, client->burst_min_unit,
- client->burst_min_value, &max, client->burst_max_unit,
+ ok = count_burst_format (sink, &result, client->burst_min_format,
+ client->burst_min_value, &max, client->burst_max_format,
client->burst_max_value);
GST_DEBUG_OBJECT (sink,
- "[fd %5d] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
+ "[fd %5d] SYNC_METHOD_BURST: burst_format returned %d, result %d",
client->fd.fd, ok, result);
GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
* NEXT_KEYFRAME.
*/
/* gather burst limits */
- count_burst_unit (sink, &min_idx, client->burst_min_unit,
- client->burst_min_value, &max_idx, client->burst_max_unit,
+ count_burst_format (sink, &min_idx, client->burst_min_format,
+ client->burst_min_value, &max_idx, client->burst_max_format,
client->burst_max_value);
GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
* amount of data up 'till min.
*/
/* gather enough data to burst */
- count_burst_unit (sink, &min_idx, client->burst_min_unit,
- client->burst_min_value, &max_idx, client->burst_max_unit,
+ count_burst_format (sink, &min_idx, client->burst_min_format,
+ client->burst_min_value, &max_idx, client->burst_max_format,
client->burst_max_value);
GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
gboolean flushing;
GstClockTime now;
GTimeVal nowtv;
- GstMultiHandleSink *mhsink;
+ GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+ GstMultiHandleSinkClass *mhsinkclass =
+ GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
- mhsink = GST_MULTI_HANDLE_SINK (sink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
fd, client, mhclient->bufpos);
/* queueing a buffer will ref it */
- gst_multi_fd_sink_client_queue_buffer (sink, client, buf);
+ mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
/* need to start from the first byte for this new buffer */
mhclient->bufoffset = 0;
break;
case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
/* move to beginning of soft max */
- newbufpos = get_buffers_max (sink, sink->units_soft_max);
+ newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers, we search backwards to find the
* closest keyframe relative to what this client already received. */
newbufpos = MIN (mhsink->bufqueue->len - 1,
- get_buffers_max (sink, sink->units_soft_max) - 1);
+ get_buffers_max (sink, mhsink->units_soft_max) - 1);
while (newbufpos >= 0) {
GstBuffer *buf;
break;
default:
/* unknown recovery procedure */
- newbufpos = get_buffers_max (sink, sink->units_soft_max);
+ newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
break;
}
return newbufpos;
* the select thread that the fd_set changed.
*/
static void
-gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
+gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, GstBuffer * buffer)
{
GList *clients, *next;
gint queuelen;
GstClockTime now;
gint max_buffers, soft_max_buffers;
guint cookie;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+ GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
- CLIENTS_LOCK (sink);
+ CLIENTS_LOCK (mhsink);
/* add buffer to queue */
- g_array_prepend_val (mhsink->bufqueue, buf);
+ g_array_prepend_val (mhsink->bufqueue, buffer);
queuelen = mhsink->bufqueue->len;
- if (sink->units_max > 0)
- max_buffers = get_buffers_max (sink, sink->units_max);
+ if (mhsink->units_max > 0)
+ max_buffers = get_buffers_max (sink, mhsink->units_max);
else
max_buffers = -1;
- if (sink->units_soft_max > 0)
- soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
+ if (mhsink->units_soft_max > 0)
+ soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
else
soft_max_buffers = -1;
GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
return NULL;
}
-static GstFlowReturn
-gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
-{
- GstMultiFdSink *sink;
- gboolean in_caps;
-#if 0
- GstCaps *bufcaps, *padcaps;
-#endif
- GstMultiHandleSink *mhsink;
-
- sink = GST_MULTI_FD_SINK (bsink);
- mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
- GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
-
-#if 0
- /* since we check every buffer for streamheader caps, we need to make
- * sure every buffer has caps set */
- bufcaps = gst_buffer_get_caps (buf);
- padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
-
- /* make sure we have caps on the pad */
- if (!padcaps && !bufcaps)
- goto no_caps;
-#endif
-
- /* get HEADER first, code below might mess with the flags */
- in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
-
-#if 0
- /* stamp the buffer with previous caps if no caps set */
- if (!bufcaps) {
- if (!gst_buffer_is_writable (buf)) {
- /* metadata is not writable, copy will be made and original buffer
- * will be unreffed so we need to ref so that we don't lose the
- * buffer in the render method. */
- gst_buffer_ref (buf);
- /* the new buffer is ours only, we keep it out of the scope of this
- * function */
- buf = gst_buffer_make_writable (buf);
- } else {
- /* else the metadata is writable, we ref because we keep the buffer
- * out of the scope of this method */
- gst_buffer_ref (buf);
- }
- /* buffer metadata is writable now, set the caps */
- gst_buffer_set_caps (buf, padcaps);
- } else {
- gst_caps_unref (bufcaps);
-
- /* since we keep this buffer out of the scope of this method */
- gst_buffer_ref (buf);
- }
-#endif
- gst_buffer_ref (buf);
-
- GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
- G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
- ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
- buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
- GST_BUFFER_OFFSET_END (buf),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
-
- /* if we get HEADER buffers, but the previous buffer was not HEADER,
- * it means we're getting new streamheader buffers, and we should clear
- * the old ones */
- if (in_caps && sink->previous_buffer_in_caps == FALSE) {
- GST_DEBUG_OBJECT (sink,
- "receiving new HEADER buffers, clearing old streamheader");
- g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
- g_slist_free (mhsink->streamheader);
- mhsink->streamheader = NULL;
- }
-
- /* save the current in_caps */
- sink->previous_buffer_in_caps = in_caps;
-
- /* if the incoming buffer is marked as IN CAPS, then we assume for now
- * it's a streamheader that needs to be sent to each new client, so we
- * put it on our internal list of streamheader buffers.
- * FIXME: we could check if the buffer's contents are in fact part of the
- * current streamheader.
- *
- * We don't send the buffer to the client, since streamheaders are sent
- * separately when necessary. */
- if (in_caps) {
- GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
- G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
- mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
- } else {
- /* queue the buffer, this is a regular data buffer. */
- gst_multi_fd_sink_queue_buffer (sink, buf);
-
- mhsink->bytes_to_serve += gst_buffer_get_size (buf);
- }
- return GST_FLOW_OK;
-
- /* ERRORS */
-#if 0
-no_caps:
- {
- GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
- ("Received first buffer without caps set"));
- return GST_FLOW_NOT_NEGOTIATED;
- }
-#endif
-}
-
static void
gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
case PROP_MODE:
multifdsink->mode = g_value_get_enum (value);
break;
- case PROP_BUFFERS_MAX:
- multifdsink->units_max = g_value_get_int (value);
- break;
- case PROP_BUFFERS_SOFT_MAX:
- multifdsink->units_soft_max = g_value_get_int (value);
- break;
- case PROP_UNIT_TYPE:
- multifdsink->unit_type = g_value_get_enum (value);
- break;
- case PROP_UNITS_MAX:
- multifdsink->units_max = g_value_get_int64 (value);
- break;
- case PROP_UNITS_SOFT_MAX:
- multifdsink->units_soft_max = g_value_get_int64 (value);
+ case PROP_UNIT_FORMAT:
+ multifdsink->unit_format = g_value_get_enum (value);
break;
- case PROP_BURST_UNIT:
- multifdsink->def_burst_unit = g_value_get_enum (value);
+ case PROP_BURST_FORMAT:
+ multifdsink->def_burst_format = g_value_get_enum (value);
break;
case PROP_BURST_VALUE:
multifdsink->def_burst_value = g_value_get_uint64 (value);
break;
- case PROP_QOS_DSCP:
- multifdsink->qos_dscp = g_value_get_int (value);
- setup_dscp (multifdsink);
- break;
case PROP_HANDLE_READ:
multifdsink->handle_read = g_value_get_boolean (value);
break;
case PROP_MODE:
g_value_set_enum (value, multifdsink->mode);
break;
- case PROP_BUFFERS_MAX:
- g_value_set_int (value, multifdsink->units_max);
- break;
- case PROP_BUFFERS_SOFT_MAX:
- g_value_set_int (value, multifdsink->units_soft_max);
- break;
- case PROP_UNIT_TYPE:
- g_value_set_enum (value, multifdsink->unit_type);
+ case PROP_UNIT_FORMAT:
+ g_value_set_enum (value, multifdsink->unit_format);
break;
- case PROP_UNITS_MAX:
- g_value_set_int64 (value, multifdsink->units_max);
- break;
- case PROP_UNITS_SOFT_MAX:
- g_value_set_int64 (value, multifdsink->units_soft_max);
- break;
- case PROP_BURST_UNIT:
- g_value_set_enum (value, multifdsink->def_burst_unit);
+ case PROP_BURST_FORMAT:
+ g_value_set_enum (value, multifdsink->def_burst_format);
break;
case PROP_BURST_VALUE:
g_value_set_uint64 (value, multifdsink->def_burst_value);
break;
- case PROP_QOS_DSCP:
- g_value_set_int (value, multifdsink->qos_dscp);
- break;
case PROP_HANDLE_READ:
g_value_set_boolean (value, multifdsink->handle_read);
break;
/**
* GstTCPUnitType:
- * @GST_TCP_UNIT_TYPE_UNDEFINED: undefined
- * @GST_TCP_UNIT_TYPE_BUFFERS : buffers
- * @GST_TCP_UNIT_TYPE_TIME : timeunits (in nanoseconds)
- * @GST_TCP_UNIT_TYPE_BYTES : bytes
+ * @GST_TCP_UNIT_FORMAT_UNDEFINED: undefined
+ * @GST_TCP_UNIT_FORMAT_BUFFERS : buffers
+ * @GST_TCP_UNIT_FORMAT_TIME : timeunits (in nanoseconds)
+ * @GST_TCP_UNIT_FORMAT_BYTES : bytes
*
* The units used to specify limits.
*/
typedef enum
{
- GST_TCP_UNIT_TYPE_UNDEFINED,
- GST_TCP_UNIT_TYPE_BUFFERS,
- GST_TCP_UNIT_TYPE_TIME,
- GST_TCP_UNIT_TYPE_BYTES
+ GST_TCP_UNIT_FORMAT_UNDEFINED,
+ GST_TCP_UNIT_FORMAT_BUFFERS,
+ GST_TCP_UNIT_FORMAT_TIME,
+ GST_TCP_UNIT_FORMAT_BYTES,
} GstTCPUnitType;
/* structure for a client
/* method to sync client when connecting */
GstSyncMethod sync_method;
- GstTCPUnitType burst_min_unit;
+ GstFormat burst_min_format;
guint64 burst_min_value;
- GstTCPUnitType burst_max_unit;
+ GstFormat burst_max_format;
guint64 burst_max_value;
} GstTCPClient;
gint mode;
GstPoll *fdset;
- gboolean previous_buffer_in_caps;
-
- guint mtu;
- gint qos_dscp;
gboolean handle_read;
/* these values are used to check if a client is reading fast
* enough and to control receovery */
- GstTCPUnitType unit_type;/* the type of the units */
- gint64 units_max; /* max units to queue for a client */
- gint64 units_soft_max; /* max units a client can lag before recovery starts */
+ GstFormat unit_format;/* the type of the units */
- GstTCPUnitType def_burst_unit;
+ GstFormat def_burst_format;
guint64 def_burst_value;
guint8 header_flags;
/* element methods */
void (*add) (GstMultiFdSink *sink, int fd);
void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
- GstTCPUnitType format, guint64 value,
- GstTCPUnitType max_unit, guint64 max_value);
+ GstFormat format, guint64 value,
+ GstFormat max_format, guint64 max_value);
void (*remove) (GstMultiFdSink *sink, int fd);
void (*remove_flush) (GstMultiFdSink *sink, int fd);
void (*clear) (GstMultiFdSink *sink);
void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
- GstTCPUnitType min_unit, guint64 min_value,
- GstTCPUnitType max_unit, guint64 max_value);
+ GstFormat min_format, guint64 min_value,
+ GstFormat max_format, guint64 max_value);
void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_clear (GstMultiHandleSink *sink);
/* this is really arbitrarily chosen */
-#define DEFAULT_MODE 1
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
#define DEFAULT_TIME_MIN -1
#define DEFAULT_BYTES_MIN -1
#define DEFAULT_BUFFERS_MIN -1
-#define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS
+#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS
#define DEFAULT_UNITS_MAX -1
#define DEFAULT_UNITS_SOFT_MAX -1
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
enum
{
PROP_0,
-#if 0
- PROP_MODE,
-#endif
PROP_BUFFERS_QUEUED,
PROP_BYTES_QUEUED,
PROP_TIME_QUEUED,
#if 0
- PROP_UNIT_TYPE,
+ PROP_UNIT_FORMAT,
+#endif
PROP_UNITS_MAX,
PROP_UNITS_SOFT_MAX,
PROP_BUFFERS_MAX,
PROP_BUFFERS_SOFT_MAX,
-#endif
PROP_TIME_MIN,
PROP_BYTES_MIN,
#if 0
PROP_BURST_FORMAT,
PROP_BURST_VALUE,
+#endif
PROP_QOS_DSCP,
-#endif
PROP_RESEND_STREAMHEADER,
GIOCondition condition, GstMultiHandleSink * sink);
#endif
-#if 0
static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink,
GstBuffer * buf);
+#if 0
static gboolean gst_multi_handle_sink_unlock (GstBaseSink * bsink);
static gboolean gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink);
#endif
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
-#if 0
GstBaseSinkClass *gstbasesink_class;
-#endif
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
-#if 0
gstbasesink_class = (GstBaseSinkClass *) klass;
-#endif
gobject_class->set_property = gst_multi_handle_sink_set_property;
gobject_class->get_property = gst_multi_handle_sink_get_property;
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#if 0
- g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
- g_param_spec_enum ("unit-type", "Units type",
+ g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
+ g_param_spec_enum ("unit-format", "Units format",
"The unit to measure the max/soft-max/queued properties",
- GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE,
+ GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
g_param_spec_int64 ("units-max", "Units max",
g_param_spec_uint64 ("burst-value", "Burst value",
"The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+#endif
g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
g_param_spec_int ("qos-dscp", "QoS diff srv code point",
-1, 63, DEFAULT_QOS_DSCP,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+#if 0
/**
* GstMultiHandleSink::handle-read
*
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state);
-#if 0
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_render);
+#if 0
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock_stop);
#endif
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
- this->unit_type = DEFAULT_UNIT_TYPE;
+ this->unit_format = DEFAULT_UNIT_FORMAT;
this->units_max = DEFAULT_UNITS_MAX;
this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
this->time_min = DEFAULT_TIME_MIN;
this->timeout = DEFAULT_TIMEOUT;
this->def_sync_method = DEFAULT_SYNC_METHOD;
+
+#if 0
this->def_burst_format = DEFAULT_BURST_FORMAT;
this->def_burst_value = DEFAULT_BURST_VALUE;
+#endif
this->qos_dscp = DEFAULT_QOS_DSCP;
this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
-
- this->header_flags = 0;
- this->cancellable = g_cancellable_new ();
}
static void
G_OBJECT_CLASS (parent_class)->finalize (object);
}
-#if 0
-static gint
-setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client)
+gint
+gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink,
+ GstMultiHandleClient * client)
{
#ifndef IP_TOS
return 0;
} sa;
socklen_t slen = sizeof (sa);
gint af;
+ GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
/* don't touch */
if (sink->qos_dscp < 0)
return 0;
- fd = g_socket_get_fd (client->socket);
+ fd = mhsinkclass->client_get_fd (client);
if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
#endif
}
-static void
-setup_dscp (GstMultiHandleSink * sink)
-{
- GList *clients;
-
- CLIENTS_LOCK (sink);
- for (clients = sink->clients; clients; clients = clients->next) {
- GstSocketClient *client;
-
- client = clients->data;
-
- setup_dscp_client (sink, client);
- }
- CLIENTS_UNLOCK (sink);
-}
-#endif
-
void
gst_multi_handle_sink_client_init (GstMultiHandleClient * client,
GstSyncMethod sync_method)
client->last_activity_time = client->connect_time;
}
+void
+gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink)
+{
+ GList *clients;
+
+ CLIENTS_LOCK (mhsink);
+ for (clients = mhsink->clients; clients; clients = clients->next) {
+ GstMultiHandleClient *client;
+
+ client = clients->data;
+
+ gst_multi_handle_sink_setup_dscp_client (mhsink, client);
+ }
+ CLIENTS_UNLOCK (mhsink);
+}
+
+
#if 0
/* "add-full" signal implementation */
void
static gint
get_buffers_max (GstMultiHandleSink * sink, gint64 max)
{
- switch (sink->unit_type) {
+ switch (sink->unit_format) {
case GST_FORMAT_BUFFERS:
return max;
case GST_FORMAT_TIME:
}
#endif
-#if 0
static GstFlowReturn
gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
- GstMultiHandleSink *sink;
gboolean in_caps;
#if 0
GstCaps *bufcaps, *padcaps;
#endif
- sink = GST_MULTI_HANDLE_SINK (bsink);
+ GstMultiHandleSink *sink = GST_MULTI_HANDLE_SINK (bsink);
+ GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
- GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_WRONG_STATE);
+ GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
#if 0
/* since we check every buffer for streamheader caps, we need to make
#endif
/* get IN_CAPS first, code below might mess with the flags */
- in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
+ in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
#if 0
/* stamp the buffer with previous caps if no caps set */
gst_buffer_ref (buf);
}
#endif
+ gst_buffer_ref (buf);
GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
* the old ones */
if (in_caps && sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
- "receiving new IN_CAPS buffers, clearing old streamheader");
+ "receiving new HEADER buffers, clearing old streamheader");
g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (sink->streamheader);
sink->streamheader = NULL;
* We don't send the buffer to the client, since streamheaders are sent
* separately when necessary. */
if (in_caps) {
- GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %"
+ GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
sink->streamheader = g_slist_append (sink->streamheader, buf);
} else {
/* queue the buffer, this is a regular data buffer. */
- gst_multi_handle_sink_queue_buffer (sink, buf);
+ mhsinkclass->queue_buffer (sink, buf);
sink->bytes_to_serve += gst_buffer_get_size (buf);
}
}
#endif
}
-#endif
static void
gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
multihandlesink = GST_MULTI_HANDLE_SINK (object);
switch (prop_id) {
-#if 0
case PROP_BUFFERS_MAX:
multihandlesink->units_max = g_value_get_int (value);
break;
case PROP_BUFFERS_SOFT_MAX:
multihandlesink->units_soft_max = g_value_get_int (value);
break;
-#endif
case PROP_TIME_MIN:
multihandlesink->time_min = g_value_get_int64 (value);
break;
multihandlesink->buffers_min = g_value_get_int (value);
break;
#if 0
- case PROP_UNIT_TYPE:
- multihandlesink->unit_type = g_value_get_enum (value);
+ case PROP_UNIT_FORMAT:
+ multihandlesink->unit_format = g_value_get_enum (value);
break;
case PROP_UNITS_MAX:
multihandlesink->units_max = g_value_get_int64 (value);
case PROP_BURST_VALUE:
multihandlesink->def_burst_value = g_value_get_uint64 (value);
break;
+#endif
case PROP_QOS_DSCP:
multihandlesink->qos_dscp = g_value_get_int (value);
- setup_dscp (multihandlesink);
+ gst_multi_handle_sink_setup_dscp (multihandlesink);
break;
+
case PROP_RESEND_STREAMHEADER:
multihandlesink->resend_streamheader = g_value_get_boolean (value);
break;
-#endif
+
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
multihandlesink = GST_MULTI_HANDLE_SINK (object);
switch (prop_id) {
-#if 0
case PROP_BUFFERS_MAX:
g_value_set_int (value, multihandlesink->units_max);
break;
case PROP_BUFFERS_SOFT_MAX:
g_value_set_int (value, multihandlesink->units_soft_max);
break;
-#endif
case PROP_TIME_MIN:
g_value_set_int64 (value, multihandlesink->time_min);
break;
g_value_set_uint64 (value, multihandlesink->time_queued);
break;
#if 0
- case PROP_UNIT_TYPE:
- g_value_set_enum (value, multihandlesink->unit_type);
+ case PROP_UNIT_FORMAT:
+ g_value_set_enum (value, multihandlesink->unit_format);
break;
case PROP_UNITS_MAX:
g_value_set_int64 (value, multihandlesink->units_max);
case PROP_BURST_VALUE:
g_value_set_uint64 (value, multihandlesink->def_burst_value);
break;
+#endif
case PROP_QOS_DSCP:
g_value_set_int (value, multihandlesink->qos_dscp);
break;
case PROP_RESEND_STREAMHEADER:
g_value_set_boolean (value, multihandlesink->resend_streamheader);
break;
+#if 0
case PROP_NUM_SOCKETS:
g_value_set_uint (value,
g_hash_table_size (multihandlesink->socket_hash));
gboolean is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer);
gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink);
gboolean gst_multi_handle_sink_start (GstBaseSink * bsink);
+void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink);
+gint gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink, GstMultiHandleClient * client);
/**
* GstMultiHandleSink:
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps;
- guint mtu;
gint qos_dscp;
GArray *bufqueue; /* global queue of buffers */
/* these values are used to check if a client is reading fast
* enough and to control receovery */
- GstFormat unit_type;/* the format of the units */
+ GstFormat unit_format;/* the format of the units */
gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy;
void (*stop_post) (GstMultiHandleSink *sink);
gboolean (*start_pre) (GstMultiHandleSink *sink);
gpointer (*thread) (GstMultiHandleSink *sink);
+ void (*queue_buffer) (GstMultiHandleSink *sink,
+ GstBuffer *buffer);
+ gboolean (*client_queue_buffer)
+ (GstMultiHandleSink *sink,
+ GstMultiHandleClient *client,
+ GstBuffer *buffer);
+ int (*client_get_fd)
+ (GstMultiHandleClient *client);
+
GstStructure* (*get_stats) (GstMultiHandleSink *sink, GSocket *socket);
void (*remove_client_link) (GstMultiHandleSink * sink, GList * link);
#define NOT_IMPLEMENTED 0
-static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
- GST_PAD_SINK,
- GST_PAD_ALWAYS,
- GST_STATIC_CAPS_ANY);
-
GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
#define GST_CAT_DEFAULT (multisocketsink_debug)
/* this is really arbitrarily chosen */
-#define DEFAULT_MODE 1
-#define DEFAULT_BUFFERS_MAX -1
-#define DEFAULT_BUFFERS_SOFT_MAX -1
-#define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS
-#define DEFAULT_UNITS_MAX -1
-#define DEFAULT_UNITS_SOFT_MAX -1
+#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS
#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED
#define DEFAULT_BURST_VALUE 0
-#define DEFAULT_QOS_DSCP -1
-
enum
{
PROP_0,
- PROP_MODE,
- PROP_UNIT_TYPE,
- PROP_UNITS_MAX,
- PROP_UNITS_SOFT_MAX,
-
- PROP_BUFFERS_MAX,
- PROP_BUFFERS_SOFT_MAX,
+ PROP_UNIT_FORMAT,
PROP_BURST_FORMAT,
PROP_BURST_VALUE,
- PROP_QOS_DSCP,
-
PROP_NUM_SOCKETS,
PROP_LAST
static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
+static void gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink,
+ GstBuffer * buffer);
+static gboolean gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink *
+ mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
+static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
+
static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket,
GIOCondition condition, GstMultiSocketSink * sink);
-static GstFlowReturn gst_multi_socket_sink_render (GstBaseSink * bsink,
- GstBuffer * buf);
static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
gobject_class->get_property = gst_multi_socket_sink_get_property;
gobject_class->finalize = gst_multi_socket_sink_finalize;
- g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
- g_param_spec_int ("buffers-max", "Buffers max",
- "max number of buffers to queue for a client (-1 = no limit)", -1,
- G_MAXINT, DEFAULT_BUFFERS_MAX,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
- g_param_spec_int ("buffers-soft-max", "Buffers soft max",
- "Recover client when going over this limit (-1 = no limit)", -1,
- G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
- g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
- g_param_spec_enum ("unit-type", "Units type",
+ g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
+ g_param_spec_enum ("unit-format", "Units format",
"The unit to measure the max/soft-max/queued properties",
- GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
- g_param_spec_int64 ("units-max", "Units max",
- "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
- DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
- g_param_spec_int64 ("units-soft-max", "Units soft max",
- "Recover client when going over this limit (-1 = no limit)", -1,
- G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
+ GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
"The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
- g_param_spec_int ("qos-dscp", "QoS diff srv code point",
- "Quality of Service, differentiated services code point (-1 default)",
- -1, 63, DEFAULT_QOS_DSCP,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
g_param_spec_uint ("num-sockets", "Number of sockets",
"The current number of client sockets",
client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
G_TYPE_NONE, 1, G_TYPE_SOCKET);
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sinktemplate));
-
gst_element_class_set_details_simple (gstelement_class,
"Multi socket sink", "Sink/Network",
"Send data to multiple sockets",
"Wim Taymans <wim@fluendo.com>, "
"Sebastian Dröge <sebastian.droege@collabora.co.uk>");
- gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_render);
-
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
gstmultihandlesink_class->thread =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
+ gstmultihandlesink_class->queue_buffer =
+ GST_DEBUG_FUNCPTR (gst_multi_socket_sink_queue_buffer);
+ gstmultihandlesink_class->client_queue_buffer =
+ GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_queue_buffer);
+ gstmultihandlesink_class->client_get_fd =
+ GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
gstmultihandlesink_class->remove_client_link =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link);
{
this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
- this->unit_type = DEFAULT_UNIT_TYPE;
- this->units_max = DEFAULT_UNITS_MAX;
- this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
+ this->unit_format = DEFAULT_UNIT_FORMAT;
this->def_burst_format = DEFAULT_BURST_FORMAT;
this->def_burst_value = DEFAULT_BURST_VALUE;
- this->qos_dscp = DEFAULT_QOS_DSCP;
-
- this->header_flags = 0;
this->cancellable = g_cancellable_new ();
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}
-static gint
-setup_dscp_client (GstMultiSocketSink * sink, GstSocketClient * client)
+static int
+gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
{
-#ifndef IP_TOS
- return 0;
-#else
- gint tos;
- gint ret;
- int fd;
- union gst_sockaddr
- {
- struct sockaddr sa;
- struct sockaddr_in6 sa_in6;
- struct sockaddr_storage sa_stor;
- } sa;
- socklen_t slen = sizeof (sa);
- gint af;
-
- /* don't touch */
- if (sink->qos_dscp < 0)
- return 0;
-
- fd = g_socket_get_fd (client->socket);
-
- if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
- GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
- return ret;
- }
+ GstSocketClient *msclient = (GstSocketClient *) client;
- af = sa.sa.sa_family;
-
- /* if this is an IPv4-mapped address then do IPv4 QoS */
- if (af == AF_INET6) {
-
- GST_DEBUG_OBJECT (sink, "check IP6 socket");
- if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
- GST_DEBUG_OBJECT (sink, "mapped to IPV4");
- af = AF_INET;
- }
- }
-
- /* extract and shift 6 bits of the DSCP */
- tos = (sink->qos_dscp & 0x3f) << 2;
-
- switch (af) {
- case AF_INET:
- ret = setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
- break;
- case AF_INET6:
-#ifdef IPV6_TCLASS
- ret = setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos));
- break;
-#endif
- default:
- ret = 0;
- GST_ERROR_OBJECT (sink, "unsupported AF");
- break;
- }
- if (ret)
- GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
-
- return ret;
-#endif
-}
-
-static void
-setup_dscp (GstMultiSocketSink * sink)
-{
- GList *clients;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- CLIENTS_LOCK (sink);
- for (clients = mhsink->clients; clients; clients = clients->next) {
- GstSocketClient *client;
-
- client = clients->data;
-
- setup_dscp_client (sink, client);
- }
- CLIENTS_UNLOCK (sink);
+ return g_socket_get_fd (msclient->socket);
}
/* "add-full" signal implementation */
g_source_attach (client->source, sink->main_context);
}
- setup_dscp_client (sink, client);
+ gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
CLIENTS_UNLOCK (sink);
/* queue the given buffer for the given client */
static gboolean
-gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink,
- GstSocketClient * client, GstBuffer * buffer)
+gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
+ GstMultiHandleClient * mhclient, GstBuffer * buffer)
{
- GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+ GstSocketClient *client = (GstSocketClient *) mhclient;
+ GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
GstCaps *caps;
/* TRUE: send them if the new caps have them */
{
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
- switch (sink->unit_type) {
+ switch (sink->unit_format) {
case GST_FORMAT_BUFFERS:
return max;
case GST_FORMAT_TIME:
GstClockTime now;
GTimeVal nowtv;
GError *err = NULL;
- GstMultiHandleSink *mhsink;
+ GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
+ GstMultiHandleSinkClass *mhsinkclass =
+ GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
- mhsink = GST_MULTI_HANDLE_SINK (sink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
socket, client, mhclient->bufpos);
/* queueing a buffer will ref it */
- gst_multi_socket_sink_client_queue_buffer (sink, client, buf);
+ mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
/* need to start from the first byte for this new buffer */
mhclient->bufoffset = 0;
break;
case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
/* move to beginning of soft max */
- newbufpos = get_buffers_max (sink, sink->units_soft_max);
+ newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers, we search backwards to find the
* closest keyframe relative to what this client already received. */
newbufpos = MIN (mhsink->bufqueue->len - 1,
- get_buffers_max (sink, sink->units_soft_max) - 1);
+ get_buffers_max (sink, mhsink->units_soft_max) - 1);
while (newbufpos >= 0) {
GstBuffer *buf;
break;
default:
/* unknown recovery procedure */
- newbufpos = get_buffers_max (sink, sink->units_soft_max);
+ newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
break;
}
return newbufpos;
* the select thread that the fd_set changed.
*/
static void
-gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
+gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink,
+ GstBuffer * buf)
{
GList *clients, *next;
gint queuelen;
GstClockTime now;
gint max_buffers, soft_max_buffers;
guint cookie;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+ GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
g_array_prepend_val (mhsink->bufqueue, buf);
queuelen = mhsink->bufqueue->len;
- if (sink->units_max > 0)
- max_buffers = get_buffers_max (sink, sink->units_max);
+ if (mhsink->units_max > 0)
+ max_buffers = get_buffers_max (sink, mhsink->units_max);
else
max_buffers = -1;
- if (sink->units_soft_max > 0)
- soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
+ if (mhsink->units_soft_max > 0)
+ soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
else
soft_max_buffers = -1;
GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
return NULL;
}
-static GstFlowReturn
-gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf)
-{
- GstMultiSocketSink *sink;
- gboolean in_caps;
-#if 0
- GstCaps *bufcaps, *padcaps;
-#endif
- GstMultiHandleSink *mhsink;
-
- sink = GST_MULTI_SOCKET_SINK (bsink);
- mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
- GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
-
-#if 0
- /* since we check every buffer for streamheader caps, we need to make
- * sure every buffer has caps set */
- bufcaps = gst_buffer_get_caps (buf);
- padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
-
- /* make sure we have caps on the pad */
- if (!padcaps && !bufcaps)
- goto no_caps;
-#endif
-
- /* get HEADER first, code below might mess with the flags */
- in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
-
-#if 0
- /* stamp the buffer with previous caps if no caps set */
- if (!bufcaps) {
- if (!gst_buffer_is_writable (buf)) {
- /* metadata is not writable, copy will be made and original buffer
- * will be unreffed so we need to ref so that we don't lose the
- * buffer in the render method. */
- gst_buffer_ref (buf);
- /* the new buffer is ours only, we keep it out of the scope of this
- * function */
- buf = gst_buffer_make_writable (buf);
- } else {
- /* else the metadata is writable, we ref because we keep the buffer
- * out of the scope of this method */
- gst_buffer_ref (buf);
- }
- /* buffer metadata is writable now, set the caps */
- gst_buffer_set_caps (buf, padcaps);
- } else {
- gst_caps_unref (bufcaps);
-
- /* since we keep this buffer out of the scope of this method */
- gst_buffer_ref (buf);
- }
-#endif
- gst_buffer_ref (buf);
-
- GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
- G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
- ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
- buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
- GST_BUFFER_OFFSET_END (buf),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
-
- /* if we get HEADER buffers, but the previous buffer was not HEADER,
- * it means we're getting new streamheader buffers, and we should clear
- * the old ones */
- if (in_caps && sink->previous_buffer_in_caps == FALSE) {
- GST_DEBUG_OBJECT (sink,
- "receiving new HEADER buffers, clearing old streamheader");
- g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
- g_slist_free (mhsink->streamheader);
- mhsink->streamheader = NULL;
- }
-
- /* save the current in_caps */
- sink->previous_buffer_in_caps = in_caps;
-
- /* if the incoming buffer is marked as IN CAPS, then we assume for now
- * it's a streamheader that needs to be sent to each new client, so we
- * put it on our internal list of streamheader buffers.
- * FIXME: we could check if the buffer's contents are in fact part of the
- * current streamheader.
- *
- * We don't send the buffer to the client, since streamheaders are sent
- * separately when necessary. */
- if (in_caps) {
- GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
- G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
- mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
- } else {
- /* queue the buffer, this is a regular data buffer. */
- gst_multi_socket_sink_queue_buffer (sink, buf);
-
- mhsink->bytes_to_serve += gst_buffer_get_size (buf);
- }
- return GST_FLOW_OK;
-
- /* ERRORS */
-#if 0
-no_caps:
- {
- GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
- ("Received first buffer without caps set"));
- return GST_FLOW_NOT_NEGOTIATED;
- }
-#endif
-}
-
static void
gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
multisocketsink = GST_MULTI_SOCKET_SINK (object);
switch (prop_id) {
- case PROP_BUFFERS_MAX:
- multisocketsink->units_max = g_value_get_int (value);
- break;
- case PROP_BUFFERS_SOFT_MAX:
- multisocketsink->units_soft_max = g_value_get_int (value);
- break;
- case PROP_UNIT_TYPE:
- multisocketsink->unit_type = g_value_get_enum (value);
- break;
- case PROP_UNITS_MAX:
- multisocketsink->units_max = g_value_get_int64 (value);
- break;
- case PROP_UNITS_SOFT_MAX:
- multisocketsink->units_soft_max = g_value_get_int64 (value);
+ case PROP_UNIT_FORMAT:
+ multisocketsink->unit_format = g_value_get_enum (value);
break;
case PROP_BURST_FORMAT:
multisocketsink->def_burst_format = g_value_get_enum (value);
case PROP_BURST_VALUE:
multisocketsink->def_burst_value = g_value_get_uint64 (value);
break;
- case PROP_QOS_DSCP:
- multisocketsink->qos_dscp = g_value_get_int (value);
- setup_dscp (multisocketsink);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
multisocketsink = GST_MULTI_SOCKET_SINK (object);
switch (prop_id) {
- case PROP_BUFFERS_MAX:
- g_value_set_int (value, multisocketsink->units_max);
- break;
- case PROP_BUFFERS_SOFT_MAX:
- g_value_set_int (value, multisocketsink->units_soft_max);
- break;
- case PROP_UNIT_TYPE:
- g_value_set_enum (value, multisocketsink->unit_type);
- break;
- case PROP_UNITS_MAX:
- g_value_set_int64 (value, multisocketsink->units_max);
- break;
- case PROP_UNITS_SOFT_MAX:
- g_value_set_int64 (value, multisocketsink->units_soft_max);
+ case PROP_UNIT_FORMAT:
+ g_value_set_enum (value, multisocketsink->unit_format);
break;
case PROP_BURST_FORMAT:
g_value_set_enum (value, multisocketsink->def_burst_format);
case PROP_BURST_VALUE:
g_value_set_uint64 (value, multisocketsink->def_burst_value);
break;
- case PROP_QOS_DSCP:
- g_value_set_int (value, multisocketsink->qos_dscp);
- break;
case PROP_NUM_SOCKETS:
g_value_set_uint (value,
g_hash_table_size (multisocketsink->socket_hash));
gboolean previous_buffer_in_caps;
guint mtu;
- gint qos_dscp;
/* these values are used to check if a client is reading fast
* enough and to control receovery */
- GstFormat unit_type;/* the format of the units */
- gint64 units_max; /* max units to queue for a client */
- gint64 units_soft_max; /* max units a client can lag before recovery starts */
+ GstFormat unit_format;/* the format of the units */
GstFormat def_burst_format;
guint64 def_burst_value;
/* element methods */
void (*add) (GstMultiSocketSink *sink, GSocket *socket);
void (*add_full) (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
- GstFormat format, guint64 value,
+ GstFormat format, guint64 value,
GstFormat max_format, guint64 max_value);
void (*remove) (GstMultiSocketSink *sink, GSocket *socket);
void (*remove_flush) (GstMultiSocketSink *sink, GSocket *socket);
#include <gst/check/gstcheck.h>
+/* FIXME: remove this header once formats are refactored */
+#include "gst/tcp/gstmultifdsink.h"
+
static GstPad *mysrcpad;
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
int pfd1[2];
int pfd2[2];
int pfd3[2];
- gchar data[16];
gint i;
guint buffers_queued;
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 3, NULL); /* 3 = burst */
- g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
+ g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
/* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */
GST_DEBUG ("Reading from client 1");
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
/* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since
* the max alows it) */
GST_DEBUG ("Reading from client 2");
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000006");
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000007");
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 3 buffers (48 bytes). */
GST_DEBUG ("Reading from client 3");
- fail_if (read (pfd3[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
- fail_if (read (pfd3[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd3[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000007");
+ fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
+ fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
int pfd1[2];
int pfd2[2];
int pfd3[2];
- gchar data[16];
gint i;
guint buffers_queued;
sink = setup_multifdsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
- g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */
- g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
+ g_object_set (sink, "sync-method", 4, NULL); /* 4 = burst_keyframe */
+ g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
caps = gst_caps_from_string ("application/x-gst-check");
- GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
gst_pad_set_caps (mysrcpad, caps);
+ GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
/* push buffers in, 9 * 16 bytes = 144 bytes */
for (i = 0; i < 9; i++) {
/* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
* keyframe at buffer 4 */
GST_DEBUG ("Reading from client 1");
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000004");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
/* second client only bursts 50 bytes = 4 buffers, there is
* no keyframe above min and below max, so get one below min */
GST_DEBUG ("Reading from client 2");
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 2 buffers (32 bytes). */
GST_DEBUG ("Reading from client 3");
- fail_if (read (pfd3[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd3[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
+ fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
int pfd1[2];
int pfd2[2];
int pfd3[2];
- gchar data[16];
gint i;
guint buffers_queued;
sink = setup_multifdsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
- g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */
- g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
+ g_object_set (sink, "sync-method", 5, NULL); /* 5 = burst_with_keyframe */
+ g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
/* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
* keyframe at buffer 4 */
GST_DEBUG ("Reading from client 1");
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000004");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
/* second client only bursts 50 bytes = 4 buffers, there is
* no keyframe above min and below max, so send min */
GST_DEBUG ("Reading from client 2");
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd2[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000006");
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000007");
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
+ fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 3 buffers (48 bytes). */
GST_DEBUG ("Reading from client 3");
- fail_if (read (pfd3[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
- fail_if (read (pfd3[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
- fail_if (read (pfd3[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+ fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000007");
+ fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
+ fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
GstElement *sink;
GstCaps *caps;
int pfd1[2];
- gchar data[16];
gint i;
sink = setup_multifdsink ();
/* now we should be able to read some data */
GST_DEBUG ("Reading from client 1");
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000000", 16) == 0);
- fail_if (read (pfd1[0], data, 16) < 16);
- fail_unless (strncmp (data, "deadbee00000001", 16) == 0);
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000000");
+ fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000001");
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
sink = setup_multisocketsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
- g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */
+ g_object_set (sink, "sync-method", 4, NULL); /* 4 = burst_keyframe */
g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
- g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */
+ g_object_set (sink, "sync-method", 5, NULL); /* 5 = burst_with_keyframe */
g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);