* each descriptor added, the "client-added" signal will be called.
* </para>
* <para>
+ * As of version 0.10.8, a client can also be added with the "add-full" signal
+ * that allows for more control over what and how much data a client
+ * initially receives.
+ * </para>
+ * <para>
* Clients can be removed from multifdsink by emiting the "remove" signal. For
* each descriptor removed, the "client-removed" signal will be called. The
* "client-removed" signal can also be fired when multifdsink decides that a
* client is not active anymore or, depending on the value of the
* "recover-policy," if the client is reading to slow.
* In all cases, multifdsink will never ever close a file descriptor itself.
- * The user of multifdsink is responsible for closing the file descriptor.
+ * The user of multifdsink is responsible for closing all file descriptors.
* This can for example be done in response to the "client-fd-removed" signal.
* Note that multifdsink still has a reference to the file descriptor when the
* "client-removed" signal is emited so that "get-stats" can be performed on
* the descriptor; It is therefore not allowed to close the file descriptor in
- * the "client-removed" signal, use the "client-fd-removed" signal to close the
- * fd.
+ * the "client-removed" signal, use the "client-fd-removed" signal to safely
+ * close the fd.
* </para>
* <para>
* Multifdsink internally keeps a queue of the incomming buffers and uses a
* </para>
* <para>
* When adding a client to multifdsink, the "sync-method" property will define
- * which buffer will be sent first to the client. Clients can be sent
- * respectively the most recent buffer (which might not be decodable by the
- * client when it is not a keyframe), the next keyframe received in multifdsink
- * (which can take some time depending on the keyframe rate, or the last
- * received keyframe (which will cause a burst-on-connect).
+ * which buffer in the queued buffers will be sent first to the client. Clients
+ * can be sent respectively the most recent buffer (which might not be decodable
+ * by the client when it is not a keyframe), the next keyframe received in
+ * multifdsink (which can take some time depending on the keyframe rate, or the
+ * last received keyframe (which will cause a simpl burst-on-connect).
+ * Multifdsink will always keep at least one keyframe in its internal buffers
+ * when the sync-mode is set to latest-keyframe.
+ * </para>
+ * <para>
+ * Multifdsink can be instructed to keep at least a minimum amount of data
+ * expressed in time or byte units in its internal queues with the the
+ * "time-min" and "bytes-min" properties respectively. These properties are
+ * usefull if the application adds clients with the "add-full" signal to
+ * make sure that a burst connect can actually be honored.
* </para>
* <para>
* When streaming data, clients are allowed to read at a different rate than
* buffer queue.
* </para>
* <para>
- * multifdsink will synchronize on the clock before serving the buffers to the
- * clients.
+ * multifdsink will by default synchronize on the clock before serving the buffers
+ * to the clients. This behaviour can be disabled by setting the sync
+ * property to FALSE. Multifdsink will be default not do QoS and will never
+ * drop late buffers.
* </para>
* </refsect2>
*
- * Last reviewed on 2006-04-28 (0.10.7)
+ * Last reviewed on 2006-06-13 (0.10.9)
*/
#ifdef HAVE_CONFIG_H
{
/* methods */
SIGNAL_ADD,
+ SIGNAL_ADD_BURST,
SIGNAL_REMOVE,
SIGNAL_CLEAR,
SIGNAL_GET_STATS,
};
/* this is really arbitrarily chosen */
-#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE
-#define DEFAULT_MODE GST_FDSET_MODE_POLL
+#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE
+#define DEFAULT_MODE GST_FDSET_MODE_POLL
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
+#define DEFAULT_TIME_MIN -1
+#define DEFAULT_BYTES_MIN -1
+#define DEFAULT_BUFFERS_MIN -1
#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS
#define DEFAULT_UNITS_MAX -1
#define DEFAULT_UNITS_SOFT_MAX -1
-#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
-#define DEFAULT_TIMEOUT 0
-#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_LATEST
+#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
+#define DEFAULT_TIMEOUT 0
+#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_LATEST
+
+#define DEFAULT_BURST_UNIT GST_UNIT_TYPE_UNDEFINED
+#define DEFAULT_BURST_VALUE 0
enum
{
- ARG_0,
- ARG_PROTOCOL,
- ARG_MODE,
- ARG_BUFFERS_QUEUED,
- ARG_BYTES_QUEUED,
- ARG_TIME_QUEUED,
-
- ARG_UNIT_TYPE,
- ARG_UNITS_MAX,
- ARG_UNITS_SOFT_MAX,
-
- ARG_BUFFERS_MAX,
- ARG_BUFFERS_SOFT_MAX,
-
- ARG_RECOVER_POLICY,
- ARG_TIMEOUT,
- ARG_SYNC_METHOD,
- ARG_BYTES_TO_SERVE,
- ARG_BYTES_SERVED,
+ PROP_0,
+ PROP_PROTOCOL,
+ PROP_MODE,
+ PROP_BUFFERS_QUEUED,
+ PROP_BYTES_QUEUED,
+ PROP_TIME_QUEUED,
+
+ PROP_UNIT_TYPE,
+ PROP_UNITS_MAX,
+ PROP_UNITS_SOFT_MAX,
+
+ PROP_BUFFERS_MAX,
+ PROP_BUFFERS_SOFT_MAX,
+
+ PROP_TIME_MIN,
+ PROP_BYTES_MIN,
+ PROP_BUFFERS_MIN,
+
+ PROP_RECOVER_POLICY,
+ PROP_TIMEOUT,
+ PROP_SYNC_METHOD,
+ PROP_BYTES_TO_SERVE,
+ PROP_BYTES_SERVED,
+
+ PROP_BURST_UNIT,
+ PROP_BURST_VALUE,
};
#define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
{GST_SYNC_METHOD_LATEST_KEYFRAME,
"Serve everything since the latest keyframe (burst)",
"latest-keyframe"},
+ {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"},
+ {GST_SYNC_METHOD_BURST_KEYFRAME,
+ "Serve burst-value data starting on a keyframe",
+ "burst-keyframe"},
+ {GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
+ "Serve burst-value data preferably starting on a keyframe",
+ "burst-with-keyframe"},
{0, NULL, NULL},
};
return sync_method_type;
}
-#if NOT_IMPLEMENTED
#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
static GType
gst_unit_type_get_type (void)
{
static GType unit_type_type = 0;
static const GEnumValue unit_type[] = {
+ {GST_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
{GST_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
{GST_UNIT_TYPE_BYTES, "Bytes", "bytes"},
{GST_UNIT_TYPE_TIME, "Time", "time"},
}
return unit_type_type;
}
-#endif
#define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
static GType
gobject_class->get_property = gst_multi_fd_sink_get_property;
gobject_class->finalize = gst_multi_fd_sink_finalize;
- g_object_class_install_property (gobject_class, ARG_PROTOCOL,
+ g_object_class_install_property (gobject_class, PROP_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class, ARG_MODE,
+ g_object_class_install_property (gobject_class, PROP_MODE,
g_param_spec_enum ("mode", "Mode",
"The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
DEFAULT_MODE, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_MAX,
g_param_spec_int ("buffers-max", "Buffers max",
- "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
- DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX,
- g_param_spec_int ("buffers-soft-max", "Buffers soft max",
+ "max number of buffers to queue for a client (-1 = no limit)", -1,
+ G_MAXINT, DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass),
+ PROP_BUFFERS_SOFT_MAX, g_param_spec_int ("buffers-soft-max",
+ "Buffers soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_MIN,
+ g_param_spec_int ("bytes-min", "Bytes min",
+ "min number of bytes to queue (-1 = as little as possible)", -1,
+ G_MAXINT, DEFAULT_BYTES_MIN, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIME_MIN,
+ g_param_spec_int64 ("time-min", "Time min",
+ "min number of time to queue (-1 = as litte as possible)", -1,
+ G_MAXINT64, DEFAULT_TIME_MIN, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_MIN,
+ g_param_spec_int64 ("buffers-min", "Buffers min",
+ "min number of buffers to queue (-1 = as litte as possible)", -1,
+ G_MAXINT, DEFAULT_BUFFERS_MIN, G_PARAM_READWRITE));
+
#if NOT_IMPLEMENTED
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNIT_TYPE,
g_param_spec_enum ("unit-type", "Units type",
"The unit to measure the max/soft-max/queued properties",
GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNITS_MAX,
g_param_spec_int ("units-max", "Units max",
"max number of units to queue (-1 = no limit)", -1, G_MAXINT,
DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNITS_SOFT_MAX,
g_param_spec_int ("units-soft-max", "Units soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
#endif
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_QUEUED,
g_param_spec_uint ("buffers-queued", "Buffers queued",
"Number of buffers currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE));
#if NOT_IMPLEMENTED
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_QUEUED,
g_param_spec_uint ("bytes-queued", "Bytes queued",
"Number of bytes currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIME_QUEUED,
g_param_spec_uint64 ("time-queued", "Time queued",
"Number of time currently queued", 0, G_MAXUINT64, 0,
G_PARAM_READABLE));
#endif
- g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
+ g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
g_param_spec_enum ("recover-policy", "Recover Policy",
"How to recover when client reaches the soft max",
GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
g_param_spec_uint64 ("timeout", "Timeout",
"Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SYNC_METHOD,
g_param_spec_enum ("sync-method", "Sync Method",
"How to sync new clients to the stream",
GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
"Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
G_PARAM_READABLE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
g_param_spec_uint64 ("bytes-served", "Bytes served",
"Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
G_PARAM_READABLE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BURST_UNIT,
+ g_param_spec_enum ("burst-unit", "Burst unit",
+ "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
+ GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BURST_VALUE,
+ g_param_spec_uint64 ("burst-value", "Burst value",
+ "The amount of burst expressed in burst-unit",
+ 0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE));
+
/**
* GstMultiFdSink::add:
* @gstmultifdsink: the multifdsink element to emit this signal on
G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
/**
+ * GstMultiFdSink::add-full:
+ * @gstmultifdsink: the multifdsink element to emit this signal on
+ * @fd: the file descriptor to add to multifdsink
+ * @keyframe: start bursting from a keyframe
+ * @unit_type: the unit-type of @value
+ * @value: the minimal amount of data to burst expressed in
+ * @format units.
+ *
+ * Hand the given open file descriptor to multifdsink to write to and
+ * specify the burst parameters for the new connection.
+ */
+ gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] =
+ g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstMultiFdSinkClass, add_full),
+ NULL, NULL, gst_tcp_marshal_VOID__INT_BOOLEAN_INT_UINT64_INT_UINT64,
+ G_TYPE_NONE, 6, G_TYPE_INT, G_TYPE_BOOLEAN, GST_TYPE_UNIT_TYPE,
+ G_TYPE_UINT64, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
+ /**
* GstMultiFdSink::remove:
* @gstmultifdsink: the multifdsink element to emit this signal on
* @fd: the file descriptor to remove from multifdsink
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render);
klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add);
+ klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full);
klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove);
klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear);
klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats);
this->unit_type = DEFAULT_UNIT_TYPE;
this->units_max = DEFAULT_UNITS_MAX;
this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
+ this->time_min = DEFAULT_TIME_MIN;
+ this->bytes_min = DEFAULT_BYTES_MIN;
+ this->buffers_min = DEFAULT_BUFFERS_MIN;
this->recover_policy = DEFAULT_RECOVER_POLICY;
this->timeout = DEFAULT_TIMEOUT;
- this->sync_method = DEFAULT_SYNC_METHOD;
+ this->def_sync_method = DEFAULT_SYNC_METHOD;
+ this->def_burst_unit = DEFAULT_BURST_UNIT;
+ this->def_burst_value = DEFAULT_BURST_VALUE;
this->header_flags = 0;
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}
-/* "add" signal implemntation */
+/* "add-full" signal implemntation */
void
-gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
+gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
+ GstSyncMethod sync_method, GstUnitType min_unit, guint64 min_value,
+ GstUnitType max_unit, guint64 max_value)
{
GstTCPClient *client;
GList *clink;
GST_DEBUG_OBJECT (sink, "[fd %5d] adding client", fd);
+ /* do limits check if we can */
+ if (min_unit == max_unit) {
+ if (max_value != -1 && min_value != -1 && max_value < min_value)
+ goto wrong_limits;
+ }
+
/* create client datastructure */
client = g_new0 (GstTCPClient, 1);
client->fd.fd = fd;
client->dropped_buffers = 0;
client->avg_queue_size = 0;
client->new_connection = TRUE;
+ client->burst_min_unit = min_unit;
+ client->burst_min_value = min_value;
+ client->burst_max_unit = max_unit;
+ client->burst_max_value = max_value;
+ client->sync_method = sync_method;
/* update start time */
g_get_current_time (&now);
client->connect_time = GST_TIMEVAL_TO_TIME (now);
client->disconnect_time = 0;
- /* send last activity time to connect time */
- client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
+ /* set last activity time to connect time */
+ client->last_activity_time = client->connect_time;
CLIENTS_LOCK (sink);
/* check the hash to find a duplicate fd */
clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
- if (clink != NULL) {
- client->status = GST_CLIENT_STATUS_DUPLICATE;
- CLIENTS_UNLOCK (sink);
- GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
- g_signal_emit (G_OBJECT (sink),
- gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
- client->status);
- g_free (client);
- return;
- }
+ if (clink != NULL)
+ goto duplicate;
/* we can add the fd now */
clink = sink->clients = g_list_prepend (sink->clients, client);
g_signal_emit (G_OBJECT (sink),
gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
+
+ return;
+
+ /* errors */
+wrong_limits:
+ {
+ GST_WARNING_OBJECT (sink,
+ "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
+ G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
+ min_value, max_value, min_unit);
+ return;
+ }
+duplicate:
+ {
+ client->status = GST_CLIENT_STATUS_DUPLICATE;
+ CLIENTS_UNLOCK (sink);
+ GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
+ g_signal_emit (G_OBJECT (sink),
+ gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
+ client->status);
+ g_free (client);
+ return;
+ }
+}
+
+/* "add" signal implemntation */
+void
+gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
+{
+ gst_multi_fd_sink_add_full (sink, fd, sink->def_sync_method,
+ sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1);
}
/* "remove" signal implemntation */
return result;
}
-/* should be called with the clientslock held.
+/* should be called with the clientslock helt.
* Note that we don't close the fd as we didn't open it in the first
* place. An application should connect to the client-removed signal and
* close the fd itself.
fd = client->fd.fd;
- if (ioctl (fd, FIONREAD, &avail) < 0) {
- GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
- fd, g_strerror (errno), errno);
- client->status = GST_CLIENT_STATUS_ERROR;
- ret = FALSE;
- return ret;
- }
+ if (ioctl (fd, FIONREAD, &avail) < 0)
+ goto ioctl_failed;
GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
fd, avail);
while (avail > 0);
}
return ret;
+
+ /* ERRORS */
+ioctl_failed:
+ {
+ GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
+ fd, g_strerror (errno), errno);
+ client->status = GST_CLIENT_STATUS_ERROR;
+ return FALSE;
+ }
}
/* Queue raw data for this client, creating a new buffer.
* This takes ownership of the data by
- * setting it as GST_BUFFER_MALLOCDATA() on the created buffer
+ * setting it as GST_BUFFER_MALLOCDATA() on the created buffer so
+ * be sure to pass g_free()-able @data.
*/
static gboolean
gst_multi_fd_sink_client_queue_data (GstMultiFdSink * sink,
return TRUE;
}
+/* find the keyframe in the list of buffers starting the
+ * search from @idx. @direction as -1 will search backwards,
+ * 1 will search forwards.
+ * Returns: the index or -1 if there is no keyframe after idx.
+ */
+static gint
+find_syncframe (GstMultiFdSink * sink, gint idx, gint direction)
+{
+ gint i, len, result;
+
+ /* take length of queued buffers */
+ len = sink->bufqueue->len;
+
+ /* assume we don't find a keyframe */
+ result = -1;
+
+ /* then loop over all buffers to find the first keyframe */
+ for (i = idx; i >= 0 && i < len; i += direction) {
+ GstBuffer *buf;
+
+ buf = g_array_index (sink->bufqueue, GstBuffer *, i);
+ if (is_sync_frame (sink, buf)) {
+ GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d",
+ i, idx, direction);
+ result = i;
+ break;
+ }
+ }
+ return result;
+}
+
+#define find_next_syncframe(s,i) find_syncframe(s,i,1)
+#define find_prev_syncframe(s,i) find_syncframe(s,i,-1)
+
+/* find the positions in the buffer queue where *_min and *_max
+ * is satisfied
+ */
+/* count the amount of data in the buffers and return the index
+ * that satifies the given limits.
+ *
+ * Returns: index @idx in the buffer queue so that the given limits are
+ * satisfied. TRUE if all the limits could be satisfied, FALSE if not
+ * enough data was in the queue.
+ *
+ * FIXME, this code might now work if any of the units is in buffers...
+ */
+static gboolean
+find_limits (GstMultiFdSink * sink,
+ gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
+ gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
+{
+ GstClockTime first, time;
+ gint i, len, bytes;
+ gboolean result, max_hit;
+
+ /* take length of queue */
+ len = sink->bufqueue->len;
+
+ /* this must hold */
+ g_assert (len > 0);
+
+ GST_LOG_OBJECT (sink,
+ "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
+ ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
+ buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
+ GST_TIME_ARGS (time_max));
+
+ /* do the trivial buffer limit test */
+ if (buffers_min != -1 && len < buffers_min) {
+ *min_idx = len - 1;
+ *max_idx = len - 1;
+ return FALSE;
+ }
+
+ result = FALSE;
+ /* else count bytes and time */
+ first = -1;
+ bytes = 0;
+ /* unset limits */
+ *min_idx = -1;
+ *max_idx = -1;
+ max_hit = FALSE;
+
+ i = 0;
+ /* loop through the buffers, when a limit is ok, mark it
+ * as -1, we have at least one buffer in the queue. */
+ do {
+ GstBuffer *buf;
+
+ /* if we checked all min limits, update result */
+ if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
+ /* don't go below 0 */
+ *min_idx = MAX (i - 1, 0);
+ }
+ /* if we reached one max limit break out */
+ if (max_hit) {
+ /* i > 0 when we get here, we subtract one to get the position
+ * of the previous buffer. */
+ *max_idx = i - 1;
+ /* we have valid complete result if we found a min_idx too */
+ result = *min_idx != -1;
+ break;
+ }
+ buf = g_array_index (sink->bufqueue, GstBuffer *, i);
+
+ bytes += GST_BUFFER_SIZE (buf);
+
+ /* take timestamp and save for the base first timestamp */
+ if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
+ if (first == -1)
+ first = time;
+
+ /* increase max usage if we did not fill enough. Note that
+ * buffers are sorted from new to old, so the first timestamp is
+ * bigger than the next one. */
+ if (time_min != -1 && first - time >= time_min)
+ time_min = -1;
+ if (time_max != -1 && first - time >= time_max)
+ max_hit = TRUE;
+ }
+ /* time is OK or unknown, check and increase if not enough bytes */
+ if (bytes_min != -1) {
+ if (bytes >= bytes_min)
+ bytes_min = -1;
+ }
+ if (bytes_max != -1) {
+ if (bytes >= bytes_max) {
+ max_hit = TRUE;
+ }
+ }
+ i++;
+ }
+ while (i < len);
+
+ /* if we did not hit the max or min limit, set to buffer size */
+ if (*max_idx == -1)
+ *max_idx = len - 1;
+ /* make sure min does not exceed max */
+ if (*min_idx == -1)
+ *min_idx = *max_idx;
+
+ return result;
+}
+
+/* parse the unit/value pair and assign it to the result value of the
+ * right type, leave the other values untouched
+ *
+ * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
+ */
+static gboolean
+assign_value (GstUnitType unit, guint64 value, gint * bytes, gint * buffers,
+ GstClockTime * time)
+{
+ gboolean res = TRUE;
+
+ /* set only the limit of the given format to the given value */
+ switch (unit) {
+ case GST_UNIT_TYPE_BUFFERS:
+ *buffers = (gint) value;
+ break;
+ case GST_UNIT_TYPE_TIME:
+ *time = value;
+ break;
+ case GST_UNIT_TYPE_BYTES:
+ *bytes = (gint) value;
+ break;
+ case GST_UNIT_TYPE_UNDEFINED:
+ default:
+ res = FALSE;
+ break;
+ }
+ return res;
+}
+
+/* count the index in the buffer queue to satisfy the given unit
+ * and value pair starting from buffer at index 0.
+ *
+ * Returns: TRUE if there was enuough data in the queue to satisfy the
+ * burst values. @idx contains the index in the buffer that contains enough
+ * data to satisfy the limits or the last buffer in the queue when the
+ * function returns FALSE.
+ */
+static gboolean
+count_burst_unit (GstMultiFdSink * sink, gint * min_idx, GstUnitType min_unit,
+ guint64 min_value, gint * max_idx, GstUnitType max_unit, guint64 max_value)
+{
+ gint bytes_min = -1, buffers_min = -1;
+ gint bytes_max = -1, buffers_max = -1;
+ GstClockTime time_min = -1, time_max = -1;
+
+ assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min);
+ assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max);
+
+ return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
+ max_idx, bytes_max, buffers_max, time_max);
+}
+
/* decide where in the current buffer queue this new client should start
- * receiving buffers from
+ * receiving buffers from.
+ * This function is called whenever a client is connected and has not yet
+ * received a buffer.
* If this returns -1, it means that we haven't found a good point to
* start streaming from yet, and this function should be called again later
- * when more buffers have arrived */
+ * when more buffers have arrived.
+ */
static gint
gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
{
gint result;
- switch (sink->sync_method) {
+ switch (client->sync_method) {
+ case GST_SYNC_METHOD_LATEST:
+ /* no syncing, we are happy with whatever the client is going to get */
+ GST_LOG_OBJECT (sink, "no client sync needed");
+ result = client->bufpos;
+ break;
case GST_SYNC_METHOD_NEXT_KEYFRAME:
{
+ GstBuffer *buf;
+
/* if the buffer at the head of the queue is a sync point we can proceed,
* else we need to skip the buffer and wait for a new one */
GST_LOG_OBJECT (sink,
"[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
client->bufpos);
- /* the client is not yet aligned to a buffer */
- if (client->bufpos < 0) {
- result = -1;
- } else {
- GstBuffer *buf;
- gint i;
-
- for (i = client->bufpos; i >= 0; i--) {
- /* get the buffer for the client */
- buf = g_array_index (sink->bufqueue, GstBuffer *, i);
- if (is_sync_frame (sink, buf)) {
- GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync",
- client->fd.fd);
- result = i;
- goto done;
- } else {
- /* client is not on a buffer, need to skip this buffer and
- * wait some more */
- GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer",
- client->fd.fd);
- client->bufpos--;
- }
- }
- result = -1;
+ /* get the buffer for the client */
+ buf = g_array_index (sink->bufqueue, GstBuffer *, 0);
+ if (is_sync_frame (sink, buf)) {
+ GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync", client->fd.fd);
+ result = 0;
+ goto done;
}
+ /* client is not on a syncbuffer, need to skip this buffer and
+ * wait some more */
+ GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer",
+ client->fd.fd);
+ client->bufpos = -1;
+ result = -1;
break;
}
case GST_SYNC_METHOD_LATEST_KEYFRAME:
{
- /* FIXME for new clients we constantly scan the complete
- * buffer queue for sync point whenever a buffer is added. This is
- * suboptimal because if we cannot find a sync point the first time,
- * the algorithm should behave as GST_SYNC_METHOD_NEXT_KEYFRAME */
- gint i, len;
-
GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, bursting keyframe",
client->fd.fd, client->bufpos);
- /* take length of queued buffers */
- len = sink->bufqueue->len;
- /* assume we don't find a keyframe */
- result = -1;
- /* then loop over all buffers to find the first keyframe */
- for (i = 0; i < len; i++) {
- GstBuffer *buf;
+ /* for new clients we initially scan the complete buffer queue for
+ * a sync point when a buffer is added. If we don't find a keyframe,
+ * we need to wait for the next keyframe and so we change the client's
+ * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
+ */
+ result = find_next_syncframe (sink, 0);
+ if (result != -1)
+ goto done;
- buf = g_array_index (sink->bufqueue, GstBuffer *, i);
- if (is_sync_frame (sink, buf)) {
- /* found a keyframe, return its position */
- GST_LOG_OBJECT (sink, "found keyframe at %d", i);
- result = i;
- goto done;
- }
- }
GST_LOG_OBJECT (sink, "no keyframe found");
/* throw client to the waiting state */
client->bufpos = -1;
+ /* and make client sync to next keyframe */
+ client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
+ break;
+ }
+ case GST_SYNC_METHOD_BURST:
+ {
+ gboolean ok;
+ gint max;
+
+ /* move to the position where we satisfy the client's burst
+ * parameters. If we could not satisfy the parameters because there
+ * is not enough data, we just send what we have (which is in result).
+ * We use the max value to limit the search
+ */
+ ok = count_burst_unit (sink, &result, client->burst_min_unit,
+ client->burst_min_value, &max, client->burst_max_unit,
+ client->burst_max_value);
+
+ GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
+
+ /* we hit the max and it is below the min, use that then */
+ if (max != -1 && max <= result) {
+ result = MAX (max - 1, 0);
+ }
+ break;
+ }
+ case GST_SYNC_METHOD_BURST_KEYFRAME:
+ {
+ gboolean ok;
+ gint min_idx, max_idx;
+ gint next_syncframe, prev_syncframe;
+
+ /* BURST_KEYFRAME:
+ *
+ * _always_ start sending a keyframe to the client. We first search
+ * a keyframe between min/max limits. If there is none, we send it the
+ * last keyframe before min. If there is none, the behaviour is like
+ * NEXT_KEYFRAME.
+ */
+ /* gather burst limits */
+ ok = count_burst_unit (sink, &min_idx, client->burst_min_unit,
+ client->burst_min_value, &max_idx, client->burst_max_unit,
+ client->burst_max_value);
+
+ GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
+
+ /* first find a keyframe after min_idx */
+ next_syncframe = find_next_syncframe (sink, min_idx);
+ if (next_syncframe != -1 && next_syncframe < max_idx) {
+ /* we have a valid keyframe and it's below the max */
+ GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
+ result = next_syncframe;
+ break;
+ }
+
+ /* no valid keyframe, try to find one below min */
+ prev_syncframe = find_prev_syncframe (sink, min_idx);
+ if (prev_syncframe != -1) {
+ GST_WARNING_OBJECT (sink,
+ "using keyframe below min in BURST_KEYFRAME sync mode");
+ result = prev_syncframe;
+ break;
+ }
+
+ /* no prev keyframe or not enough data */
+ GST_WARNING_OBJECT (sink,
+ "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
+
+ /* throw client to the waiting state */
+ client->bufpos = -1;
+ /* and make client sync to next keyframe */
+ client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
+ result = -1;
+ break;
+ }
+ case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
+ {
+ gboolean ok;
+ gint min_idx, max_idx;
+ gint next_syncframe;
+
+ /* BURST_WITH_KEYFRAME:
+ *
+ * try to start sending a keyframe to the client. We first search
+ * a keyframe between min/max limits. If there is none, we send it the
+ * amount of data up 'till min.
+ */
+ /* gather enough data to burst */
+ ok = count_burst_unit (sink, &min_idx, client->burst_min_unit,
+ client->burst_min_value, &max_idx, client->burst_max_unit,
+ client->burst_max_value);
+
+ GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
+
+ /* first find a keyframe after min_idx */
+ next_syncframe = find_next_syncframe (sink, min_idx);
+ if (next_syncframe != -1 && next_syncframe < max_idx) {
+ /* we have a valid keyframe and it's below the max */
+ GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
+ result = next_syncframe;
+ break;
+ }
+
+ /* no keyframe, send data from min_idx */
+ GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
+
+ /* make sure we don't go over the max limit */
+ if (max_idx != -1 && max_idx <= min_idx) {
+ result = MAX (max_idx - 1, 0);
+ } else {
+ result = min_idx;
+ }
+
break;
}
default:
- /* no syncing, we are happy with whatever the client is going to get */
- GST_LOG_OBJECT (sink, "no client syn needed");
+ g_warning ("unknown sync method %d", client->sync_method);
result = client->bufpos;
break;
}
/* nothing serious, resource was unavailable, try again later */
more = FALSE;
} else if (errno == ECONNRESET) {
- GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing",
- fd);
- client->status = GST_CLIENT_STATUS_CLOSED;
- return FALSE;
+ goto connection_reset;
} else {
- GST_WARNING_OBJECT (sink,
- "[fd %5d] could not write, removing client: %s (%d)", fd,
- g_strerror (errno), errno);
- client->status = GST_CLIENT_STATUS_ERROR;
- return FALSE;
+ goto write_error;
}
} else {
if (wrote < maxsize) {
} while (more);
return TRUE;
+
+ /* ERRORS */
+connection_reset:
+ {
+ GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd);
+ client->status = GST_CLIENT_STATUS_CLOSED;
+ return FALSE;
+ }
+write_error:
+ {
+ GST_WARNING_OBJECT (sink,
+ "[fd %5d] could not write, removing client: %s (%d)", fd,
+ g_strerror (errno), errno);
+ client->status = GST_CLIENT_STATUS_ERROR;
+ return FALSE;
+ }
}
/* calculate the new position for a client after recovery. This function
}
}
+ /* make sure we respect bytes-min, buffers-min and time-min when they are set */
+ {
+ gint usage, max;
+
+ GST_LOG_OBJECT (sink,
+ "extending queue %d to respect time_min %" GST_TIME_FORMAT
+ ", bytes_min %d, buffers_min %d", max_buffer_usage,
+ GST_TIME_ARGS (sink->time_min), sink->bytes_min, sink->buffers_min);
+
+ /* get index where the limits are ok, we don't really care if all limits
+ * are ok, we just queue as much as we need. We also don't compare against
+ * the max limits. */
+ find_limits (sink, &usage, sink->bytes_min, sink->buffers_min,
+ sink->time_min, &max, -1, -1, -1);
+
+ max_buffer_usage = MAX (max_buffer_usage, usage + 1);
+ GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage);
+ }
+
/* now look for sync points and make sure there is at least one
- * sync point in the queue. We only do this if the burst mode
- * is enabled. */
- if (sink->sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) {
+ * sync point in the queue. We only do this if the LATEST_KEYFRAME
+ * mode is selected */
+ if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) {
/* no point in searching beyond the queue length */
gint limit = queuelen;
GstBuffer *buf;
GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
}
+ GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage);
+
/* nobody is referencing units after max_buffer_usage so we can
* remove them from the queue. We remove them in reverse order as
* this is the most optimal for GArray. */
gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstMultiFdSink *sink;
+ gboolean in_caps;
GstCaps *bufcaps, *padcaps;
sink = GST_MULTI_FD_SINK (bsink);
g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_FD_SINK_OPEN),
GST_FLOW_ERROR);
- GST_LOG_OBJECT (sink, "received buffer %p", buf);
+ in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
+
+ GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %d", buf, in_caps);
+
/* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
* it means we're getting new streamheader buffers, and we should clear
* the old ones */
- if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) &&
- sink->previous_buffer_in_caps == FALSE) {
+ if (in_caps && sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
"receiving new IN_CAPS buffers, clearing old streamheader");
g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
sink->streamheader = NULL;
}
+ /* save the current in_caps */
+ sink->previous_buffer_in_caps = in_caps;
+
/* if the incoming buffer is marked as IN CAPS, then we assume for now
* it's a streamheader that needs to be sent to each new client, so we
* put it on our internal list of streamheader buffers.
*
* We don't send the buffer to the client, since streamheaders are sent
* separately when necessary. */
- if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
- sink->previous_buffer_in_caps = TRUE;
+ if (in_caps) {
GST_DEBUG_OBJECT (sink,
"appending IN_CAPS buffer with length %d to streamheader",
GST_BUFFER_SIZE (buf));
sink->streamheader = g_slist_append (sink->streamheader, buf);
- return GST_FLOW_OK;
- }
-
- sink->previous_buffer_in_caps = FALSE;
- /* queue the buffer */
- gst_multi_fd_sink_queue_buffer (sink, buf);
+ } else {
+ /* queue the buffer, this is a regular data buffer. */
+ gst_multi_fd_sink_queue_buffer (sink, buf);
- sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
+ sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
+ }
return GST_FLOW_OK;
}
{
GstMultiFdSink *multifdsink;
- g_return_if_fail (GST_IS_MULTI_FD_SINK (object));
multifdsink = GST_MULTI_FD_SINK (object);
switch (prop_id) {
- case ARG_PROTOCOL:
+ case PROP_PROTOCOL:
multifdsink->protocol = g_value_get_enum (value);
break;
- case ARG_MODE:
+ case PROP_MODE:
multifdsink->mode = g_value_get_enum (value);
break;
- case ARG_BUFFERS_MAX:
+ case PROP_BUFFERS_MAX:
multifdsink->units_max = g_value_get_int (value);
break;
- case ARG_BUFFERS_SOFT_MAX:
+ case PROP_BUFFERS_SOFT_MAX:
multifdsink->units_soft_max = g_value_get_int (value);
break;
- case ARG_UNIT_TYPE:
+ case PROP_TIME_MIN:
+ multifdsink->time_min = g_value_get_int64 (value);
+ break;
+ case PROP_BYTES_MIN:
+ multifdsink->bytes_min = g_value_get_int (value);
+ break;
+ case PROP_BUFFERS_MIN:
+ multifdsink->buffers_min = g_value_get_int (value);
+ break;
+ case PROP_UNIT_TYPE:
multifdsink->unit_type = g_value_get_enum (value);
break;
- case ARG_UNITS_MAX:
+ case PROP_UNITS_MAX:
multifdsink->units_max = g_value_get_int (value);
break;
- case ARG_UNITS_SOFT_MAX:
+ case PROP_UNITS_SOFT_MAX:
multifdsink->units_soft_max = g_value_get_int (value);
break;
- case ARG_RECOVER_POLICY:
+ case PROP_RECOVER_POLICY:
multifdsink->recover_policy = g_value_get_enum (value);
break;
- case ARG_TIMEOUT:
+ case PROP_TIMEOUT:
multifdsink->timeout = g_value_get_uint64 (value);
break;
- case ARG_SYNC_METHOD:
- multifdsink->sync_method = g_value_get_enum (value);
+ case PROP_SYNC_METHOD:
+ multifdsink->def_sync_method = g_value_get_enum (value);
+ break;
+ case PROP_BURST_UNIT:
+ multifdsink->def_burst_unit = g_value_get_enum (value);
+ break;
+ case PROP_BURST_VALUE:
+ multifdsink->def_burst_value = g_value_get_uint64 (value);
break;
default:
{
GstMultiFdSink *multifdsink;
- g_return_if_fail (GST_IS_MULTI_FD_SINK (object));
multifdsink = GST_MULTI_FD_SINK (object);
switch (prop_id) {
- case ARG_PROTOCOL:
+ case PROP_PROTOCOL:
g_value_set_enum (value, multifdsink->protocol);
break;
- case ARG_MODE:
+ case PROP_MODE:
g_value_set_enum (value, multifdsink->mode);
break;
- case ARG_BUFFERS_MAX:
+ case PROP_BUFFERS_MAX:
g_value_set_int (value, multifdsink->units_max);
break;
- case ARG_BUFFERS_SOFT_MAX:
+ case PROP_BUFFERS_SOFT_MAX:
g_value_set_int (value, multifdsink->units_soft_max);
break;
- case ARG_BUFFERS_QUEUED:
+ case PROP_TIME_MIN:
+ g_value_set_int64 (value, multifdsink->time_min);
+ break;
+ case PROP_BYTES_MIN:
+ g_value_set_int (value, multifdsink->bytes_min);
+ break;
+ case PROP_BUFFERS_MIN:
+ g_value_set_int (value, multifdsink->buffers_min);
+ break;
+ case PROP_BUFFERS_QUEUED:
g_value_set_uint (value, multifdsink->buffers_queued);
break;
- case ARG_BYTES_QUEUED:
+ case PROP_BYTES_QUEUED:
g_value_set_uint (value, multifdsink->bytes_queued);
break;
- case ARG_TIME_QUEUED:
+ case PROP_TIME_QUEUED:
g_value_set_uint64 (value, multifdsink->time_queued);
break;
- case ARG_UNIT_TYPE:
+ case PROP_UNIT_TYPE:
g_value_set_enum (value, multifdsink->unit_type);
break;
- case ARG_UNITS_MAX:
+ case PROP_UNITS_MAX:
g_value_set_int (value, multifdsink->units_max);
break;
- case ARG_UNITS_SOFT_MAX:
+ case PROP_UNITS_SOFT_MAX:
g_value_set_int (value, multifdsink->units_soft_max);
break;
- case ARG_RECOVER_POLICY:
+ case PROP_RECOVER_POLICY:
g_value_set_enum (value, multifdsink->recover_policy);
break;
- case ARG_TIMEOUT:
+ case PROP_TIMEOUT:
g_value_set_uint64 (value, multifdsink->timeout);
break;
- case ARG_SYNC_METHOD:
- g_value_set_enum (value, multifdsink->sync_method);
+ case PROP_SYNC_METHOD:
+ g_value_set_enum (value, multifdsink->def_sync_method);
break;
- case ARG_BYTES_TO_SERVE:
+ case PROP_BYTES_TO_SERVE:
g_value_set_uint64 (value, multifdsink->bytes_to_serve);
break;
- case ARG_BYTES_SERVED:
+ case PROP_BYTES_SERVED:
g_value_set_uint64 (value, multifdsink->bytes_served);
break;
+ case PROP_BURST_UNIT:
+ g_value_set_enum (value, multifdsink->def_burst_unit);
+ break;
+ case PROP_BURST_VALUE:
+ g_value_set_uint64 (value, multifdsink->def_burst_value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* ERRORS */
start_failed:
{
+ /* error message was posted */
return GST_STATE_CHANGE_FAILURE;
}
}
memcpy (GST_BUFFER_DATA (buffer), "dead", 4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+ GST_DEBUG ("reading");
fail_if (read (pfd[0], data, 4) < 4);
fail_unless (strncmp (data, "dead", 4) == 0);
wait_bytes_served (sink, 4);
GST_END_TEST;
+/* keep 100 bytes and burst 80 bytes to clients */
+GST_START_TEST (test_burst_client_bytes)
+{
+ GstElement *sink;
+ GstBuffer *buffer;
+ GstCaps *caps;
+ int pfd1[2];
+ int pfd2[2];
+ int pfd3[2];
+ gchar data[16];
+ guint64 bytes_served;
+ gint i;
+ guint buffers_queued;
+
+ sink = setup_multifdsink ();
+ /* make sure we keep at least 100 bytes at all times */
+ g_object_set (sink, "bytes-min", 100, NULL);
+ g_object_set (sink, "sync-method", 3, NULL); /* 3 = burst */
+ g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
+ g_object_set (sink, "burst-value", (guint64) 80, NULL);
+
+ fail_if (pipe (pfd1) == -1);
+ fail_if (pipe (pfd2) == -1);
+ fail_if (pipe (pfd3) == -1);
+
+ ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+ caps = gst_caps_from_string ("application/x-gst-check");
+ GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
+
+ /* push buffers in, 9 * 16 bytes = 144 bytes */
+ for (i = 0; i < 9; i++) {
+ gchar *data;
+
+ buffer = gst_buffer_new_and_alloc (16);
+ gst_buffer_set_caps (buffer, caps);
+
+ /* copy some id */
+ data = (gchar *) GST_BUFFER_DATA (buffer);
+ g_snprintf (data, 16, "deadbee%08x", i);
+
+ fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+ }
+
+ /* check that at least 7 buffers (112 bytes) are in the queue */
+ g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
+ fail_if (buffers_queued != 7);
+
+ /* now add the clients */
+ g_signal_emit_by_name (sink, "add", pfd1[1]);
+ g_signal_emit_by_name (sink, "add_full", pfd2[1], 3,
+ 3, (guint64) 50, 3, (guint64) 200);
+ g_signal_emit_by_name (sink, "add_full", pfd3[1], 3,
+ 3, (guint64) 50, 3, (guint64) 50);
+
+ /* push last buffer to make client fds ready for reading */
+ for (i = 9; i < 10; i++) {
+ gchar *data;
+
+ buffer = gst_buffer_new_and_alloc (16);
+ gst_buffer_set_caps (buffer, caps);
+
+ /* copy some id */
+ data = (gchar *) GST_BUFFER_DATA (buffer);
+ g_snprintf (data, 16, "deadbee%08x", i);
+
+ fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+ }
+
+ /* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */
+ GST_DEBUG ("Reading from client 1");
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ /* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since
+ * the max alows it) */
+ GST_DEBUG ("Reading from client 2");
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ /* third client only bursts 50 bytes = 4 buffers, we can't send
+ * more than 50 bytes so we only get 3 buffers (48 bytes). */
+ GST_DEBUG ("Reading from client 3");
+ fail_if (read (pfd3[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+ fail_if (read (pfd3[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd3[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ GST_DEBUG ("cleaning up multifdsink");
+ ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+ cleanup_multifdsink (sink);
+
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+ gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
+/* keep 100 bytes and burst 80 bytes to clients */
+GST_START_TEST (test_burst_client_bytes_keyframe)
+{
+ GstElement *sink;
+ GstBuffer *buffer;
+ GstCaps *caps;
+ int pfd1[2];
+ int pfd2[2];
+ int pfd3[2];
+ gchar data[16];
+ guint64 bytes_served;
+ gint i;
+ guint buffers_queued;
+
+ sink = setup_multifdsink ();
+ /* make sure we keep at least 100 bytes at all times */
+ g_object_set (sink, "bytes-min", 100, NULL);
+ g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */
+ g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
+ g_object_set (sink, "burst-value", (guint64) 80, NULL);
+
+ fail_if (pipe (pfd1) == -1);
+ fail_if (pipe (pfd2) == -1);
+ fail_if (pipe (pfd3) == -1);
+
+ ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+ caps = gst_caps_from_string ("application/x-gst-check");
+ GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
+
+ /* push buffers in, 9 * 16 bytes = 144 bytes */
+ for (i = 0; i < 9; i++) {
+ gchar *data;
+
+ buffer = gst_buffer_new_and_alloc (16);
+ gst_buffer_set_caps (buffer, caps);
+
+ /* mark most buffers as delta */
+ if (i != 0 && i != 4 && i != 8)
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+
+ /* copy some id */
+ data = (gchar *) GST_BUFFER_DATA (buffer);
+ g_snprintf (data, 16, "deadbee%08x", i);
+
+ fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+ }
+
+ /* check that at least 7 buffers (112 bytes) are in the queue */
+ g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
+ fail_if (buffers_queued != 7);
+
+ /* now add the clients */
+ g_signal_emit_by_name (sink, "add", pfd1[1]);
+ g_signal_emit_by_name (sink, "add_full", pfd2[1], 4,
+ 3, (guint64) 50, 3, (guint64) 90);
+ g_signal_emit_by_name (sink, "add_full", pfd3[1], 4,
+ 3, (guint64) 50, 3, (guint64) 50);
+
+ /* push last buffer to make client fds ready for reading */
+ for (i = 9; i < 10; i++) {
+ gchar *data;
+
+ buffer = gst_buffer_new_and_alloc (16);
+ gst_buffer_set_caps (buffer, caps);
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+
+ /* copy some id */
+ data = (gchar *) GST_BUFFER_DATA (buffer);
+ g_snprintf (data, 16, "deadbee%08x", i);
+
+ fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+ }
+
+ /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
+ * keyframe at buffer 4 */
+ GST_DEBUG ("Reading from client 1");
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ /* second client only bursts 50 bytes = 4 buffers, there is
+ * no keyframe above min and below max, so get one below min */
+ GST_DEBUG ("Reading from client 2");
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ /* third client only bursts 50 bytes = 4 buffers, we can't send
+ * more than 50 bytes so we only get 2 buffers (32 bytes). */
+ GST_DEBUG ("Reading from client 3");
+ fail_if (read (pfd3[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd3[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ GST_DEBUG ("cleaning up multifdsink");
+ ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+ cleanup_multifdsink (sink);
+
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+ gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
+/* keep 100 bytes and burst 80 bytes to clients */
+GST_START_TEST (test_burst_client_bytes_with_keyframe)
+{
+ GstElement *sink;
+ GstBuffer *buffer;
+ GstCaps *caps;
+ int pfd1[2];
+ int pfd2[2];
+ int pfd3[2];
+ gchar data[16];
+ guint64 bytes_served;
+ gint i;
+ guint buffers_queued;
+
+ sink = setup_multifdsink ();
+ /* make sure we keep at least 100 bytes at all times */
+ g_object_set (sink, "bytes-min", 100, NULL);
+ g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */
+ g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
+ g_object_set (sink, "burst-value", (guint64) 80, NULL);
+
+ fail_if (pipe (pfd1) == -1);
+ fail_if (pipe (pfd2) == -1);
+ fail_if (pipe (pfd3) == -1);
+
+ ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+ caps = gst_caps_from_string ("application/x-gst-check");
+ GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
+
+ /* push buffers in, 9 * 16 bytes = 144 bytes */
+ for (i = 0; i < 9; i++) {
+ gchar *data;
+
+ buffer = gst_buffer_new_and_alloc (16);
+ gst_buffer_set_caps (buffer, caps);
+
+ /* mark most buffers as delta */
+ if (i != 0 && i != 4 && i != 8)
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+
+ /* copy some id */
+ data = (gchar *) GST_BUFFER_DATA (buffer);
+ g_snprintf (data, 16, "deadbee%08x", i);
+
+ fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+ }
+
+ /* check that at least 7 buffers (112 bytes) are in the queue */
+ g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
+ fail_if (buffers_queued != 7);
+
+ /* now add the clients */
+ g_signal_emit_by_name (sink, "add", pfd1[1]);
+ g_signal_emit_by_name (sink, "add_full", pfd2[1], 5,
+ 3, (guint64) 50, 3, (guint64) 90);
+ g_signal_emit_by_name (sink, "add_full", pfd3[1], 5,
+ 3, (guint64) 50, 3, (guint64) 50);
+
+ /* push last buffer to make client fds ready for reading */
+ for (i = 9; i < 10; i++) {
+ gchar *data;
+
+ buffer = gst_buffer_new_and_alloc (16);
+ gst_buffer_set_caps (buffer, caps);
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+
+ /* copy some id */
+ data = (gchar *) GST_BUFFER_DATA (buffer);
+ g_snprintf (data, 16, "deadbee%08x", i);
+
+ fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+ }
+
+ /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
+ * keyframe at buffer 4 */
+ GST_DEBUG ("Reading from client 1");
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd1[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ /* second client only bursts 50 bytes = 4 buffers, there is
+ * no keyframe above min and below max, so send min */
+ GST_DEBUG ("Reading from client 2");
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd2[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ /* third client only bursts 50 bytes = 4 buffers, we can't send
+ * more than 50 bytes so we only get 3 buffers (48 bytes). */
+ GST_DEBUG ("Reading from client 3");
+ fail_if (read (pfd3[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+ fail_if (read (pfd3[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+ fail_if (read (pfd3[0], data, 16) < 16);
+ fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+ GST_DEBUG ("cleaning up multifdsink");
+ ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+ cleanup_multifdsink (sink);
+
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+ gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
/* FIXME: add test simulating chained oggs where:
* sync-method is burst-on-connect
* (when multifdsink actually does burst-on-connect based on byte size, not
* an old client still needs to read from before the new streamheaders
* a new client gets the new streamheaders
*/
-
Suite *
multifdsink_suite (void)
{
tcase_add_test (tc_chain, test_add_client);
tcase_add_test (tc_chain, test_streamheader);
tcase_add_test (tc_chain, test_change_streamheader);
+ tcase_add_test (tc_chain, test_burst_client_bytes);
+ tcase_add_test (tc_chain, test_burst_client_bytes_keyframe);
+ tcase_add_test (tc_chain, test_burst_client_bytes_with_keyframe);
return s;
}