gst/tcp/gstmultifdsink.*: Added shiny new burst-on-connect methods.
authorWim Taymans <wim.taymans@gmail.com>
Mon, 19 Jun 2006 17:12:57 +0000 (17:12 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Mon, 19 Jun 2006 17:12:57 +0000 (17:12 +0000)
Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type),
(gst_unit_type_get_type), (gst_multi_fd_sink_class_init),
(gst_multi_fd_sink_init), (gst_multi_fd_sink_add_full),
(gst_multi_fd_sink_add), (gst_multi_fd_sink_handle_client_read),
(find_syncframe), (find_limits), (assign_value),
(count_burst_unit), (gst_multi_fd_sink_new_client),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_render),
(gst_multi_fd_sink_set_property), (gst_multi_fd_sink_get_property),
(gst_multi_fd_sink_change_state):
* gst/tcp/gstmultifdsink.h:
Added shiny new burst-on-connect methods.
Add properties to control the minimal amount of data queued.
Small cleanups.
API: bytes-min property
API: time-min property
API: buffers-min property
API: burst-unit property
API: burst-value property
API: add-full signal
* gst/tcp/gsttcp-marshal.list:
Added new marshaller code for the new signal.
* tests/check/elements/multifdsink.c: (GST_START_TEST),
(multifdsink_suite):
Added testcases for new burst methods.

ChangeLog
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
gst/tcp/gsttcp-marshal.list
tests/check/elements/multifdsink.c

index 7793b2a..0121bcb 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,33 @@
+2006-06-19  Wim Taymans  <wim@fluendo.com>
+
+       * gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type),
+       (gst_unit_type_get_type), (gst_multi_fd_sink_class_init),
+       (gst_multi_fd_sink_init), (gst_multi_fd_sink_add_full),
+       (gst_multi_fd_sink_add), (gst_multi_fd_sink_handle_client_read),
+       (find_syncframe), (find_limits), (assign_value),
+       (count_burst_unit), (gst_multi_fd_sink_new_client),
+       (gst_multi_fd_sink_handle_client_write),
+       (gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_render),
+       (gst_multi_fd_sink_set_property), (gst_multi_fd_sink_get_property),
+       (gst_multi_fd_sink_change_state):
+       * gst/tcp/gstmultifdsink.h:
+       Added shiny new burst-on-connect methods.
+       Add properties to control the minimal amount of data queued.
+       Small cleanups.
+       API: bytes-min property
+       API: time-min property
+       API: buffers-min property
+       API: burst-unit property
+       API: burst-value property
+       API: add-full signal
+
+       * gst/tcp/gsttcp-marshal.list:
+       Added new marshaller code for the new signal.
+
+       * tests/check/elements/multifdsink.c: (GST_START_TEST),
+       (multifdsink_suite):
+       Added testcases for new burst methods.
+
 2006-06-19  Edward Hervey  <edward@fluendo.com>
 
        * ext/theora/theoradec.c: (clip_buffer), (theora_dec_push):
index 7dfce3e..4381c53 100644 (file)
  * each descriptor added, the "client-added" signal will be called.
  * </para>
  * <para>
+ * As of version 0.10.8, a client can also be added with the "add-full" signal
+ * that allows for more control over what and how much data a client 
+ * initially receives.
+ * </para>
+ * <para>
  * Clients can be removed from multifdsink by emiting the "remove" signal. For
  * each descriptor removed, the "client-removed" signal will be called. The
  * "client-removed" signal can also be fired when multifdsink decides that a
  * client is not active anymore or, depending on the value of the
  * "recover-policy," if the client is reading to slow.
  * In all cases, multifdsink will never ever close a file descriptor itself.
- * The user of multifdsink is responsible for closing the file descriptor.
+ * The user of multifdsink is responsible for closing all file descriptors.
  * This can for example be done in response to the "client-fd-removed" signal.
  * Note that multifdsink still has a reference to the file descriptor when the
  * "client-removed" signal is emited so that "get-stats" can be performed on
  * the descriptor; It is therefore not allowed to close the file descriptor in
- * the "client-removed" signal, use the "client-fd-removed" signal to close the
- * fd.
+ * the "client-removed" signal, use the "client-fd-removed" signal to safely 
+ * close the fd.
  * </para>
  * <para>
  * Multifdsink internally keeps a queue of the incomming buffers and uses a
  * </para>
  * <para>
  * When adding a client to multifdsink, the "sync-method" property will define
- * which buffer will be sent first to the client. Clients can be sent
- * respectively the most recent buffer (which might not be decodable by the
- * client when it is not a keyframe), the next keyframe received in multifdsink
- * (which can take some time depending on the keyframe rate, or the last
- * received keyframe (which will cause a burst-on-connect).
+ * which buffer in the queued buffers will be sent first to the client. Clients 
+ * can be sent respectively the most recent buffer (which might not be decodable
+ * by the client when it is not a keyframe), the next keyframe received in 
+ * multifdsink (which can take some time depending on the keyframe rate, or the
+ * last received keyframe (which will cause a simpl burst-on-connect). 
+ * Multifdsink will always keep at least one keyframe in its internal buffers
+ * when the sync-mode is set to latest-keyframe.
+ * </para>
+ * <para>
+ * Multifdsink can be instructed to keep at least a minimum amount of data
+ * expressed in time or byte units in its internal queues with the the 
+ * "time-min" and "bytes-min" properties respectively. These properties are
+ * usefull if the application adds clients with the "add-full" signal to
+ * make sure that a burst connect can actually be honored. 
  * </para>
  * <para>
  * When streaming data, clients are allowed to read at a different rate than
  * buffer queue.
  * </para>
  * <para>
- * multifdsink will synchronize on the clock before serving the buffers to the
- * clients.
+ * multifdsink will by default synchronize on the clock before serving the buffers
+ * to the clients. This behaviour can be disabled by setting the sync 
+ * property to FALSE. Multifdsink will be default not do QoS and will never
+ * drop late buffers.
  * </para>
  * </refsect2>
  *
- * Last reviewed on 2006-04-28 (0.10.7)
+ * Last reviewed on 2006-06-13 (0.10.9)
  */
 
 #ifdef HAVE_CONFIG_H
@@ -147,6 +163,7 @@ enum
 {
   /* methods */
   SIGNAL_ADD,
+  SIGNAL_ADD_BURST,
   SIGNAL_REMOVE,
   SIGNAL_CLEAR,
   SIGNAL_GET_STATS,
@@ -160,38 +177,51 @@ enum
 };
 
 /* this is really arbitrarily chosen */
-#define DEFAULT_PROTOCOL                 GST_TCP_PROTOCOL_NONE
-#define DEFAULT_MODE                     GST_FDSET_MODE_POLL
+#define DEFAULT_PROTOCOL                GST_TCP_PROTOCOL_NONE
+#define DEFAULT_MODE                    GST_FDSET_MODE_POLL
 #define DEFAULT_BUFFERS_MAX             -1
 #define DEFAULT_BUFFERS_SOFT_MAX        -1
+#define DEFAULT_TIME_MIN                -1
+#define DEFAULT_BYTES_MIN               -1
+#define DEFAULT_BUFFERS_MIN             -1
 #define DEFAULT_UNIT_TYPE               GST_UNIT_TYPE_BUFFERS
 #define DEFAULT_UNITS_MAX               -1
 #define DEFAULT_UNITS_SOFT_MAX          -1
-#define DEFAULT_RECOVER_POLICY           GST_RECOVER_POLICY_NONE
-#define DEFAULT_TIMEOUT                  0
-#define DEFAULT_SYNC_METHOD              GST_SYNC_METHOD_LATEST
+#define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
+#define DEFAULT_TIMEOUT                 0
+#define DEFAULT_SYNC_METHOD             GST_SYNC_METHOD_LATEST
+
+#define DEFAULT_BURST_UNIT              GST_UNIT_TYPE_UNDEFINED
+#define DEFAULT_BURST_VALUE             0
 
 enum
 {
-  ARG_0,
-  ARG_PROTOCOL,
-  ARG_MODE,
-  ARG_BUFFERS_QUEUED,
-  ARG_BYTES_QUEUED,
-  ARG_TIME_QUEUED,
-
-  ARG_UNIT_TYPE,
-  ARG_UNITS_MAX,
-  ARG_UNITS_SOFT_MAX,
-
-  ARG_BUFFERS_MAX,
-  ARG_BUFFERS_SOFT_MAX,
-
-  ARG_RECOVER_POLICY,
-  ARG_TIMEOUT,
-  ARG_SYNC_METHOD,
-  ARG_BYTES_TO_SERVE,
-  ARG_BYTES_SERVED,
+  PROP_0,
+  PROP_PROTOCOL,
+  PROP_MODE,
+  PROP_BUFFERS_QUEUED,
+  PROP_BYTES_QUEUED,
+  PROP_TIME_QUEUED,
+
+  PROP_UNIT_TYPE,
+  PROP_UNITS_MAX,
+  PROP_UNITS_SOFT_MAX,
+
+  PROP_BUFFERS_MAX,
+  PROP_BUFFERS_SOFT_MAX,
+
+  PROP_TIME_MIN,
+  PROP_BYTES_MIN,
+  PROP_BUFFERS_MIN,
+
+  PROP_RECOVER_POLICY,
+  PROP_TIMEOUT,
+  PROP_SYNC_METHOD,
+  PROP_BYTES_TO_SERVE,
+  PROP_BYTES_SERVED,
+
+  PROP_BURST_UNIT,
+  PROP_BURST_VALUE,
 };
 
 #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
@@ -231,6 +261,13 @@ gst_sync_method_get_type (void)
     {GST_SYNC_METHOD_LATEST_KEYFRAME,
           "Serve everything since the latest keyframe (burst)",
         "latest-keyframe"},
+    {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"},
+    {GST_SYNC_METHOD_BURST_KEYFRAME,
+          "Serve burst-value data starting on a keyframe",
+        "burst-keyframe"},
+    {GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
+          "Serve burst-value data preferably starting on a keyframe",
+        "burst-with-keyframe"},
     {0, NULL, NULL},
   };
 
@@ -240,13 +277,13 @@ gst_sync_method_get_type (void)
   return sync_method_type;
 }
 
-#if NOT_IMPLEMENTED
 #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
 static GType
 gst_unit_type_get_type (void)
 {
   static GType unit_type_type = 0;
   static const GEnumValue unit_type[] = {
+    {GST_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
     {GST_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
     {GST_UNIT_TYPE_BYTES, "Bytes", "bytes"},
     {GST_UNIT_TYPE_TIME, "Time", "time"},
@@ -258,7 +295,6 @@ gst_unit_type_get_type (void)
   }
   return unit_type_type;
 }
-#endif
 
 #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
 static GType
@@ -328,74 +364,97 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
   gobject_class->get_property = gst_multi_fd_sink_get_property;
   gobject_class->finalize = gst_multi_fd_sink_finalize;
 
-  g_object_class_install_property (gobject_class, ARG_PROTOCOL,
+  g_object_class_install_property (gobject_class, PROP_PROTOCOL,
       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
           GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
-  g_object_class_install_property (gobject_class, ARG_MODE,
+  g_object_class_install_property (gobject_class, PROP_MODE,
       g_param_spec_enum ("mode", "Mode",
           "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
           DEFAULT_MODE, G_PARAM_READWRITE));
 
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_MAX,
       g_param_spec_int ("buffers-max", "Buffers max",
-          "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
-          DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX,
-      g_param_spec_int ("buffers-soft-max", "Buffers soft max",
+          "max number of buffers to queue for a client (-1 = no limit)", -1,
+          G_MAXINT, DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass),
+      PROP_BUFFERS_SOFT_MAX, g_param_spec_int ("buffers-soft-max",
+          "Buffers soft max",
           "Recover client when going over this limit (-1 = no limit)", -1,
           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
 
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_MIN,
+      g_param_spec_int ("bytes-min", "Bytes min",
+          "min number of bytes to queue (-1 = as little as possible)", -1,
+          G_MAXINT, DEFAULT_BYTES_MIN, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIME_MIN,
+      g_param_spec_int64 ("time-min", "Time min",
+          "min number of time to queue (-1 = as litte as possible)", -1,
+          G_MAXINT64, DEFAULT_TIME_MIN, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_MIN,
+      g_param_spec_int64 ("buffers-min", "Buffers min",
+          "min number of buffers to queue (-1 = as litte as possible)", -1,
+          G_MAXINT, DEFAULT_BUFFERS_MIN, G_PARAM_READWRITE));
+
 #if NOT_IMPLEMENTED
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNIT_TYPE,
       g_param_spec_enum ("unit-type", "Units type",
           "The unit to measure the max/soft-max/queued properties",
           GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNITS_MAX,
       g_param_spec_int ("units-max", "Units max",
           "max number of units to queue (-1 = no limit)", -1, G_MAXINT,
           DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNITS_SOFT_MAX,
       g_param_spec_int ("units-soft-max", "Units soft max",
           "Recover client when going over this limit (-1 = no limit)", -1,
           G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
 #endif
 
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_QUEUED,
       g_param_spec_uint ("buffers-queued", "Buffers queued",
           "Number of buffers currently queued", 0, G_MAXUINT, 0,
           G_PARAM_READABLE));
 #if NOT_IMPLEMENTED
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_QUEUED,
       g_param_spec_uint ("bytes-queued", "Bytes queued",
           "Number of bytes currently queued", 0, G_MAXUINT, 0,
           G_PARAM_READABLE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIME_QUEUED,
       g_param_spec_uint64 ("time-queued", "Time queued",
           "Number of time currently queued", 0, G_MAXUINT64, 0,
           G_PARAM_READABLE));
 #endif
 
-  g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
+  g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
       g_param_spec_enum ("recover-policy", "Recover Policy",
           "How to recover when client reaches the soft max",
           GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
       g_param_spec_uint64 ("timeout", "Timeout",
           "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
           0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SYNC_METHOD,
       g_param_spec_enum ("sync-method", "Sync Method",
           "How to sync new clients to the stream",
           GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
           G_PARAM_READABLE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
       g_param_spec_uint64 ("bytes-served", "Bytes served",
           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
           G_PARAM_READABLE));
 
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BURST_UNIT,
+      g_param_spec_enum ("burst-unit", "Burst unit",
+          "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
+          GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BURST_VALUE,
+      g_param_spec_uint64 ("burst-value", "Burst value",
+          "The amount of burst expressed in burst-unit",
+          0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE));
+
   /**
    * GstMultiFdSink::add:
    * @gstmultifdsink: the multifdsink element to emit this signal on
@@ -408,6 +467,24 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
       G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
       NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
   /**
+   * GstMultiFdSink::add-full:
+   * @gstmultifdsink: the multifdsink element to emit this signal on
+   * @fd:             the file descriptor to add to multifdsink
+   * @keyframe:       start bursting from a keyframe
+   * @unit_type:      the unit-type of @value
+   * @value:          the minimal amount of data to burst expressed in
+   *                  @format units.
+   *
+   * Hand the given open file descriptor to multifdsink to write to and
+   * specify the burst parameters for the new connection.
+   */
+  gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] =
+      g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstMultiFdSinkClass, add_full),
+      NULL, NULL, gst_tcp_marshal_VOID__INT_BOOLEAN_INT_UINT64_INT_UINT64,
+      G_TYPE_NONE, 6, G_TYPE_INT, G_TYPE_BOOLEAN, GST_TYPE_UNIT_TYPE,
+      G_TYPE_UINT64, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
+  /**
    * GstMultiFdSink::remove:
    * @gstmultifdsink: the multifdsink element to emit this signal on
    * @fd:             the file descriptor to remove from multifdsink
@@ -510,6 +587,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render);
 
   klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add);
+  klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full);
   klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove);
   klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear);
   klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats);
@@ -533,10 +611,15 @@ gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass)
   this->unit_type = DEFAULT_UNIT_TYPE;
   this->units_max = DEFAULT_UNITS_MAX;
   this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
+  this->time_min = DEFAULT_TIME_MIN;
+  this->bytes_min = DEFAULT_BYTES_MIN;
+  this->buffers_min = DEFAULT_BUFFERS_MIN;
   this->recover_policy = DEFAULT_RECOVER_POLICY;
 
   this->timeout = DEFAULT_TIMEOUT;
-  this->sync_method = DEFAULT_SYNC_METHOD;
+  this->def_sync_method = DEFAULT_SYNC_METHOD;
+  this->def_burst_unit = DEFAULT_BURST_UNIT;
+  this->def_burst_value = DEFAULT_BURST_VALUE;
 
   this->header_flags = 0;
 }
@@ -555,9 +638,11 @@ gst_multi_fd_sink_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
-/* "add" signal implemntation */
+/* "add-full" signal implemntation */
 void
-gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
+gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
+    GstSyncMethod sync_method, GstUnitType min_unit, guint64 min_value,
+    GstUnitType max_unit, guint64 max_value)
 {
   GstTCPClient *client;
   GList *clink;
@@ -567,6 +652,12 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
 
   GST_DEBUG_OBJECT (sink, "[fd %5d] adding client", fd);
 
+  /* do limits check if we can */
+  if (min_unit == max_unit) {
+    if (max_value != -1 && min_value != -1 && max_value < min_value)
+      goto wrong_limits;
+  }
+
   /* create client datastructure */
   client = g_new0 (GstTCPClient, 1);
   client->fd.fd = fd;
@@ -578,28 +669,25 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
   client->dropped_buffers = 0;
   client->avg_queue_size = 0;
   client->new_connection = TRUE;
+  client->burst_min_unit = min_unit;
+  client->burst_min_value = min_value;
+  client->burst_max_unit = max_unit;
+  client->burst_max_value = max_value;
+  client->sync_method = sync_method;
 
   /* update start time */
   g_get_current_time (&now);
   client->connect_time = GST_TIMEVAL_TO_TIME (now);
   client->disconnect_time = 0;
-  /* send last activity time to connect time */
-  client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
+  /* set last activity time to connect time */
+  client->last_activity_time = client->connect_time;
 
   CLIENTS_LOCK (sink);
 
   /* check the hash to find a duplicate fd */
   clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
-  if (clink != NULL) {
-    client->status = GST_CLIENT_STATUS_DUPLICATE;
-    CLIENTS_UNLOCK (sink);
-    GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
-    g_signal_emit (G_OBJECT (sink),
-        gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
-        client->status);
-    g_free (client);
-    return;
-  }
+  if (clink != NULL)
+    goto duplicate;
 
   /* we can add the fd now */
   clink = sink->clients = g_list_prepend (sink->clients, client);
@@ -627,6 +715,37 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
 
   g_signal_emit (G_OBJECT (sink),
       gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
+
+  return;
+
+  /* errors */
+wrong_limits:
+  {
+    GST_WARNING_OBJECT (sink,
+        "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
+        G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
+        min_value, max_value, min_unit);
+    return;
+  }
+duplicate:
+  {
+    client->status = GST_CLIENT_STATUS_DUPLICATE;
+    CLIENTS_UNLOCK (sink);
+    GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
+    g_signal_emit (G_OBJECT (sink),
+        gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
+        client->status);
+    g_free (client);
+    return;
+  }
+}
+
+/* "add" signal implemntation */
+void
+gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
+{
+  gst_multi_fd_sink_add_full (sink, fd, sink->def_sync_method,
+      sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1);
 }
 
 /* "remove" signal implemntation */
@@ -739,7 +858,7 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
   return result;
 }
 
-/* should be called with the clientslock held.
+/* should be called with the clientslock helt.
  * Note that we don't close the fd as we didn't open it in the first
  * place. An application should connect to the client-removed signal and
  * close the fd itself.
@@ -846,13 +965,8 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
 
   fd = client->fd.fd;
 
-  if (ioctl (fd, FIONREAD, &avail) < 0) {
-    GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
-        fd, g_strerror (errno), errno);
-    client->status = GST_CLIENT_STATUS_ERROR;
-    ret = FALSE;
-    return ret;
-  }
+  if (ioctl (fd, FIONREAD, &avail) < 0)
+    goto ioctl_failed;
 
   GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
       fd, avail);
@@ -900,11 +1014,21 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
     while (avail > 0);
   }
   return ret;
+
+  /* ERRORS */
+ioctl_failed:
+  {
+    GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
+        fd, g_strerror (errno), errno);
+    client->status = GST_CLIENT_STATUS_ERROR;
+    return FALSE;
+  }
 }
 
 /* Queue raw data for this client, creating a new buffer.
  * This takes ownership of the data by
- * setting it as GST_BUFFER_MALLOCDATA() on the created buffer
+ * setting it as GST_BUFFER_MALLOCDATA() on the created buffer so
+ * be sure to pass g_free()-able @data.
  */
 static gboolean
 gst_multi_fd_sink_client_queue_data (GstMultiFdSink * sink,
@@ -1100,87 +1224,381 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
   return TRUE;
 }
 
+/* find the keyframe in the list of buffers starting the
+ * search from @idx. @direction as -1 will search backwards, 
+ * 1 will search forwards.
+ * Returns: the index or -1 if there is no keyframe after idx.
+ */
+static gint
+find_syncframe (GstMultiFdSink * sink, gint idx, gint direction)
+{
+  gint i, len, result;
+
+  /* take length of queued buffers */
+  len = sink->bufqueue->len;
+
+  /* assume we don't find a keyframe */
+  result = -1;
+
+  /* then loop over all buffers to find the first keyframe */
+  for (i = idx; i >= 0 && i < len; i += direction) {
+    GstBuffer *buf;
+
+    buf = g_array_index (sink->bufqueue, GstBuffer *, i);
+    if (is_sync_frame (sink, buf)) {
+      GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d",
+          i, idx, direction);
+      result = i;
+      break;
+    }
+  }
+  return result;
+}
+
+#define find_next_syncframe(s,i)       find_syncframe(s,i,1)
+#define find_prev_syncframe(s,i)       find_syncframe(s,i,-1)
+
+/* find the positions in the buffer queue where *_min and *_max
+ * is satisfied
+ */
+/* count the amount of data in the buffers and return the index
+ * that satifies the given limits.
+ *
+ * Returns: index @idx in the buffer queue so that the given limits are
+ * satisfied. TRUE if all the limits could be satisfied, FALSE if not
+ * enough data was in the queue.
+ *
+ * FIXME, this code might now work if any of the units is in buffers...
+ */
+static gboolean
+find_limits (GstMultiFdSink * sink,
+    gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
+    gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
+{
+  GstClockTime first, time;
+  gint i, len, bytes;
+  gboolean result, max_hit;
+
+  /* take length of queue */
+  len = sink->bufqueue->len;
+
+  /* this must hold */
+  g_assert (len > 0);
+
+  GST_LOG_OBJECT (sink,
+      "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
+      ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
+      buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
+      GST_TIME_ARGS (time_max));
+
+  /* do the trivial buffer limit test */
+  if (buffers_min != -1 && len < buffers_min) {
+    *min_idx = len - 1;
+    *max_idx = len - 1;
+    return FALSE;
+  }
+
+  result = FALSE;
+  /* else count bytes and time */
+  first = -1;
+  bytes = 0;
+  /* unset limits */
+  *min_idx = -1;
+  *max_idx = -1;
+  max_hit = FALSE;
+
+  i = 0;
+  /* loop through the buffers, when a limit is ok, mark it 
+   * as -1, we have at least one buffer in the queue. */
+  do {
+    GstBuffer *buf;
+
+    /* if we checked all min limits, update result */
+    if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
+      /* don't go below 0 */
+      *min_idx = MAX (i - 1, 0);
+    }
+    /* if we reached one max limit break out */
+    if (max_hit) {
+      /* i > 0 when we get here, we subtract one to get the position
+       * of the previous buffer. */
+      *max_idx = i - 1;
+      /* we have valid complete result if we found a min_idx too */
+      result = *min_idx != -1;
+      break;
+    }
+    buf = g_array_index (sink->bufqueue, GstBuffer *, i);
+
+    bytes += GST_BUFFER_SIZE (buf);
+
+    /* take timestamp and save for the base first timestamp */
+    if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
+      if (first == -1)
+        first = time;
+
+      /* increase max usage if we did not fill enough. Note that
+       * buffers are sorted from new to old, so the first timestamp is
+       * bigger than the next one. */
+      if (time_min != -1 && first - time >= time_min)
+        time_min = -1;
+      if (time_max != -1 && first - time >= time_max)
+        max_hit = TRUE;
+    }
+    /* time is OK or unknown, check and increase if not enough bytes */
+    if (bytes_min != -1) {
+      if (bytes >= bytes_min)
+        bytes_min = -1;
+    }
+    if (bytes_max != -1) {
+      if (bytes >= bytes_max) {
+        max_hit = TRUE;
+      }
+    }
+    i++;
+  }
+  while (i < len);
+
+  /* if we did not hit the max or min limit, set to buffer size */
+  if (*max_idx == -1)
+    *max_idx = len - 1;
+  /* make sure min does not exceed max */
+  if (*min_idx == -1)
+    *min_idx = *max_idx;
+
+  return result;
+}
+
+/* parse the unit/value pair and assign it to the result value of the
+ * right type, leave the other values untouched 
+ *
+ * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
+ */
+static gboolean
+assign_value (GstUnitType unit, guint64 value, gint * bytes, gint * buffers,
+    GstClockTime * time)
+{
+  gboolean res = TRUE;
+
+  /* set only the limit of the given format to the given value */
+  switch (unit) {
+    case GST_UNIT_TYPE_BUFFERS:
+      *buffers = (gint) value;
+      break;
+    case GST_UNIT_TYPE_TIME:
+      *time = value;
+      break;
+    case GST_UNIT_TYPE_BYTES:
+      *bytes = (gint) value;
+      break;
+    case GST_UNIT_TYPE_UNDEFINED:
+    default:
+      res = FALSE;
+      break;
+  }
+  return res;
+}
+
+/* count the index in the buffer queue to satisfy the given unit
+ * and value pair starting from buffer at index 0.
+ *
+ * Returns: TRUE if there was enuough data in the queue to satisfy the
+ * burst values. @idx contains the index in the buffer that contains enough
+ * data to satisfy the limits or the last buffer in the queue when the
+ * function returns FALSE.
+ */
+static gboolean
+count_burst_unit (GstMultiFdSink * sink, gint * min_idx, GstUnitType min_unit,
+    guint64 min_value, gint * max_idx, GstUnitType max_unit, guint64 max_value)
+{
+  gint bytes_min = -1, buffers_min = -1;
+  gint bytes_max = -1, buffers_max = -1;
+  GstClockTime time_min = -1, time_max = -1;
+
+  assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min);
+  assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max);
+
+  return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
+      max_idx, bytes_max, buffers_max, time_max);
+}
+
 /* decide where in the current buffer queue this new client should start
- * receiving buffers from
+ * receiving buffers from.
+ * This function is called whenever a client is connected and has not yet
+ * received a buffer.
  * If this returns -1, it means that we haven't found a good point to
  * start streaming from yet, and this function should be called again later
- * when more buffers have arrived */
+ * when more buffers have arrived.
+ */
 static gint
 gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
 {
   gint result;
 
-  switch (sink->sync_method) {
+  switch (client->sync_method) {
+    case GST_SYNC_METHOD_LATEST:
+      /* no syncing, we are happy with whatever the client is going to get */
+      GST_LOG_OBJECT (sink, "no client sync needed");
+      result = client->bufpos;
+      break;
     case GST_SYNC_METHOD_NEXT_KEYFRAME:
     {
+      GstBuffer *buf;
+
       /* if the buffer at the head of the queue is a sync point we can proceed,
        * else we need to skip the buffer and wait for a new one */
       GST_LOG_OBJECT (sink,
           "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
           client->bufpos);
 
-      /* the client is not yet aligned to a buffer */
-      if (client->bufpos < 0) {
-        result = -1;
-      } else {
-        GstBuffer *buf;
-        gint i;
-
-        for (i = client->bufpos; i >= 0; i--) {
-          /* get the buffer for the client */
-          buf = g_array_index (sink->bufqueue, GstBuffer *, i);
-          if (is_sync_frame (sink, buf)) {
-            GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync",
-                client->fd.fd);
-            result = i;
-            goto done;
-          } else {
-            /* client is not on a buffer, need to skip this buffer and
-             * wait some more */
-            GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer",
-                client->fd.fd);
-            client->bufpos--;
-          }
-        }
-        result = -1;
+      /* get the buffer for the client */
+      buf = g_array_index (sink->bufqueue, GstBuffer *, 0);
+      if (is_sync_frame (sink, buf)) {
+        GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync", client->fd.fd);
+        result = 0;
+        goto done;
       }
+      /* client is not on a syncbuffer, need to skip this buffer and
+       * wait some more */
+      GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer",
+          client->fd.fd);
+      client->bufpos = -1;
+      result = -1;
       break;
     }
     case GST_SYNC_METHOD_LATEST_KEYFRAME:
     {
-      /* FIXME for new clients we constantly scan the complete
-       * buffer queue for sync point whenever a buffer is added. This is
-       * suboptimal because if we cannot find a sync point the first time,
-       * the algorithm should behave as GST_SYNC_METHOD_NEXT_KEYFRAME */
-      gint i, len;
-
       GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, bursting keyframe",
           client->fd.fd, client->bufpos);
 
-      /* take length of queued buffers */
-      len = sink->bufqueue->len;
-      /* assume we don't find a keyframe */
-      result = -1;
-      /* then loop over all buffers to find the first keyframe */
-      for (i = 0; i < len; i++) {
-        GstBuffer *buf;
+      /* for new clients we initially scan the complete buffer queue for 
+       * a sync point when a buffer is added. If we don't find a keyframe,
+       * we need to wait for the next keyframe and so we change the client's
+       * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
+       */
+      result = find_next_syncframe (sink, 0);
+      if (result != -1)
+        goto done;
 
-        buf = g_array_index (sink->bufqueue, GstBuffer *, i);
-        if (is_sync_frame (sink, buf)) {
-          /* found a keyframe, return its position */
-          GST_LOG_OBJECT (sink, "found keyframe at %d", i);
-          result = i;
-          goto done;
-        }
-      }
       GST_LOG_OBJECT (sink, "no keyframe found");
       /* throw client to the waiting state */
       client->bufpos = -1;
+      /* and make client sync to next keyframe */
+      client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
+      break;
+    }
+    case GST_SYNC_METHOD_BURST:
+    {
+      gboolean ok;
+      gint max;
+
+      /* move to the position where we satisfy the client's burst
+       * parameters. If we could not satisfy the parameters because there
+       * is not enough data, we just send what we have (which is in result).
+       * We use the max value to limit the search 
+       */
+      ok = count_burst_unit (sink, &result, client->burst_min_unit,
+          client->burst_min_value, &max, client->burst_max_unit,
+          client->burst_max_value);
+
+      GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
+
+      /* we hit the max and it is below the min, use that then */
+      if (max != -1 && max <= result) {
+        result = MAX (max - 1, 0);
+      }
+      break;
+    }
+    case GST_SYNC_METHOD_BURST_KEYFRAME:
+    {
+      gboolean ok;
+      gint min_idx, max_idx;
+      gint next_syncframe, prev_syncframe;
+
+      /* BURST_KEYFRAME:
+       *
+       * _always_ start sending a keyframe to the client. We first search
+       * a keyframe between min/max limits. If there is none, we send it the
+       * last keyframe before min. If there is none, the behaviour is like
+       * NEXT_KEYFRAME.
+       */
+      /* gather burst limits */
+      ok = count_burst_unit (sink, &min_idx, client->burst_min_unit,
+          client->burst_min_value, &max_idx, client->burst_max_unit,
+          client->burst_max_value);
+
+      GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
+
+      /* first find a keyframe after min_idx */
+      next_syncframe = find_next_syncframe (sink, min_idx);
+      if (next_syncframe != -1 && next_syncframe < max_idx) {
+        /* we have a valid keyframe and it's below the max */
+        GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
+        result = next_syncframe;
+        break;
+      }
+
+      /* no valid keyframe, try to find one below min */
+      prev_syncframe = find_prev_syncframe (sink, min_idx);
+      if (prev_syncframe != -1) {
+        GST_WARNING_OBJECT (sink,
+            "using keyframe below min in BURST_KEYFRAME sync mode");
+        result = prev_syncframe;
+        break;
+      }
+
+      /* no prev keyframe or not enough data  */
+      GST_WARNING_OBJECT (sink,
+          "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
+
+      /* throw client to the waiting state */
+      client->bufpos = -1;
+      /* and make client sync to next keyframe */
+      client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
+      result = -1;
+      break;
+    }
+    case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
+    {
+      gboolean ok;
+      gint min_idx, max_idx;
+      gint next_syncframe;
+
+      /* BURST_WITH_KEYFRAME:
+       *
+       * try to start sending a keyframe to the client. We first search
+       * a keyframe between min/max limits. If there is none, we send it the
+       * amount of data up 'till min.
+       */
+      /* gather enough data to burst */
+      ok = count_burst_unit (sink, &min_idx, client->burst_min_unit,
+          client->burst_min_value, &max_idx, client->burst_max_unit,
+          client->burst_max_value);
+
+      GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
+
+      /* first find a keyframe after min_idx */
+      next_syncframe = find_next_syncframe (sink, min_idx);
+      if (next_syncframe != -1 && next_syncframe < max_idx) {
+        /* we have a valid keyframe and it's below the max */
+        GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
+        result = next_syncframe;
+        break;
+      }
+
+      /* no keyframe, send data from min_idx */
+      GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
+
+      /* make sure we don't go over the max limit */
+      if (max_idx != -1 && max_idx <= min_idx) {
+        result = MAX (max_idx - 1, 0);
+      } else {
+        result = min_idx;
+      }
+
       break;
     }
     default:
-      /* no syncing, we are happy with whatever the client is going to get */
-      GST_LOG_OBJECT (sink, "no client syn needed");
+      g_warning ("unknown sync method %d", client->sync_method);
       result = client->bufpos;
       break;
   }
@@ -1332,16 +1750,9 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
           /* nothing serious, resource was unavailable, try again later */
           more = FALSE;
         } else if (errno == ECONNRESET) {
-          GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing",
-              fd);
-          client->status = GST_CLIENT_STATUS_CLOSED;
-          return FALSE;
+          goto connection_reset;
         } else {
-          GST_WARNING_OBJECT (sink,
-              "[fd %5d] could not write, removing client: %s (%d)", fd,
-              g_strerror (errno), errno);
-          client->status = GST_CLIENT_STATUS_ERROR;
-          return FALSE;
+          goto write_error;
         }
       } else {
         if (wrote < maxsize) {
@@ -1366,6 +1777,22 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
   } while (more);
 
   return TRUE;
+
+  /* ERRORS */
+connection_reset:
+  {
+    GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd);
+    client->status = GST_CLIENT_STATUS_CLOSED;
+    return FALSE;
+  }
+write_error:
+  {
+    GST_WARNING_OBJECT (sink,
+        "[fd %5d] could not write, removing client: %s (%d)", fd,
+        g_strerror (errno), errno);
+    client->status = GST_CLIENT_STATUS_ERROR;
+    return FALSE;
+  }
 }
 
 /* calculate the new position for a client after recovery. This function
@@ -1509,10 +1936,29 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
     }
   }
 
+  /* make sure we respect bytes-min, buffers-min and time-min when they are set */
+  {
+    gint usage, max;
+
+    GST_LOG_OBJECT (sink,
+        "extending queue %d to respect time_min %" GST_TIME_FORMAT
+        ", bytes_min %d, buffers_min %d", max_buffer_usage,
+        GST_TIME_ARGS (sink->time_min), sink->bytes_min, sink->buffers_min);
+
+    /* get index where the limits are ok, we don't really care if all limits
+     * are ok, we just queue as much as we need. We also don't compare against
+     * the max limits. */
+    find_limits (sink, &usage, sink->bytes_min, sink->buffers_min,
+        sink->time_min, &max, -1, -1, -1);
+
+    max_buffer_usage = MAX (max_buffer_usage, usage + 1);
+    GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage);
+  }
+
   /* now look for sync points and make sure there is at least one
-   * sync point in the queue. We only do this if the burst mode
-   * is enabled. */
-  if (sink->sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) {
+   * sync point in the queue. We only do this if the LATEST_KEYFRAME
+   * mode is selected */
+  if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) {
     /* no point in searching beyond the queue length */
     gint limit = queuelen;
     GstBuffer *buf;
@@ -1535,6 +1981,8 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
     GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
   }
 
+  GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage);
+
   /* nobody is referencing units after max_buffer_usage so we can
    * remove them from the queue. We remove them in reverse order as
    * this is the most optimal for GArray. */
@@ -1737,6 +2185,7 @@ static GstFlowReturn
 gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
   GstMultiFdSink *sink;
+  gboolean in_caps;
   GstCaps *bufcaps, *padcaps;
 
   sink = GST_MULTI_FD_SINK (bsink);
@@ -1769,12 +2218,14 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
   g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_FD_SINK_OPEN),
       GST_FLOW_ERROR);
 
-  GST_LOG_OBJECT (sink, "received buffer %p", buf);
+  in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
+
+  GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %d", buf, in_caps);
+
   /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
    * it means we're getting new streamheader buffers, and we should clear
    * the old ones */
-  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) &&
-      sink->previous_buffer_in_caps == FALSE) {
+  if (in_caps && sink->previous_buffer_in_caps == FALSE) {
     GST_DEBUG_OBJECT (sink,
         "receiving new IN_CAPS buffers, clearing old streamheader");
     g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
@@ -1782,6 +2233,9 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
     sink->streamheader = NULL;
   }
 
+  /* save the current in_caps */
+  sink->previous_buffer_in_caps = in_caps;
+
   /* if the incoming buffer is marked as IN CAPS, then we assume for now
    * it's a streamheader that needs to be sent to each new client, so we
    * put it on our internal list of streamheader buffers.
@@ -1790,20 +2244,17 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
    *
    * We don't send the buffer to the client, since streamheaders are sent
    * separately when necessary. */
-  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
-    sink->previous_buffer_in_caps = TRUE;
+  if (in_caps) {
     GST_DEBUG_OBJECT (sink,
         "appending IN_CAPS buffer with length %d to streamheader",
         GST_BUFFER_SIZE (buf));
     sink->streamheader = g_slist_append (sink->streamheader, buf);
-    return GST_FLOW_OK;
-  }
-
-  sink->previous_buffer_in_caps = FALSE;
-  /* queue the buffer */
-  gst_multi_fd_sink_queue_buffer (sink, buf);
+  } else {
+    /* queue the buffer, this is a regular data buffer. */
+    gst_multi_fd_sink_queue_buffer (sink, buf);
 
-  sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
+    sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
+  }
 
   return GST_FLOW_OK;
 }
@@ -1814,39 +2265,53 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
 {
   GstMultiFdSink *multifdsink;
 
-  g_return_if_fail (GST_IS_MULTI_FD_SINK (object));
   multifdsink = GST_MULTI_FD_SINK (object);
 
   switch (prop_id) {
-    case ARG_PROTOCOL:
+    case PROP_PROTOCOL:
       multifdsink->protocol = g_value_get_enum (value);
       break;
-    case ARG_MODE:
+    case PROP_MODE:
       multifdsink->mode = g_value_get_enum (value);
       break;
-    case ARG_BUFFERS_MAX:
+    case PROP_BUFFERS_MAX:
       multifdsink->units_max = g_value_get_int (value);
       break;
-    case ARG_BUFFERS_SOFT_MAX:
+    case PROP_BUFFERS_SOFT_MAX:
       multifdsink->units_soft_max = g_value_get_int (value);
       break;
-    case ARG_UNIT_TYPE:
+    case PROP_TIME_MIN:
+      multifdsink->time_min = g_value_get_int64 (value);
+      break;
+    case PROP_BYTES_MIN:
+      multifdsink->bytes_min = g_value_get_int (value);
+      break;
+    case PROP_BUFFERS_MIN:
+      multifdsink->buffers_min = g_value_get_int (value);
+      break;
+    case PROP_UNIT_TYPE:
       multifdsink->unit_type = g_value_get_enum (value);
       break;
-    case ARG_UNITS_MAX:
+    case PROP_UNITS_MAX:
       multifdsink->units_max = g_value_get_int (value);
       break;
-    case ARG_UNITS_SOFT_MAX:
+    case PROP_UNITS_SOFT_MAX:
       multifdsink->units_soft_max = g_value_get_int (value);
       break;
-    case ARG_RECOVER_POLICY:
+    case PROP_RECOVER_POLICY:
       multifdsink->recover_policy = g_value_get_enum (value);
       break;
-    case ARG_TIMEOUT:
+    case PROP_TIMEOUT:
       multifdsink->timeout = g_value_get_uint64 (value);
       break;
-    case ARG_SYNC_METHOD:
-      multifdsink->sync_method = g_value_get_enum (value);
+    case PROP_SYNC_METHOD:
+      multifdsink->def_sync_method = g_value_get_enum (value);
+      break;
+    case PROP_BURST_UNIT:
+      multifdsink->def_burst_unit = g_value_get_enum (value);
+      break;
+    case PROP_BURST_VALUE:
+      multifdsink->def_burst_value = g_value_get_uint64 (value);
       break;
 
     default:
@@ -1861,55 +2326,69 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
 {
   GstMultiFdSink *multifdsink;
 
-  g_return_if_fail (GST_IS_MULTI_FD_SINK (object));
   multifdsink = GST_MULTI_FD_SINK (object);
 
   switch (prop_id) {
-    case ARG_PROTOCOL:
+    case PROP_PROTOCOL:
       g_value_set_enum (value, multifdsink->protocol);
       break;
-    case ARG_MODE:
+    case PROP_MODE:
       g_value_set_enum (value, multifdsink->mode);
       break;
-    case ARG_BUFFERS_MAX:
+    case PROP_BUFFERS_MAX:
       g_value_set_int (value, multifdsink->units_max);
       break;
-    case ARG_BUFFERS_SOFT_MAX:
+    case PROP_BUFFERS_SOFT_MAX:
       g_value_set_int (value, multifdsink->units_soft_max);
       break;
-    case ARG_BUFFERS_QUEUED:
+    case PROP_TIME_MIN:
+      g_value_set_int64 (value, multifdsink->time_min);
+      break;
+    case PROP_BYTES_MIN:
+      g_value_set_int (value, multifdsink->bytes_min);
+      break;
+    case PROP_BUFFERS_MIN:
+      g_value_set_int (value, multifdsink->buffers_min);
+      break;
+    case PROP_BUFFERS_QUEUED:
       g_value_set_uint (value, multifdsink->buffers_queued);
       break;
-    case ARG_BYTES_QUEUED:
+    case PROP_BYTES_QUEUED:
       g_value_set_uint (value, multifdsink->bytes_queued);
       break;
-    case ARG_TIME_QUEUED:
+    case PROP_TIME_QUEUED:
       g_value_set_uint64 (value, multifdsink->time_queued);
       break;
-    case ARG_UNIT_TYPE:
+    case PROP_UNIT_TYPE:
       g_value_set_enum (value, multifdsink->unit_type);
       break;
-    case ARG_UNITS_MAX:
+    case PROP_UNITS_MAX:
       g_value_set_int (value, multifdsink->units_max);
       break;
-    case ARG_UNITS_SOFT_MAX:
+    case PROP_UNITS_SOFT_MAX:
       g_value_set_int (value, multifdsink->units_soft_max);
       break;
-    case ARG_RECOVER_POLICY:
+    case PROP_RECOVER_POLICY:
       g_value_set_enum (value, multifdsink->recover_policy);
       break;
-    case ARG_TIMEOUT:
+    case PROP_TIMEOUT:
       g_value_set_uint64 (value, multifdsink->timeout);
       break;
-    case ARG_SYNC_METHOD:
-      g_value_set_enum (value, multifdsink->sync_method);
+    case PROP_SYNC_METHOD:
+      g_value_set_enum (value, multifdsink->def_sync_method);
       break;
-    case ARG_BYTES_TO_SERVE:
+    case PROP_BYTES_TO_SERVE:
       g_value_set_uint64 (value, multifdsink->bytes_to_serve);
       break;
-    case ARG_BYTES_SERVED:
+    case PROP_BYTES_SERVED:
       g_value_set_uint64 (value, multifdsink->bytes_served);
       break;
+    case PROP_BURST_UNIT:
+      g_value_set_enum (value, multifdsink->def_burst_unit);
+      break;
+    case PROP_BURST_VALUE:
+      g_value_set_uint64 (value, multifdsink->def_burst_value);
+      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -2086,6 +2565,7 @@ gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition)
   /* ERRORS */
 start_failed:
   {
+    /* error message was posted */
     return GST_STATE_CHANGE_FAILURE;
   }
 }
index 7e61573..7533424 100644 (file)
@@ -73,9 +73,15 @@ typedef enum
 
 /**
  * GstSyncMethod:
- * @GST_SYNC_METHOD_LATEST         : client receives most recent buffer
- * @GST_SYNC_METHOD_NEXT_KEYFRAME  : client receives next keyframe
- * @GST_SYNC_METHOD_LATEST_KEYFRAME: client receives latest keyframe (burst)
+ * @GST_SYNC_METHOD_LATEST              : client receives most recent buffer
+ * @GST_SYNC_METHOD_NEXT_KEYFRAME       : client receives next keyframe
+ * @GST_SYNC_METHOD_LATEST_KEYFRAME     : client receives latest keyframe (burst)
+ * @GST_SYNC_METHOD_BURST               : client receives specific amount of data
+ * @GST_SYNC_METHOD_BURST_KEYFRAME      : client receives specific amount of data 
+ *                                        starting from latest keyframe
+ * @GST_SYNC_METHOD_BURST_WITH_KEYFRAME : client receives specific amount of data from
+ *                                        a keyframe, or if there is not enough data after
+ *                                        the keyframe, starting before the keyframe
  *
  * This enum defines the selection of the first buffer that is sent
  * to a new client.
@@ -85,18 +91,23 @@ typedef enum
   GST_SYNC_METHOD_LATEST,
   GST_SYNC_METHOD_NEXT_KEYFRAME,
   GST_SYNC_METHOD_LATEST_KEYFRAME,
+  GST_SYNC_METHOD_BURST,
+  GST_SYNC_METHOD_BURST_KEYFRAME,
+  GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
 } GstSyncMethod;
 
 /**
  * GstUnitType:
- * @GST_UNIT_TYPE_BUFFERS: a buffer
- * @GST_UNIT_TYPE_TIME   : timeunits (in nanoseconds)
- * @GST_UNIT_TYPE_BYTES  : bytes
+ * @GST_UNIT_TYPE_UNDEFINED: undefined
+ * @GST_UNIT_TYPE_BUFFERS  : buffers
+ * @GST_UNIT_TYPE_TIME     : timeunits (in nanoseconds)
+ * @GST_UNIT_TYPE_BYTES    : bytes
  *
  * The units used to specify limits.
  */
 typedef enum
 {
+  GST_UNIT_TYPE_UNDEFINED,
   GST_UNIT_TYPE_BUFFERS,
   GST_UNIT_TYPE_TIME,
   GST_UNIT_TYPE_BYTES,
@@ -144,6 +155,13 @@ typedef struct {
   gboolean caps_sent;
   gboolean new_connection;
 
+  /* method to sync client when connecting */
+  GstSyncMethod sync_method;
+  GstUnitType   burst_min_unit;
+  guint64       burst_min_value;
+  GstUnitType   burst_max_unit;
+  guint64       burst_max_value;
+
   GstCaps *caps;                /* caps of last queued buffer */
 
   /* stats */
@@ -192,12 +210,24 @@ struct _GstMultiFdSink {
   gboolean running;     /* the thread state */
   GThread *thread;      /* the sender thread */
 
+  /* these values are used to check if a client is reading fast
+   * enough and to control receovery */
   GstUnitType unit_type;/* the type of the units */
-  gint units_max;       /* max units to queue */
+  gint units_max;       /* max units to queue for a client */
   gint units_soft_max;  /* max units a client can lag before recovery starts */
   GstRecoverPolicy recover_policy;
   GstClockTime timeout; /* max amount of nanoseconds to remain idle */
-  GstSyncMethod sync_method;    /* what method to use for connecting clients */
+
+  GstSyncMethod def_sync_method;    /* what method to use for connecting clients */
+  GstUnitType   def_burst_unit;
+  guint64       def_burst_value;
+
+  /* these values are used to control the amount of data
+   * kept in the queues. It allows clients to perform a burst
+   * on connect. */
+  gint   bytes_min;    /* min number of bytes to queue */
+  gint64 time_min;     /* min time to queue */
+  gint   buffers_min;   /* min number of buffers to queue */
 
   /* stats */
   gint buffers_queued;  /* number of queued buffers */
@@ -212,6 +242,9 @@ struct _GstMultiFdSinkClass {
 
   /* element methods */
   void          (*add)          (GstMultiFdSink *sink, int fd);
+  void          (*add_full)     (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
+                                GstUnitType format, guint64 value, 
+                                GstUnitType max_unit, guint64 max_value);
   void          (*remove)       (GstMultiFdSink *sink, int fd);
   void          (*clear)        (GstMultiFdSink *sink);
   GValueArray*  (*get_stats)    (GstMultiFdSink *sink, int fd);
@@ -231,10 +264,14 @@ struct _GstMultiFdSinkClass {
 GType gst_multi_fd_sink_get_type (void);
 
 void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd);
+void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, 
+               GstUnitType min_unit, guint64 min_value,
+               GstUnitType max_unit, guint64 max_value);
 void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
 void gst_multi_fd_sink_clear (GstMultiFdSink *sink);
 GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd);
 
+
 G_END_DECLS
 
 #endif /* __GST_MULTI_FD_SINK_H__ */
index 49f4e52..3a7fa9b 100644 (file)
@@ -1,4 +1,5 @@
 VOID:STRING,UINT
 VOID:INT
 VOID:INT,BOXED
+VOID:INT,BOOLEAN,INT,UINT64,INT,UINT64
 BOXED:INT
index 2cc8e7f..024b276 100644 (file)
@@ -125,6 +125,7 @@ GST_START_TEST (test_add_client)
   memcpy (GST_BUFFER_DATA (buffer), "dead", 4);
   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
 
+  GST_DEBUG ("reading");
   fail_if (read (pfd[0], data, 4) < 4);
   fail_unless (strncmp (data, "dead", 4) == 0);
   wait_bytes_served (sink, 4);
@@ -441,6 +442,358 @@ GST_START_TEST (test_change_streamheader)
 
 GST_END_TEST;
 
+/* keep 100 bytes and burst 80 bytes to clients */
+GST_START_TEST (test_burst_client_bytes)
+{
+  GstElement *sink;
+  GstBuffer *buffer;
+  GstCaps *caps;
+  int pfd1[2];
+  int pfd2[2];
+  int pfd3[2];
+  gchar data[16];
+  guint64 bytes_served;
+  gint i;
+  guint buffers_queued;
+
+  sink = setup_multifdsink ();
+  /* make sure we keep at least 100 bytes at all times */
+  g_object_set (sink, "bytes-min", 100, NULL);
+  g_object_set (sink, "sync-method", 3, NULL);  /* 3 = burst */
+  g_object_set (sink, "burst-unit", 3, NULL);   /* 3 = bytes */
+  g_object_set (sink, "burst-value", (guint64) 80, NULL);
+
+  fail_if (pipe (pfd1) == -1);
+  fail_if (pipe (pfd2) == -1);
+  fail_if (pipe (pfd3) == -1);
+
+  ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+  caps = gst_caps_from_string ("application/x-gst-check");
+  GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
+
+  /* push buffers in, 9 * 16 bytes = 144 bytes */
+  for (i = 0; i < 9; i++) {
+    gchar *data;
+
+    buffer = gst_buffer_new_and_alloc (16);
+    gst_buffer_set_caps (buffer, caps);
+
+    /* copy some id */
+    data = (gchar *) GST_BUFFER_DATA (buffer);
+    g_snprintf (data, 16, "deadbee%08x", i);
+
+    fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+  }
+
+  /* check that at least 7 buffers (112 bytes) are in the queue */
+  g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
+  fail_if (buffers_queued != 7);
+
+  /* now add the clients */
+  g_signal_emit_by_name (sink, "add", pfd1[1]);
+  g_signal_emit_by_name (sink, "add_full", pfd2[1], 3,
+      3, (guint64) 50, 3, (guint64) 200);
+  g_signal_emit_by_name (sink, "add_full", pfd3[1], 3,
+      3, (guint64) 50, 3, (guint64) 50);
+
+  /* push last buffer to make client fds ready for reading */
+  for (i = 9; i < 10; i++) {
+    gchar *data;
+
+    buffer = gst_buffer_new_and_alloc (16);
+    gst_buffer_set_caps (buffer, caps);
+
+    /* copy some id */
+    data = (gchar *) GST_BUFFER_DATA (buffer);
+    g_snprintf (data, 16, "deadbee%08x", i);
+
+    fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+  }
+
+  /* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */
+  GST_DEBUG ("Reading from client 1");
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  /* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since
+   * the max alows it) */
+  GST_DEBUG ("Reading from client 2");
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  /* third client only bursts 50 bytes = 4 buffers, we can't send
+   * more than 50 bytes so we only get 3 buffers (48 bytes). */
+  GST_DEBUG ("Reading from client 3");
+  fail_if (read (pfd3[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+  fail_if (read (pfd3[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd3[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  GST_DEBUG ("cleaning up multifdsink");
+  ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_multifdsink (sink);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
+/* keep 100 bytes and burst 80 bytes to clients */
+GST_START_TEST (test_burst_client_bytes_keyframe)
+{
+  GstElement *sink;
+  GstBuffer *buffer;
+  GstCaps *caps;
+  int pfd1[2];
+  int pfd2[2];
+  int pfd3[2];
+  gchar data[16];
+  guint64 bytes_served;
+  gint i;
+  guint buffers_queued;
+
+  sink = setup_multifdsink ();
+  /* make sure we keep at least 100 bytes at all times */
+  g_object_set (sink, "bytes-min", 100, NULL);
+  g_object_set (sink, "sync-method", 4, NULL);  /* 3 = burst_keyframe */
+  g_object_set (sink, "burst-unit", 3, NULL);   /* 3 = bytes */
+  g_object_set (sink, "burst-value", (guint64) 80, NULL);
+
+  fail_if (pipe (pfd1) == -1);
+  fail_if (pipe (pfd2) == -1);
+  fail_if (pipe (pfd3) == -1);
+
+  ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+  caps = gst_caps_from_string ("application/x-gst-check");
+  GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
+
+  /* push buffers in, 9 * 16 bytes = 144 bytes */
+  for (i = 0; i < 9; i++) {
+    gchar *data;
+
+    buffer = gst_buffer_new_and_alloc (16);
+    gst_buffer_set_caps (buffer, caps);
+
+    /* mark most buffers as delta */
+    if (i != 0 && i != 4 && i != 8)
+      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+
+    /* copy some id */
+    data = (gchar *) GST_BUFFER_DATA (buffer);
+    g_snprintf (data, 16, "deadbee%08x", i);
+
+    fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+  }
+
+  /* check that at least 7 buffers (112 bytes) are in the queue */
+  g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
+  fail_if (buffers_queued != 7);
+
+  /* now add the clients */
+  g_signal_emit_by_name (sink, "add", pfd1[1]);
+  g_signal_emit_by_name (sink, "add_full", pfd2[1], 4,
+      3, (guint64) 50, 3, (guint64) 90);
+  g_signal_emit_by_name (sink, "add_full", pfd3[1], 4,
+      3, (guint64) 50, 3, (guint64) 50);
+
+  /* push last buffer to make client fds ready for reading */
+  for (i = 9; i < 10; i++) {
+    gchar *data;
+
+    buffer = gst_buffer_new_and_alloc (16);
+    gst_buffer_set_caps (buffer, caps);
+    GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+
+    /* copy some id */
+    data = (gchar *) GST_BUFFER_DATA (buffer);
+    g_snprintf (data, 16, "deadbee%08x", i);
+
+    fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+  }
+
+  /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
+   * keyframe at buffer 4 */
+  GST_DEBUG ("Reading from client 1");
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  /* second client only bursts 50 bytes = 4 buffers, there is
+   * no keyframe above min and below max, so get one below min */
+  GST_DEBUG ("Reading from client 2");
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  /* third client only bursts 50 bytes = 4 buffers, we can't send
+   * more than 50 bytes so we only get 2 buffers (32 bytes). */
+  GST_DEBUG ("Reading from client 3");
+  fail_if (read (pfd3[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd3[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  GST_DEBUG ("cleaning up multifdsink");
+  ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_multifdsink (sink);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
+/* keep 100 bytes and burst 80 bytes to clients */
+GST_START_TEST (test_burst_client_bytes_with_keyframe)
+{
+  GstElement *sink;
+  GstBuffer *buffer;
+  GstCaps *caps;
+  int pfd1[2];
+  int pfd2[2];
+  int pfd3[2];
+  gchar data[16];
+  guint64 bytes_served;
+  gint i;
+  guint buffers_queued;
+
+  sink = setup_multifdsink ();
+  /* make sure we keep at least 100 bytes at all times */
+  g_object_set (sink, "bytes-min", 100, NULL);
+  g_object_set (sink, "sync-method", 5, NULL);  /* 3 = burst_with_keyframe */
+  g_object_set (sink, "burst-unit", 3, NULL);   /* 3 = bytes */
+  g_object_set (sink, "burst-value", (guint64) 80, NULL);
+
+  fail_if (pipe (pfd1) == -1);
+  fail_if (pipe (pfd2) == -1);
+  fail_if (pipe (pfd3) == -1);
+
+  ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+  caps = gst_caps_from_string ("application/x-gst-check");
+  GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
+
+  /* push buffers in, 9 * 16 bytes = 144 bytes */
+  for (i = 0; i < 9; i++) {
+    gchar *data;
+
+    buffer = gst_buffer_new_and_alloc (16);
+    gst_buffer_set_caps (buffer, caps);
+
+    /* mark most buffers as delta */
+    if (i != 0 && i != 4 && i != 8)
+      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+
+    /* copy some id */
+    data = (gchar *) GST_BUFFER_DATA (buffer);
+    g_snprintf (data, 16, "deadbee%08x", i);
+
+    fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+  }
+
+  /* check that at least 7 buffers (112 bytes) are in the queue */
+  g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
+  fail_if (buffers_queued != 7);
+
+  /* now add the clients */
+  g_signal_emit_by_name (sink, "add", pfd1[1]);
+  g_signal_emit_by_name (sink, "add_full", pfd2[1], 5,
+      3, (guint64) 50, 3, (guint64) 90);
+  g_signal_emit_by_name (sink, "add_full", pfd3[1], 5,
+      3, (guint64) 50, 3, (guint64) 50);
+
+  /* push last buffer to make client fds ready for reading */
+  for (i = 9; i < 10; i++) {
+    gchar *data;
+
+    buffer = gst_buffer_new_and_alloc (16);
+    gst_buffer_set_caps (buffer, caps);
+    GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+
+    /* copy some id */
+    data = (gchar *) GST_BUFFER_DATA (buffer);
+    g_snprintf (data, 16, "deadbee%08x", i);
+
+    fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+  }
+
+  /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
+   * keyframe at buffer 4 */
+  GST_DEBUG ("Reading from client 1");
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd1[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  /* second client only bursts 50 bytes = 4 buffers, there is
+   * no keyframe above min and below max, so send min */
+  GST_DEBUG ("Reading from client 2");
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd2[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  /* third client only bursts 50 bytes = 4 buffers, we can't send
+   * more than 50 bytes so we only get 3 buffers (48 bytes). */
+  GST_DEBUG ("Reading from client 3");
+  fail_if (read (pfd3[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
+  fail_if (read (pfd3[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
+  fail_if (read (pfd3[0], data, 16) < 16);
+  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+
+  GST_DEBUG ("cleaning up multifdsink");
+  ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_multifdsink (sink);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
 /* FIXME: add test simulating chained oggs where:
  * sync-method is burst-on-connect
  * (when multifdsink actually does burst-on-connect based on byte size, not
@@ -448,7 +801,6 @@ GST_END_TEST;
  * an old client still needs to read from before the new streamheaders
  * a new client gets the new streamheaders
  */
-
 Suite *
 multifdsink_suite (void)
 {
@@ -460,6 +812,9 @@ multifdsink_suite (void)
   tcase_add_test (tc_chain, test_add_client);
   tcase_add_test (tc_chain, test_streamheader);
   tcase_add_test (tc_chain, test_change_streamheader);
+  tcase_add_test (tc_chain, test_burst_client_bytes);
+  tcase_add_test (tc_chain, test_burst_client_bytes_keyframe);
+  tcase_add_test (tc_chain, test_burst_client_bytes_with_keyframe);
 
   return s;
 }