multihandlesink: rework to use GST_TYPE_FORMAT
authorThomas Vander Stichele <thomas (at) apestaart (dot) org>
Fri, 27 Jan 2012 17:44:04 +0000 (18:44 +0100)
committerThomas Vander Stichele <thomas (at) apestaart (dot) org>
Sun, 12 Feb 2012 21:23:44 +0000 (22:23 +0100)
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
gst/tcp/gstmultihandlesink.c
gst/tcp/gstmultihandlesink.h
gst/tcp/gstmultisocketsink.c
gst/tcp/gstmultisocketsink.h
tests/check/elements/multifdsink.c
tests/check/elements/multisocketsink.c

index c57ada9..136b22d 100644 (file)
@@ -148,10 +148,6 @@ enum
 
 /* this is really arbitrarily chosen */
 #define DEFAULT_MODE                    1
-#define DEFAULT_UNIT_FORMAT               GST_FORMAT_BUFFERS
-
-#define DEFAULT_BURST_FORMAT              GST_FORMAT_UNDEFINED
-#define DEFAULT_BURST_VALUE             0
 
 #define DEFAULT_HANDLE_READ             TRUE
 
@@ -160,11 +156,6 @@ enum
   PROP_0,
   PROP_MODE,
 
-  PROP_UNIT_FORMAT,
-
-  PROP_BURST_FORMAT,
-  PROP_BURST_VALUE,
-
   PROP_HANDLE_READ,
 
   PROP_NUM_FDS,
@@ -192,25 +183,6 @@ gst_fdset_mode_get_type (void)
   return fdset_mode_type;
 }
 
-#define GST_TYPE_UNIT_FORMAT (gst_unit_format_get_type())
-static GType
-gst_unit_format_get_type (void)
-{
-  static GType unit_format_type = 0;
-  static const GEnumValue unit_format[] = {
-    {GST_TCP_UNIT_FORMAT_UNDEFINED, "Undefined", "undefined"},
-    {GST_TCP_UNIT_FORMAT_BYTES, "Bytes", "bytes"},
-    {GST_TCP_UNIT_FORMAT_TIME, "Time", "time"},
-    {GST_TCP_UNIT_FORMAT_BUFFERS, "Buffers", "buffers"},
-    {0, NULL, NULL},
-  };
-
-  if (!unit_format_type) {
-    unit_format_type = g_enum_register_static ("GstTCPUnitType", unit_format);
-  }
-  return unit_format_type;
-}
-
 static void gst_multi_fd_sink_finalize (GObject * object);
 
 static void gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink);
@@ -266,22 +238,6 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
           GST_TYPE_FDSET_MODE, DEFAULT_MODE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
-      g_param_spec_enum ("unit-format", "Units format",
-          "The unit to measure the max/soft-max/queued properties",
-          GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
-      g_param_spec_enum ("burst-format", "Burst format",
-          "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
-          GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
-      g_param_spec_uint64 ("burst-value", "Burst value",
-          "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
-          DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
   /**
    * GstMultiFdSink::handle-read
    *
@@ -331,8 +287,8 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
           add_full), NULL, NULL,
       gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64,
-      G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_FORMAT,
-      G_TYPE_UINT64, GST_TYPE_UNIT_FORMAT, G_TYPE_UINT64);
+      G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT,
+      G_TYPE_UINT64, GST_TYPE_FORMAT, G_TYPE_UINT64);
   /**
    * GstMultiFdSink::remove:
    * @gstmultifdsink: the multifdsink element to emit this signal on
@@ -478,11 +434,6 @@ gst_multi_fd_sink_init (GstMultiFdSink * this)
 
   this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
 
-  this->unit_format = DEFAULT_UNIT_FORMAT;
-
-  this->def_burst_format = DEFAULT_BURST_FORMAT;
-  this->def_burst_value = DEFAULT_BURST_VALUE;
-
   this->handle_read = DEFAULT_HANDLE_READ;
 }
 
@@ -535,10 +486,10 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
   g_snprintf (mhclient->debug, 30, "[fd %5d]", fd);
 
   client->fd.fd = fd;
-  client->burst_min_format = min_format;
-  client->burst_min_value = min_value;
-  client->burst_max_format = max_format;
-  client->burst_max_value = max_value;
+  mhclient->burst_min_format = min_format;
+  mhclient->burst_min_value = min_value;
+  mhclient->burst_max_format = max_format;
+  mhclient->burst_max_value = max_value;
 
   CLIENTS_LOCK (sink);
 
@@ -613,8 +564,8 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
 
   mhsink = GST_MULTI_HANDLE_SINK (sink);
   gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method,
-      sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
-      -1);
+      mhsink->def_burst_format, mhsink->def_burst_value,
+      mhsink->def_burst_format, -1);
 }
 
 /* "remove" signal implementation */
@@ -1086,434 +1037,6 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
   return TRUE;
 }
 
-/* Get the number of buffers from the buffer queue needed to satisfy
- * the maximum max in the configured units.
- * If units are not BUFFERS, and there are insufficient buffers in the
- * queue to satify the limit, return len(queue) + 1 */
-static gint
-get_buffers_max (GstMultiFdSink * sink, gint64 max)
-{
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  switch (sink->unit_format) {
-    case GST_TCP_UNIT_FORMAT_BUFFERS:
-      return max;
-    case GST_TCP_UNIT_FORMAT_TIME:
-    {
-      GstBuffer *buf;
-      int i;
-      int len;
-      gint64 diff;
-      GstClockTime first = GST_CLOCK_TIME_NONE;
-
-      len = mhsink->bufqueue->len;
-
-      for (i = 0; i < len; i++) {
-        buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
-        if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
-          if (first == -1)
-            first = GST_BUFFER_TIMESTAMP (buf);
-
-          diff = first - GST_BUFFER_TIMESTAMP (buf);
-
-          if (diff > max)
-            return i + 1;
-        }
-      }
-      return len + 1;
-    }
-    case GST_TCP_UNIT_FORMAT_BYTES:
-    {
-      GstBuffer *buf;
-      int i;
-      int len;
-      gint acc = 0;
-
-      len = mhsink->bufqueue->len;
-
-      for (i = 0; i < len; i++) {
-        buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
-        acc += gst_buffer_get_size (buf);
-
-        if (acc > max)
-          return i + 1;
-      }
-      return len + 1;
-    }
-    default:
-      return max;
-  }
-}
-
-/* find the positions in the buffer queue where *_min and *_max
- * is satisfied
- */
-/* count the amount of data in the buffers and return the index
- * that satifies the given limits.
- *
- * Returns: index @idx in the buffer queue so that the given limits are
- * satisfied. TRUE if all the limits could be satisfied, FALSE if not
- * enough data was in the queue.
- *
- * FIXME, this code might now work if any of the units is in buffers...
- */
-static gboolean
-find_limits (GstMultiFdSink * sink,
-    gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
-    gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
-{
-  GstClockTime first, time;
-  gint i, len, bytes;
-  gboolean result, max_hit;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  /* take length of queue */
-  len = mhsink->bufqueue->len;
-
-  /* this must hold */
-  g_assert (len > 0);
-
-  GST_LOG_OBJECT (sink,
-      "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
-      ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
-      buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
-      GST_TIME_ARGS (time_max));
-
-  /* do the trivial buffer limit test */
-  if (buffers_min != -1 && len < buffers_min) {
-    *min_idx = len - 1;
-    *max_idx = len - 1;
-    return FALSE;
-  }
-
-  result = FALSE;
-  /* else count bytes and time */
-  first = -1;
-  bytes = 0;
-  /* unset limits */
-  *min_idx = -1;
-  *max_idx = -1;
-  max_hit = FALSE;
-
-  i = 0;
-  /* loop through the buffers, when a limit is ok, mark it 
-   * as -1, we have at least one buffer in the queue. */
-  do {
-    GstBuffer *buf;
-
-    /* if we checked all min limits, update result */
-    if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
-      /* don't go below 0 */
-      *min_idx = MAX (i - 1, 0);
-    }
-    /* if we reached one max limit break out */
-    if (max_hit) {
-      /* i > 0 when we get here, we subtract one to get the position
-       * of the previous buffer. */
-      *max_idx = i - 1;
-      /* we have valid complete result if we found a min_idx too */
-      result = *min_idx != -1;
-      break;
-    }
-    buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
-
-    bytes += gst_buffer_get_size (buf);
-
-    /* take timestamp and save for the base first timestamp */
-    if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
-      GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
-          GST_TIME_ARGS (time));
-      if (first == -1)
-        first = time;
-
-      /* increase max usage if we did not fill enough. Note that
-       * buffers are sorted from new to old, so the first timestamp is
-       * bigger than the next one. */
-      if (time_min != -1 && first - time >= time_min)
-        time_min = -1;
-      if (time_max != -1 && first - time >= time_max)
-        max_hit = TRUE;
-    } else {
-      GST_LOG_OBJECT (sink, "No timestamp on buffer");
-    }
-    /* time is OK or unknown, check and increase if not enough bytes */
-    if (bytes_min != -1) {
-      if (bytes >= bytes_min)
-        bytes_min = -1;
-    }
-    if (bytes_max != -1) {
-      if (bytes >= bytes_max) {
-        max_hit = TRUE;
-      }
-    }
-    i++;
-  }
-  while (i < len);
-
-  /* if we did not hit the max or min limit, set to buffer size */
-  if (*max_idx == -1)
-    *max_idx = len - 1;
-  /* make sure min does not exceed max */
-  if (*min_idx == -1)
-    *min_idx = *max_idx;
-
-  return result;
-}
-
-/* parse the unit/value pair and assign it to the result value of the
- * right type, leave the other values untouched 
- *
- * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
- */
-static gboolean
-assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
-    GstClockTime * time)
-{
-  gboolean res = TRUE;
-
-  /* set only the limit of the given format to the given value */
-  switch (format) {
-    case GST_TCP_UNIT_FORMAT_BUFFERS:
-      *buffers = (gint) value;
-      break;
-    case GST_TCP_UNIT_FORMAT_TIME:
-      *time = value;
-      break;
-    case GST_TCP_UNIT_FORMAT_BYTES:
-      *bytes = (gint) value;
-      break;
-    case GST_TCP_UNIT_FORMAT_UNDEFINED:
-    default:
-      res = FALSE;
-      break;
-  }
-  return res;
-}
-
-/* count the index in the buffer queue to satisfy the given unit
- * and value pair starting from buffer at index 0.
- *
- * Returns: TRUE if there was enough data in the queue to satisfy the
- * burst values. @idx contains the index in the buffer that contains enough
- * data to satisfy the limits or the last buffer in the queue when the
- * function returns FALSE.
- */
-static gboolean
-count_burst_format (GstMultiFdSink * sink, gint * min_idx,
-    GstFormat min_format, guint64 min_value, gint * max_idx,
-    GstFormat max_format, guint64 max_value)
-{
-  gint bytes_min = -1, buffers_min = -1;
-  gint bytes_max = -1, buffers_max = -1;
-  GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
-
-  assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
-  assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
-
-  return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
-      max_idx, bytes_max, buffers_max, time_max);
-}
-
-/* decide where in the current buffer queue this new client should start
- * receiving buffers from.
- * This function is called whenever a client is connected and has not yet
- * received a buffer.
- * If this returns -1, it means that we haven't found a good point to
- * start streaming from yet, and this function should be called again later
- * when more buffers have arrived.
- */
-static gint
-gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
-{
-  gint result;
-  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  GST_DEBUG_OBJECT (sink,
-      "[fd %5d] new client, deciding where to start in queue", client->fd.fd);
-  GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
-      mhsink->bufqueue->len);
-  switch (mhclient->sync_method) {
-    case GST_SYNC_METHOD_LATEST:
-      /* no syncing, we are happy with whatever the client is going to get */
-      result = mhclient->bufpos;
-      GST_DEBUG_OBJECT (sink,
-          "[fd %5d] SYNC_METHOD_LATEST, position %d", client->fd.fd, result);
-      break;
-    case GST_SYNC_METHOD_NEXT_KEYFRAME:
-    {
-      /* if one of the new buffers (between mhclient->bufpos and 0) in the queue
-       * is a sync point, we can proceed, otherwise we need to keep waiting */
-      GST_LOG_OBJECT (sink,
-          "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
-          mhclient->bufpos);
-
-      result = find_prev_syncframe (mhsink, mhclient->bufpos);
-      if (result != -1) {
-        GST_DEBUG_OBJECT (sink,
-            "[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d",
-            client->fd.fd, result);
-        break;
-      }
-
-      /* client is not on a syncbuffer, need to skip these buffers and
-       * wait some more */
-      GST_LOG_OBJECT (sink,
-          "[fd %5d] new client, skipping buffer(s), no syncpoint found",
-          client->fd.fd);
-      mhclient->bufpos = -1;
-      break;
-    }
-    case GST_SYNC_METHOD_LATEST_KEYFRAME:
-    {
-      GST_DEBUG_OBJECT (sink,
-          "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME", client->fd.fd);
-
-      /* for new clients we initially scan the complete buffer queue for
-       * a sync point when a buffer is added. If we don't find a keyframe,
-       * we need to wait for the next keyframe and so we change the client's
-       * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
-       */
-      result = find_next_syncframe (mhsink, 0);
-      if (result != -1) {
-        GST_DEBUG_OBJECT (sink,
-            "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: result %d", client->fd.fd,
-            result);
-        break;
-      }
-
-      GST_DEBUG_OBJECT (sink,
-          "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
-          "switching to SYNC_METHOD_NEXT_KEYFRAME", client->fd.fd);
-      /* throw client to the waiting state */
-      mhclient->bufpos = -1;
-      /* and make client sync to next keyframe */
-      mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
-      break;
-    }
-    case GST_SYNC_METHOD_BURST:
-    {
-      gboolean ok;
-      gint max;
-
-      /* move to the position where we satisfy the client's burst
-       * parameters. If we could not satisfy the parameters because there
-       * is not enough data, we just send what we have (which is in result).
-       * We use the max value to limit the search
-       */
-      ok = count_burst_format (sink, &result, client->burst_min_format,
-          client->burst_min_value, &max, client->burst_max_format,
-          client->burst_max_value);
-      GST_DEBUG_OBJECT (sink,
-          "[fd %5d] SYNC_METHOD_BURST: burst_format returned %d, result %d",
-          client->fd.fd, ok, result);
-
-      GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
-
-      /* we hit the max and it is below the min, use that then */
-      if (max != -1 && max <= result) {
-        result = MAX (max - 1, 0);
-        GST_DEBUG_OBJECT (sink,
-            "[fd %5d] SYNC_METHOD_BURST: result above max, taken down to %d",
-            client->fd.fd, result);
-      }
-      break;
-    }
-    case GST_SYNC_METHOD_BURST_KEYFRAME:
-    {
-      gint min_idx, max_idx;
-      gint next_syncframe, prev_syncframe;
-
-      /* BURST_KEYFRAME:
-       *
-       * _always_ start sending a keyframe to the client. We first search
-       * a keyframe between min/max limits. If there is none, we send it the
-       * last keyframe before min. If there is none, the behaviour is like
-       * NEXT_KEYFRAME.
-       */
-      /* gather burst limits */
-      count_burst_format (sink, &min_idx, client->burst_min_format,
-          client->burst_min_value, &max_idx, client->burst_max_format,
-          client->burst_max_value);
-
-      GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
-
-      /* first find a keyframe after min_idx */
-      next_syncframe = find_next_syncframe (mhsink, min_idx);
-      if (next_syncframe != -1 && next_syncframe < max_idx) {
-        /* we have a valid keyframe and it's below the max */
-        GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
-        result = next_syncframe;
-        break;
-      }
-
-      /* no valid keyframe, try to find one below min */
-      prev_syncframe = find_prev_syncframe (mhsink, min_idx);
-      if (prev_syncframe != -1) {
-        GST_WARNING_OBJECT (sink,
-            "using keyframe below min in BURST_KEYFRAME sync mode");
-        result = prev_syncframe;
-        break;
-      }
-
-      /* no prev keyframe or not enough data  */
-      GST_WARNING_OBJECT (sink,
-          "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
-
-      /* throw client to the waiting state */
-      mhclient->bufpos = -1;
-      /* and make client sync to next keyframe */
-      mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
-      result = -1;
-      break;
-    }
-    case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
-    {
-      gint min_idx, max_idx;
-      gint next_syncframe;
-
-      /* BURST_WITH_KEYFRAME:
-       *
-       * try to start sending a keyframe to the client. We first search
-       * a keyframe between min/max limits. If there is none, we send it the
-       * amount of data up 'till min.
-       */
-      /* gather enough data to burst */
-      count_burst_format (sink, &min_idx, client->burst_min_format,
-          client->burst_min_value, &max_idx, client->burst_max_format,
-          client->burst_max_value);
-
-      GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
-
-      /* first find a keyframe after min_idx */
-      next_syncframe = find_next_syncframe (mhsink, min_idx);
-      if (next_syncframe != -1 && next_syncframe < max_idx) {
-        /* we have a valid keyframe and it's below the max */
-        GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
-        result = next_syncframe;
-        break;
-      }
-
-      /* no keyframe, send data from min_idx */
-      GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
-
-      /* make sure we don't go over the max limit */
-      if (max_idx != -1 && max_idx <= min_idx) {
-        result = MAX (max_idx - 1, 0);
-      } else {
-        result = min_idx;
-      }
-
-      break;
-    }
-    default:
-      g_warning ("unknown sync method %d", mhclient->sync_method);
-      result = mhclient->bufpos;
-      break;
-  }
-  return result;
-}
-
 /* Handle a write on a client,
  * which indicates a read request from a client.
  *
@@ -1565,7 +1088,9 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
       if (mhclient->bufpos == -1) {
         /* client is too fast, remove from write queue until new buffer is
          * available */
+        // FIXME: specific
         gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
+        //
         /* if we flushed out all of the client buffers, we can stop */
         if (mhclient->flushcount == 0)
           goto flushed;
@@ -1579,7 +1104,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
         /* for new connections, we need to find a good spot in the
          * bufqueue to start streaming from */
         if (mhclient->new_connection && !flushing) {
-          gint position = gst_multi_fd_sink_new_client (sink, client);
+          gint position = gst_multi_handle_sink_new_client (mhsink, mhclient);
 
           if (position >= 0) {
             /* we got a valid spot in the queue */
@@ -1587,6 +1112,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
             mhclient->bufpos = position;
           } else {
             /* cannot send data to this client yet */
+            // FIXME: specific
             gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
             return TRUE;
           }
@@ -1636,6 +1162,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
       data = info.data;
       maxsize = info.size - mhclient->bufoffset;
 
+      // FIXME: specific
       /* try to write the complete buffer */
 #ifdef MSG_NOSIGNAL
 #define FLAGS MSG_NOSIGNAL
@@ -1707,60 +1234,6 @@ write_error:
   }
 }
 
-/* calculate the new position for a client after recovery. This function
- * does not update the client position but merely returns the required
- * position.
- */
-static gint
-gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
-{
-  gint newbufpos;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-
-  GST_WARNING_OBJECT (sink,
-      "[fd %5d] client %p is lagging at %d, recover using policy %d",
-      client->fd.fd, client, mhclient->bufpos, mhsink->recover_policy);
-
-  switch (mhsink->recover_policy) {
-    case GST_RECOVER_POLICY_NONE:
-      /* do nothing, client will catch up or get kicked out when it reaches
-       * the hard max */
-      newbufpos = mhclient->bufpos;
-      break;
-    case GST_RECOVER_POLICY_RESYNC_LATEST:
-      /* move to beginning of queue */
-      newbufpos = -1;
-      break;
-    case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
-      /* move to beginning of soft max */
-      newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
-      break;
-    case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
-      /* find keyframe in buffers, we search backwards to find the
-       * closest keyframe relative to what this client already received. */
-      newbufpos = MIN (mhsink->bufqueue->len - 1,
-          get_buffers_max (sink, mhsink->units_soft_max) - 1);
-
-      while (newbufpos >= 0) {
-        GstBuffer *buf;
-
-        buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
-        if (is_sync_frame (mhsink, buf)) {
-          /* found a buffer that is not a delta unit */
-          break;
-        }
-        newbufpos--;
-      }
-      break;
-    default:
-      /* unknown recovery procedure */
-      newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
-      break;
-  }
-  return newbufpos;
-}
-
 /* Queue a buffer on the global queue.
  *
  * This function adds the buffer to the front of a GArray. It removes the
@@ -1804,12 +1277,12 @@ gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, GstBuffer * buffer)
   queuelen = mhsink->bufqueue->len;
 
   if (mhsink->units_max > 0)
-    max_buffers = get_buffers_max (sink, mhsink->units_max);
+    max_buffers = get_buffers_max (mhsink, mhsink->units_max);
   else
     max_buffers = -1;
 
   if (mhsink->units_soft_max > 0)
-    soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
+    soft_max_buffers = get_buffers_max (mhsink, mhsink->units_soft_max);
   else
     soft_max_buffers = -1;
   GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
@@ -1840,7 +1313,7 @@ restart:
     if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
       gint newpos;
 
-      newpos = gst_multi_fd_sink_recover_client (sink, client);
+      newpos = gst_multi_handle_sink_recover_client (mhsink, mhclient);
       if (newpos != mhclient->bufpos) {
         mhclient->dropped_buffers += mhclient->bufpos - newpos;
         mhclient->bufpos = newpos;
@@ -1893,7 +1366,7 @@ restart:
     /* get index where the limits are ok, we don't really care if all limits
      * are ok, we just queue as much as we need. We also don't compare against
      * the max limits. */
-    find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min,
+    find_limits (mhsink, &usage, mhsink->bytes_min, mhsink->buffers_min,
         mhsink->time_min, &max, -1, -1, -1);
 
     max_buffer_usage = MAX (max_buffer_usage, usage + 1);
@@ -2156,15 +1629,6 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
     case PROP_MODE:
       multifdsink->mode = g_value_get_enum (value);
       break;
-    case PROP_UNIT_FORMAT:
-      multifdsink->unit_format = g_value_get_enum (value);
-      break;
-    case PROP_BURST_FORMAT:
-      multifdsink->def_burst_format = g_value_get_enum (value);
-      break;
-    case PROP_BURST_VALUE:
-      multifdsink->def_burst_value = g_value_get_uint64 (value);
-      break;
     case PROP_HANDLE_READ:
       multifdsink->handle_read = g_value_get_boolean (value);
       break;
@@ -2187,15 +1651,6 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_MODE:
       g_value_set_enum (value, multifdsink->mode);
       break;
-    case PROP_UNIT_FORMAT:
-      g_value_set_enum (value, multifdsink->unit_format);
-      break;
-    case PROP_BURST_FORMAT:
-      g_value_set_enum (value, multifdsink->def_burst_format);
-      break;
-    case PROP_BURST_VALUE:
-      g_value_set_uint64 (value, multifdsink->def_burst_value);
-      break;
     case PROP_HANDLE_READ:
       g_value_set_boolean (value, multifdsink->handle_read);
       break;
index 89ec423..d589539 100644 (file)
@@ -47,23 +47,6 @@ typedef struct _GstMultiFdSink GstMultiFdSink;
 typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass;
 
 
-/**
- * GstTCPUnitType:
- * @GST_TCP_UNIT_FORMAT_UNDEFINED: undefined
- * @GST_TCP_UNIT_FORMAT_BUFFERS  : buffers
- * @GST_TCP_UNIT_FORMAT_TIME     : timeunits (in nanoseconds)
- * @GST_TCP_UNIT_FORMAT_BYTES    : bytes
- *
- * The units used to specify limits.
- */
-typedef enum
-{
-  GST_TCP_UNIT_FORMAT_UNDEFINED,
-  GST_TCP_UNIT_FORMAT_BUFFERS,
-  GST_TCP_UNIT_FORMAT_TIME,
-  GST_TCP_UNIT_FORMAT_BYTES,
-} GstTCPUnitType;
-
 /* structure for a client
  */
 typedef struct {
@@ -72,15 +55,6 @@ typedef struct {
   GstPollFD fd;
 
   gboolean is_socket;
-
-  gboolean caps_sent;
-
-  /* method to sync client when connecting */
-  GstSyncMethod sync_method;
-  GstFormat   burst_min_format;
-  guint64          burst_min_value;
-  GstFormat   burst_max_format;
-  guint64          burst_max_value;
 } GstTCPClient;
 
 /**
@@ -99,13 +73,6 @@ struct _GstMultiFdSink {
 
   gboolean handle_read;
 
-  /* these values are used to check if a client is reading fast
-   * enough and to control receovery */
-  GstFormat unit_format;/* the type of the units */
-
-  GstFormat   def_burst_format;
-  guint64       def_burst_value;
-
   guint8 header_flags;
 };
 
index bb0ffcb..700f8fa 100644 (file)
@@ -170,9 +170,7 @@ enum
   PROP_BYTES_QUEUED,
   PROP_TIME_QUEUED,
 
-#if 0
   PROP_UNIT_FORMAT,
-#endif
   PROP_UNITS_MAX,
   PROP_UNITS_SOFT_MAX,
 
@@ -189,10 +187,8 @@ enum
   PROP_BYTES_TO_SERVE,
   PROP_BYTES_SERVED,
 
-#if 0
   PROP_BURST_FORMAT,
   PROP_BURST_VALUE,
-#endif
 
   PROP_QOS_DSCP,
 
@@ -420,7 +416,6 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
-#if 0
   g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
       g_param_spec_enum ("burst-format", "Burst format",
           "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
@@ -430,7 +425,6 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
       g_param_spec_uint64 ("burst-value", "Burst value",
           "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
           DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-#endif
 
   g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
@@ -678,10 +672,8 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
   this->timeout = DEFAULT_TIMEOUT;
   this->def_sync_method = DEFAULT_SYNC_METHOD;
 
-#if 0
   this->def_burst_format = DEFAULT_BURST_FORMAT;
   this->def_burst_value = DEFAULT_BURST_VALUE;
-#endif
 
   this->qos_dscp = DEFAULT_QOS_DSCP;
 
@@ -1407,13 +1399,11 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction)
   return result;
 }
 
-
-#if 0
 /* Get the number of buffers from the buffer queue needed to satisfy
  * the maximum max in the configured units.
  * If units are not BUFFERS, and there are insufficient buffers in the
  * queue to satify the limit, return len(queue) + 1 */
-static gint
+gint
 get_buffers_max (GstMultiHandleSink * sink, gint64 max)
 {
   switch (sink->unit_format) {
@@ -1465,9 +1455,7 @@ get_buffers_max (GstMultiHandleSink * sink, gint64 max)
       return max;
   }
 }
-#endif
 
-#if 0
 /* find the positions in the buffer queue where *_min and *_max
  * is satisfied
  */
@@ -1480,7 +1468,7 @@ get_buffers_max (GstMultiHandleSink * sink, gint64 max)
  *
  * FIXME, this code might now work if any of the units is in buffers...
  */
-static gboolean
+gboolean
 find_limits (GstMultiHandleSink * sink,
     gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
     gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
@@ -1611,9 +1599,7 @@ assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
   }
   return res;
 }
-#endif
 
-#if 0
 /* count the index in the buffer queue to satisfy the given unit
  * and value pair starting from buffer at index 0.
  *
@@ -1637,9 +1623,7 @@ count_burst_unit (GstMultiHandleSink * sink, gint * min_idx,
   return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
       max_idx, bytes_max, buffers_max, time_max);
 }
-#endif
 
-#if 0
 /* decide where in the current buffer queue this new client should start
  * receiving buffers from.
  * This function is called whenever a client is connected and has not yet
@@ -1648,15 +1632,14 @@ count_burst_unit (GstMultiHandleSink * sink, gint * min_idx,
  * start streaming from yet, and this function should be called again later
  * when more buffers have arrived.
  */
-static gint
+gint
 gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
-    GstSocketClient * client)
+    GstMultiHandleClient * client)
 {
   gint result;
 
   GST_DEBUG_OBJECT (sink,
-      "[socket %p] new client, deciding where to start in queue",
-      client->socket);
+      "%s new client, deciding where to start in queue", client->debug);
   GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
       sink->bufqueue->len);
   switch (client->sync_method) {
@@ -1664,37 +1647,34 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
       /* no syncing, we are happy with whatever the client is going to get */
       result = client->bufpos;
       GST_DEBUG_OBJECT (sink,
-          "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket,
-          result);
+          "%s SYNC_METHOD_LATEST, position %d", client->debug, result);
       break;
     case GST_SYNC_METHOD_NEXT_KEYFRAME:
     {
       /* if one of the new buffers (between client->bufpos and 0) in the queue
        * is a sync point, we can proceed, otherwise we need to keep waiting */
       GST_LOG_OBJECT (sink,
-          "[socket %p] new client, bufpos %d, waiting for keyframe",
-          client->socket, client->bufpos);
+          "%s new client, bufpos %d, waiting for keyframe",
+          client->debug, client->bufpos);
 
       result = find_prev_syncframe (sink, client->bufpos);
       if (result != -1) {
         GST_DEBUG_OBJECT (sink,
-            "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d",
-            client->socket, result);
+            "%s SYNC_METHOD_NEXT_KEYFRAME: result %d", client->debug, result);
         break;
       }
 
       /* client is not on a syncbuffer, need to skip these buffers and
        * wait some more */
       GST_LOG_OBJECT (sink,
-          "[socket %p] new client, skipping buffer(s), no syncpoint found",
-          client->socket);
+          "%s new client, skipping buffer(s), no syncpoint found",
+          client->debug);
       client->bufpos = -1;
       break;
     }
     case GST_SYNC_METHOD_LATEST_KEYFRAME:
     {
-      GST_DEBUG_OBJECT (sink,
-          "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket);
+      GST_DEBUG_OBJECT (sink, "%s SYNC_METHOD_LATEST_KEYFRAME", client->debug);
 
       /* for new clients we initially scan the complete buffer queue for
        * a sync point when a buffer is added. If we don't find a keyframe,
@@ -1704,14 +1684,13 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
       result = find_next_syncframe (sink, 0);
       if (result != -1) {
         GST_DEBUG_OBJECT (sink,
-            "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d",
-            client->socket, result);
+            "%s SYNC_METHOD_LATEST_KEYFRAME: result %d", client->debug, result);
         break;
       }
 
       GST_DEBUG_OBJECT (sink,
-          "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
-          "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket);
+          "%s SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
+          "switching to SYNC_METHOD_NEXT_KEYFRAME", client->debug);
       /* throw client to the waiting state */
       client->bufpos = -1;
       /* and make client sync to next keyframe */
@@ -1732,8 +1711,8 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
           client->burst_min_value, &max, client->burst_max_format,
           client->burst_max_value);
       GST_DEBUG_OBJECT (sink,
-          "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
-          client->socket, ok, result);
+          "%s SYNC_METHOD_BURST: burst_unit returned %d, result %d",
+          client->debug, ok, result);
 
       GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
 
@@ -1741,8 +1720,8 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
       if (max != -1 && max <= result) {
         result = MAX (max - 1, 0);
         GST_DEBUG_OBJECT (sink,
-            "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d",
-            client->socket, result);
+            "%s SYNC_METHOD_BURST: result above max, taken down to %d",
+            client->debug, result);
       }
       break;
     }
@@ -1840,7 +1819,6 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
   }
   return result;
 }
-#endif
 
 #if 0
 /* Handle a write on a client,
@@ -2034,20 +2012,19 @@ write_error:
 }
 #endif
 
-#if 0
 /* calculate the new position for a client after recovery. This function
  * does not update the client position but merely returns the required
  * position.
  */
-static gint
+gint
 gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
-    GstSocketClient * client)
+    GstMultiHandleClient * client)
 {
   gint newbufpos;
 
   GST_WARNING_OBJECT (sink,
-      "[socket %p] client %p is lagging at %d, recover using policy %d",
-      client->socket, client, client->bufpos, sink->recover_policy);
+      "%s client %p is lagging at %d, recover using policy %d",
+      client->debug, client, client->bufpos, sink->recover_policy);
 
   switch (sink->recover_policy) {
     case GST_RECOVER_POLICY_NONE:
@@ -2087,7 +2064,6 @@ gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
   }
   return newbufpos;
 }
-#endif
 
 #if 0
 /* Queue a buffer on the global queue.
@@ -2536,7 +2512,6 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
     case PROP_BUFFERS_MIN:
       multihandlesink->buffers_min = g_value_get_int (value);
       break;
-#if 0
     case PROP_UNIT_FORMAT:
       multihandlesink->unit_format = g_value_get_enum (value);
       break;
@@ -2546,7 +2521,6 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
     case PROP_UNITS_SOFT_MAX:
       multihandlesink->units_soft_max = g_value_get_int64 (value);
       break;
-#endif
     case PROP_RECOVER_POLICY:
       multihandlesink->recover_policy = g_value_get_enum (value);
       break;
@@ -2556,14 +2530,12 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
     case PROP_SYNC_METHOD:
       multihandlesink->def_sync_method = g_value_get_enum (value);
       break;
-#if 0
     case PROP_BURST_FORMAT:
       multihandlesink->def_burst_format = g_value_get_enum (value);
       break;
     case PROP_BURST_VALUE:
       multihandlesink->def_burst_value = g_value_get_uint64 (value);
       break;
-#endif
     case PROP_QOS_DSCP:
       multihandlesink->qos_dscp = g_value_get_int (value);
       gst_multi_handle_sink_setup_dscp (multihandlesink);
@@ -2612,7 +2584,6 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
     case PROP_TIME_QUEUED:
       g_value_set_uint64 (value, multihandlesink->time_queued);
       break;
-#if 0
     case PROP_UNIT_FORMAT:
       g_value_set_enum (value, multihandlesink->unit_format);
       break;
@@ -2622,7 +2593,6 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
     case PROP_UNITS_SOFT_MAX:
       g_value_set_int64 (value, multihandlesink->units_soft_max);
       break;
-#endif
     case PROP_RECOVER_POLICY:
       g_value_set_enum (value, multihandlesink->recover_policy);
       break;
@@ -2638,14 +2608,12 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
     case PROP_BYTES_SERVED:
       g_value_set_uint64 (value, multihandlesink->bytes_served);
       break;
-#if 0
     case PROP_BURST_FORMAT:
       g_value_set_enum (value, multihandlesink->def_burst_format);
       break;
     case PROP_BURST_VALUE:
       g_value_set_uint64 (value, multihandlesink->def_burst_value);
       break;
-#endif
     case PROP_QOS_DSCP:
       g_value_set_int (value, multihandlesink->qos_dscp);
       break;
index cbd86c7..ba4e23b 100644 (file)
@@ -142,13 +142,10 @@ typedef struct {
 
   /* method to sync client when connecting */
   GstSyncMethod sync_method;
-// FIXME: refactor format vs unit
-#if 0
   GstFormat     burst_min_format;
   guint64       burst_min_value;
   GstFormat     burst_max_format;
   guint64       burst_max_value;
-#endif
 
   GstCaps *caps;                /* caps of last queued buffer */
 
@@ -179,6 +176,18 @@ gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink);
 gboolean gst_multi_handle_sink_start (GstBaseSink * bsink);
 void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink);
 gint gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink, GstMultiHandleClient * client);
+gint get_buffers_max (GstMultiHandleSink * sink, gint64 max);
+gint
+gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
+    GstMultiHandleClient * client);
+gint
+gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
+    GstMultiHandleClient * client);
+gboolean
+find_limits (GstMultiHandleSink * sink,
+    gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
+    gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max);
+
 
 /**
  * GstMultiHandleSink:
index f7b1840..831ffdf 100644 (file)
@@ -136,22 +136,10 @@ enum
   LAST_SIGNAL
 };
 
-
-/* this is really arbitrarily chosen */
-#define DEFAULT_UNIT_FORMAT               GST_FORMAT_BUFFERS
-
-#define DEFAULT_BURST_FORMAT            GST_FORMAT_UNDEFINED
-#define DEFAULT_BURST_VALUE             0
-
 enum
 {
   PROP_0,
 
-  PROP_UNIT_FORMAT,
-
-  PROP_BURST_FORMAT,
-  PROP_BURST_VALUE,
-
   PROP_NUM_SOCKETS,
 
   PROP_LAST
@@ -206,22 +194,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
   gobject_class->get_property = gst_multi_socket_sink_get_property;
   gobject_class->finalize = gst_multi_socket_sink_finalize;
 
-  g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
-      g_param_spec_enum ("unit-format", "Units format",
-          "The unit to measure the max/soft-max/queued properties",
-          GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
-      g_param_spec_enum ("burst-format", "Burst format",
-          "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
-          GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
-      g_param_spec_uint64 ("burst-value", "Burst value",
-          "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
-          DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
   g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
       g_param_spec_uint ("num-sockets", "Number of sockets",
           "The current number of client sockets",
@@ -405,11 +377,6 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this)
 {
   this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
 
-  this->unit_format = DEFAULT_UNIT_FORMAT;
-
-  this->def_burst_format = DEFAULT_BURST_FORMAT;
-  this->def_burst_value = DEFAULT_BURST_VALUE;
-
   this->cancellable = g_cancellable_new ();
 }
 
@@ -464,10 +431,10 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
   g_snprintf (mhclient->debug, 30, "[socket %p]", socket);
   client->socket = G_SOCKET (g_object_ref (socket));
 
-  client->burst_min_format = min_format;
-  client->burst_min_value = min_value;
-  client->burst_max_format = max_format;
-  client->burst_max_value = max_value;
+  mhclient->burst_min_format = min_format;
+  mhclient->burst_min_value = min_value;
+  mhclient->burst_max_format = max_format;
+  mhclient->burst_max_value = max_value;
 
   CLIENTS_LOCK (sink);
 
@@ -535,8 +502,8 @@ gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
 
   mhsink = GST_MULTI_HANDLE_SINK (sink);
   gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method,
-      sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
-      -1);
+      mhsink->def_burst_format, mhsink->def_burst_value,
+      mhsink->def_burst_format, -1);
 }
 
 /* "remove" signal implementation */
@@ -957,437 +924,6 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
   return TRUE;
 }
 
-/* Get the number of buffers from the buffer queue needed to satisfy
- * the maximum max in the configured units.
- * If units are not BUFFERS, and there are insufficient buffers in the
- * queue to satify the limit, return len(queue) + 1 */
-static gint
-get_buffers_max (GstMultiSocketSink * sink, gint64 max)
-{
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  switch (sink->unit_format) {
-    case GST_FORMAT_BUFFERS:
-      return max;
-    case GST_FORMAT_TIME:
-    {
-      GstBuffer *buf;
-      int i;
-      int len;
-      gint64 diff;
-      GstClockTime first = GST_CLOCK_TIME_NONE;
-
-      len = mhsink->bufqueue->len;
-
-      for (i = 0; i < len; i++) {
-        buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
-        if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
-          if (first == -1)
-            first = GST_BUFFER_TIMESTAMP (buf);
-
-          diff = first - GST_BUFFER_TIMESTAMP (buf);
-
-          if (diff > max)
-            return i + 1;
-        }
-      }
-      return len + 1;
-    }
-    case GST_FORMAT_BYTES:
-    {
-      GstBuffer *buf;
-      int i;
-      int len;
-      gint acc = 0;
-
-      len = mhsink->bufqueue->len;
-
-      for (i = 0; i < len; i++) {
-        buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
-        acc += gst_buffer_get_size (buf);
-
-        if (acc > max)
-          return i + 1;
-      }
-      return len + 1;
-    }
-    default:
-      return max;
-  }
-}
-
-/* find the positions in the buffer queue where *_min and *_max
- * is satisfied
- */
-/* count the amount of data in the buffers and return the index
- * that satifies the given limits.
- *
- * Returns: index @idx in the buffer queue so that the given limits are
- * satisfied. TRUE if all the limits could be satisfied, FALSE if not
- * enough data was in the queue.
- *
- * FIXME, this code might now work if any of the units is in buffers...
- */
-static gboolean
-find_limits (GstMultiSocketSink * sink,
-    gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
-    gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
-{
-  GstClockTime first, time;
-  gint i, len, bytes;
-  gboolean result, max_hit;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  /* take length of queue */
-  len = mhsink->bufqueue->len;
-
-  /* this must hold */
-  g_assert (len > 0);
-
-  GST_LOG_OBJECT (sink,
-      "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
-      ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
-      buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
-      GST_TIME_ARGS (time_max));
-
-  /* do the trivial buffer limit test */
-  if (buffers_min != -1 && len < buffers_min) {
-    *min_idx = len - 1;
-    *max_idx = len - 1;
-    return FALSE;
-  }
-
-  result = FALSE;
-  /* else count bytes and time */
-  first = -1;
-  bytes = 0;
-  /* unset limits */
-  *min_idx = -1;
-  *max_idx = -1;
-  max_hit = FALSE;
-
-  i = 0;
-  /* loop through the buffers, when a limit is ok, mark it 
-   * as -1, we have at least one buffer in the queue. */
-  do {
-    GstBuffer *buf;
-
-    /* if we checked all min limits, update result */
-    if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
-      /* don't go below 0 */
-      *min_idx = MAX (i - 1, 0);
-    }
-    /* if we reached one max limit break out */
-    if (max_hit) {
-      /* i > 0 when we get here, we subtract one to get the position
-       * of the previous buffer. */
-      *max_idx = i - 1;
-      /* we have valid complete result if we found a min_idx too */
-      result = *min_idx != -1;
-      break;
-    }
-    buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
-
-    bytes += gst_buffer_get_size (buf);
-
-    /* take timestamp and save for the base first timestamp */
-    if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
-      GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
-          GST_TIME_ARGS (time));
-      if (first == -1)
-        first = time;
-
-      /* increase max usage if we did not fill enough. Note that
-       * buffers are sorted from new to old, so the first timestamp is
-       * bigger than the next one. */
-      if (time_min != -1 && first - time >= time_min)
-        time_min = -1;
-      if (time_max != -1 && first - time >= time_max)
-        max_hit = TRUE;
-    } else {
-      GST_LOG_OBJECT (sink, "No timestamp on buffer");
-    }
-    /* time is OK or unknown, check and increase if not enough bytes */
-    if (bytes_min != -1) {
-      if (bytes >= bytes_min)
-        bytes_min = -1;
-    }
-    if (bytes_max != -1) {
-      if (bytes >= bytes_max) {
-        max_hit = TRUE;
-      }
-    }
-    i++;
-  }
-  while (i < len);
-
-  /* if we did not hit the max or min limit, set to buffer size */
-  if (*max_idx == -1)
-    *max_idx = len - 1;
-  /* make sure min does not exceed max */
-  if (*min_idx == -1)
-    *min_idx = *max_idx;
-
-  return result;
-}
-
-/* parse the unit/value pair and assign it to the result value of the
- * right type, leave the other values untouched 
- *
- * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
- */
-static gboolean
-assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
-    GstClockTime * time)
-{
-  gboolean res = TRUE;
-
-  /* set only the limit of the given format to the given value */
-  switch (format) {
-    case GST_FORMAT_BUFFERS:
-      *buffers = (gint) value;
-      break;
-    case GST_FORMAT_TIME:
-      *time = value;
-      break;
-    case GST_FORMAT_BYTES:
-      *bytes = (gint) value;
-      break;
-    case GST_FORMAT_UNDEFINED:
-    default:
-      res = FALSE;
-      break;
-  }
-  return res;
-}
-
-/* count the index in the buffer queue to satisfy the given unit
- * and value pair starting from buffer at index 0.
- *
- * Returns: TRUE if there was enough data in the queue to satisfy the
- * burst values. @idx contains the index in the buffer that contains enough
- * data to satisfy the limits or the last buffer in the queue when the
- * function returns FALSE.
- */
-static gboolean
-count_burst_unit (GstMultiSocketSink * sink, gint * min_idx,
-    GstFormat min_format, guint64 min_value, gint * max_idx,
-    GstFormat max_format, guint64 max_value)
-{
-  gint bytes_min = -1, buffers_min = -1;
-  gint bytes_max = -1, buffers_max = -1;
-  GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
-
-  assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
-  assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
-
-  return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
-      max_idx, bytes_max, buffers_max, time_max);
-}
-
-/* decide where in the current buffer queue this new client should start
- * receiving buffers from.
- * This function is called whenever a client is connected and has not yet
- * received a buffer.
- * If this returns -1, it means that we haven't found a good point to
- * start streaming from yet, and this function should be called again later
- * when more buffers have arrived.
- */
-static gint
-gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
-    GstSocketClient * client)
-{
-  gint result;
-  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  GST_DEBUG_OBJECT (sink,
-      "[socket %p] new client, deciding where to start in queue",
-      client->socket);
-  GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
-      mhsink->bufqueue->len);
-  switch (mhclient->sync_method) {
-    case GST_SYNC_METHOD_LATEST:
-      /* no syncing, we are happy with whatever the client is going to get */
-      result = mhclient->bufpos;
-      GST_DEBUG_OBJECT (sink,
-          "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket,
-          result);
-      break;
-    case GST_SYNC_METHOD_NEXT_KEYFRAME:
-    {
-      /* if one of the new buffers (between mhclient->bufpos and 0) in the queue
-       * is a sync point, we can proceed, otherwise we need to keep waiting */
-      GST_LOG_OBJECT (sink,
-          "[socket %p] new client, bufpos %d, waiting for keyframe",
-          client->socket, mhclient->bufpos);
-
-      result = find_prev_syncframe (mhsink, mhclient->bufpos);
-      if (result != -1) {
-        GST_DEBUG_OBJECT (sink,
-            "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d",
-            client->socket, result);
-        break;
-      }
-
-      /* client is not on a syncbuffer, need to skip these buffers and
-       * wait some more */
-      GST_LOG_OBJECT (sink,
-          "[socket %p] new client, skipping buffer(s), no syncpoint found",
-          client->socket);
-      mhclient->bufpos = -1;
-      break;
-    }
-    case GST_SYNC_METHOD_LATEST_KEYFRAME:
-    {
-      GST_DEBUG_OBJECT (sink,
-          "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket);
-
-      /* for new clients we initially scan the complete buffer queue for
-       * a sync point when a buffer is added. If we don't find a keyframe,
-       * we need to wait for the next keyframe and so we change the client's
-       * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
-       */
-      result = find_next_syncframe (mhsink, 0);
-      if (result != -1) {
-        GST_DEBUG_OBJECT (sink,
-            "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d",
-            client->socket, result);
-        break;
-      }
-
-      GST_DEBUG_OBJECT (sink,
-          "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
-          "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket);
-      /* throw client to the waiting state */
-      mhclient->bufpos = -1;
-      /* and make client sync to next keyframe */
-      mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
-      break;
-    }
-    case GST_SYNC_METHOD_BURST:
-    {
-      gboolean ok;
-      gint max;
-
-      /* move to the position where we satisfy the client's burst
-       * parameters. If we could not satisfy the parameters because there
-       * is not enough data, we just send what we have (which is in result).
-       * We use the max value to limit the search
-       */
-      ok = count_burst_unit (sink, &result, client->burst_min_format,
-          client->burst_min_value, &max, client->burst_max_format,
-          client->burst_max_value);
-      GST_DEBUG_OBJECT (sink,
-          "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
-          client->socket, ok, result);
-
-      GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
-
-      /* we hit the max and it is below the min, use that then */
-      if (max != -1 && max <= result) {
-        result = MAX (max - 1, 0);
-        GST_DEBUG_OBJECT (sink,
-            "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d",
-            client->socket, result);
-      }
-      break;
-    }
-    case GST_SYNC_METHOD_BURST_KEYFRAME:
-    {
-      gint min_idx, max_idx;
-      gint next_syncframe, prev_syncframe;
-
-      /* BURST_KEYFRAME:
-       *
-       * _always_ start sending a keyframe to the client. We first search
-       * a keyframe between min/max limits. If there is none, we send it the
-       * last keyframe before min. If there is none, the behaviour is like
-       * NEXT_KEYFRAME.
-       */
-      /* gather burst limits */
-      count_burst_unit (sink, &min_idx, client->burst_min_format,
-          client->burst_min_value, &max_idx, client->burst_max_format,
-          client->burst_max_value);
-
-      GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
-
-      /* first find a keyframe after min_idx */
-      next_syncframe = find_next_syncframe (mhsink, min_idx);
-      if (next_syncframe != -1 && next_syncframe < max_idx) {
-        /* we have a valid keyframe and it's below the max */
-        GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
-        result = next_syncframe;
-        break;
-      }
-
-      /* no valid keyframe, try to find one below min */
-      prev_syncframe = find_prev_syncframe (mhsink, min_idx);
-      if (prev_syncframe != -1) {
-        GST_WARNING_OBJECT (sink,
-            "using keyframe below min in BURST_KEYFRAME sync mode");
-        result = prev_syncframe;
-        break;
-      }
-
-      /* no prev keyframe or not enough data  */
-      GST_WARNING_OBJECT (sink,
-          "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
-
-      /* throw client to the waiting state */
-      mhclient->bufpos = -1;
-      /* and make client sync to next keyframe */
-      mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
-      result = -1;
-      break;
-    }
-    case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
-    {
-      gint min_idx, max_idx;
-      gint next_syncframe;
-
-      /* BURST_WITH_KEYFRAME:
-       *
-       * try to start sending a keyframe to the client. We first search
-       * a keyframe between min/max limits. If there is none, we send it the
-       * amount of data up 'till min.
-       */
-      /* gather enough data to burst */
-      count_burst_unit (sink, &min_idx, client->burst_min_format,
-          client->burst_min_value, &max_idx, client->burst_max_format,
-          client->burst_max_value);
-
-      GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
-
-      /* first find a keyframe after min_idx */
-      next_syncframe = find_next_syncframe (mhsink, min_idx);
-      if (next_syncframe != -1 && next_syncframe < max_idx) {
-        /* we have a valid keyframe and it's below the max */
-        GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
-        result = next_syncframe;
-        break;
-      }
-
-      /* no keyframe, send data from min_idx */
-      GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
-
-      /* make sure we don't go over the max limit */
-      if (max_idx != -1 && max_idx <= min_idx) {
-        result = MAX (max_idx - 1, 0);
-      } else {
-        result = min_idx;
-      }
-
-      break;
-    }
-    default:
-      g_warning ("unknown sync method %d", mhclient->sync_method);
-      result = mhclient->bufpos;
-      break;
-  }
-  return result;
-}
-
 /* Handle a write on a client,
  * which indicates a read request from a client.
  *
@@ -1440,11 +976,13 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
       if (mhclient->bufpos == -1) {
         /* client is too fast, remove from write queue until new buffer is
          * available */
+        // FIXME: specific
         if (client->source) {
           g_source_destroy (client->source);
           g_source_unref (client->source);
           client->source = NULL;
         }
+        //
         /* if we flushed out all of the client buffers, we can stop */
         if (mhclient->flushcount == 0)
           goto flushed;
@@ -1458,7 +996,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
         /* for new connections, we need to find a good spot in the
          * bufqueue to start streaming from */
         if (mhclient->new_connection && !flushing) {
-          gint position = gst_multi_socket_sink_new_client (sink, client);
+          gint position = gst_multi_handle_sink_new_client (mhsink, mhclient);
 
           if (position >= 0) {
             /* we got a valid spot in the queue */
@@ -1466,11 +1004,13 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
             mhclient->bufpos = position;
           } else {
             /* cannot send data to this client yet */
+            // FIXME: specific
             if (client->source) {
               g_source_destroy (client->source);
               g_source_unref (client->source);
               client->source = NULL;
             }
+            //
             return TRUE;
           }
         }
@@ -1517,6 +1057,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
       gst_buffer_map (head, &map, GST_MAP_READ);
       maxsize = map.size - mhclient->bufoffset;
 
+      // FIXME: specific
       /* try to write the complete buffer */
 
       wrote =
@@ -1583,61 +1124,6 @@ write_error:
   }
 }
 
-/* calculate the new position for a client after recovery. This function
- * does not update the client position but merely returns the required
- * position.
- */
-static gint
-gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
-    GstSocketClient * client)
-{
-  gint newbufpos;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-
-  GST_WARNING_OBJECT (sink,
-      "[socket %p] client %p is lagging at %d, recover using policy %d",
-      client->socket, client, mhclient->bufpos, mhsink->recover_policy);
-
-  switch (mhsink->recover_policy) {
-    case GST_RECOVER_POLICY_NONE:
-      /* do nothing, client will catch up or get kicked out when it reaches
-       * the hard max */
-      newbufpos = mhclient->bufpos;
-      break;
-    case GST_RECOVER_POLICY_RESYNC_LATEST:
-      /* move to beginning of queue */
-      newbufpos = -1;
-      break;
-    case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
-      /* move to beginning of soft max */
-      newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
-      break;
-    case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
-      /* find keyframe in buffers, we search backwards to find the
-       * closest keyframe relative to what this client already received. */
-      newbufpos = MIN (mhsink->bufqueue->len - 1,
-          get_buffers_max (sink, mhsink->units_soft_max) - 1);
-
-      while (newbufpos >= 0) {
-        GstBuffer *buf;
-
-        buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
-        if (is_sync_frame (mhsink, buf)) {
-          /* found a buffer that is not a delta unit */
-          break;
-        }
-        newbufpos--;
-      }
-      break;
-    default:
-      /* unknown recovery procedure */
-      newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
-      break;
-  }
-  return newbufpos;
-}
-
 /* Queue a buffer on the global queue.
  *
  * This function adds the buffer to the front of a GArray. It removes the
@@ -1682,12 +1168,12 @@ gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink,
   queuelen = mhsink->bufqueue->len;
 
   if (mhsink->units_max > 0)
-    max_buffers = get_buffers_max (sink, mhsink->units_max);
+    max_buffers = get_buffers_max (mhsink, mhsink->units_max);
   else
     max_buffers = -1;
 
   if (mhsink->units_soft_max > 0)
-    soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
+    soft_max_buffers = get_buffers_max (mhsink, mhsink->units_soft_max);
   else
     soft_max_buffers = -1;
   GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
@@ -1718,7 +1204,7 @@ restart:
     if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
       gint newpos;
 
-      newpos = gst_multi_socket_sink_recover_client (sink, client);
+      newpos = gst_multi_handle_sink_recover_client (mhsink, mhclient);
       if (newpos != mhclient->bufpos) {
         mhclient->dropped_buffers += mhclient->bufpos - newpos;
         mhclient->bufpos = newpos;
@@ -1778,7 +1264,7 @@ restart:
     /* get index where the limits are ok, we don't really care if all limits
      * are ok, we just queue as much as we need. We also don't compare against
      * the max limits. */
-    find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min,
+    find_limits (mhsink, &usage, mhsink->bytes_min, mhsink->buffers_min,
         mhsink->time_min, &max, -1, -1, -1);
 
     max_buffer_usage = MAX (max_buffer_usage, usage + 1);
@@ -1968,20 +1454,7 @@ static void
 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
-  GstMultiSocketSink *multisocketsink;
-
-  multisocketsink = GST_MULTI_SOCKET_SINK (object);
-
   switch (prop_id) {
-    case PROP_UNIT_FORMAT:
-      multisocketsink->unit_format = g_value_get_enum (value);
-      break;
-    case PROP_BURST_FORMAT:
-      multisocketsink->def_burst_format = g_value_get_enum (value);
-      break;
-    case PROP_BURST_VALUE:
-      multisocketsink->def_burst_value = g_value_get_uint64 (value);
-      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1998,15 +1471,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
   multisocketsink = GST_MULTI_SOCKET_SINK (object);
 
   switch (prop_id) {
-    case PROP_UNIT_FORMAT:
-      g_value_set_enum (value, multisocketsink->unit_format);
-      break;
-    case PROP_BURST_FORMAT:
-      g_value_set_enum (value, multisocketsink->def_burst_format);
-      break;
-    case PROP_BURST_VALUE:
-      g_value_set_uint64 (value, multisocketsink->def_burst_value);
-      break;
     case PROP_NUM_SOCKETS:
       g_value_set_uint (value,
           g_hash_table_size (multisocketsink->socket_hash));
index d3e40d8..1a0222b 100644 (file)
@@ -57,13 +57,6 @@ typedef struct {
 
   GSocket *socket;
   GSource *source;
-
-  /* method to sync client when connecting */
-  GstSyncMethod sync_method;
-  GstFormat     burst_min_format;
-  guint64       burst_min_value;
-  GstFormat     burst_max_format;
-  guint64       burst_max_value;
 } GstSocketClient;
 
 /**
@@ -80,17 +73,6 @@ struct _GstMultiSocketSink {
   GMainContext *main_context;
   GCancellable *cancellable;
 
-  gboolean previous_buffer_in_caps;
-
-  guint mtu;
-
-  /* these values are used to check if a client is reading fast
-   * enough and to control receovery */
-  GstFormat unit_format;/* the format of the units */
-
-  GstFormat     def_burst_format;
-  guint64       def_burst_value;
-
   guint8 header_flags;
 };
 
index 3e8f5d6..66afed2 100644 (file)
@@ -467,7 +467,7 @@ GST_START_TEST (test_burst_client_bytes)
   /* make sure we keep at least 100 bytes at all times */
   g_object_set (sink, "bytes-min", 100, NULL);
   g_object_set (sink, "sync-method", 3, NULL);  /* 3 = burst */
-  g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
+  g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
   g_object_set (sink, "burst-value", (guint64) 80, NULL);
 
   fail_if (pipe (pfd1) == -1);
@@ -493,12 +493,10 @@ GST_START_TEST (test_burst_client_bytes)
 
   /* now add the clients */
   g_signal_emit_by_name (sink, "add", pfd1[1]);
-  g_signal_emit_by_name (sink, "add_full", pfd2[1], 3,
-      GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
-      (guint64) 200);
-  g_signal_emit_by_name (sink, "add_full", pfd3[1], 3,
-      GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
-      (guint64) 50);
+  g_signal_emit_by_name (sink, "add_full", pfd2[1], GST_SYNC_METHOD_BURST,
+      GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 200);
+  g_signal_emit_by_name (sink, "add_full", pfd3[1], GST_SYNC_METHOD_BURST,
+      GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
 
   /* push last buffer to make client fds ready for reading */
   for (i = 9; i < 10; i++) {
@@ -555,7 +553,7 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
   /* make sure we keep at least 100 bytes at all times */
   g_object_set (sink, "bytes-min", 100, NULL);
   g_object_set (sink, "sync-method", 4, NULL);  /* 4 = burst_keyframe */
-  g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
+  g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
   g_object_set (sink, "burst-value", (guint64) 80, NULL);
 
   fail_if (pipe (pfd1) == -1);
@@ -585,12 +583,12 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
 
   /* now add the clients */
   g_signal_emit_by_name (sink, "add", pfd1[1]);
-  g_signal_emit_by_name (sink, "add_full", pfd2[1], 4,
-      GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
-      (guint64) 90);
-  g_signal_emit_by_name (sink, "add_full", pfd3[1], 4,
-      GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
-      (guint64) 50);
+  g_signal_emit_by_name (sink, "add_full", pfd2[1],
+      GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+      GST_FORMAT_BYTES, (guint64) 90);
+  g_signal_emit_by_name (sink, "add_full", pfd3[1],
+      GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+      GST_FORMAT_BYTES, (guint64) 50);
 
   /* push last buffer to make client fds ready for reading */
   for (i = 9; i < 10; i++) {
@@ -648,7 +646,7 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
   /* make sure we keep at least 100 bytes at all times */
   g_object_set (sink, "bytes-min", 100, NULL);
   g_object_set (sink, "sync-method", 5, NULL);  /* 5 = burst_with_keyframe */
-  g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
+  g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
   g_object_set (sink, "burst-value", (guint64) 80, NULL);
 
   fail_if (pipe (pfd1) == -1);
@@ -678,12 +676,12 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
 
   /* now add the clients */
   g_signal_emit_by_name (sink, "add", pfd1[1]);
-  g_signal_emit_by_name (sink, "add_full", pfd2[1], 5,
-      GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
-      (guint64) 90);
-  g_signal_emit_by_name (sink, "add_full", pfd3[1], 5,
-      GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
-      (guint64) 50);
+  g_signal_emit_by_name (sink, "add_full", pfd2[1],
+      GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES,
+      (guint64) 50, GST_FORMAT_BYTES, (guint64) 90);
+  g_signal_emit_by_name (sink, "add_full", pfd3[1],
+      GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES,
+      (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
 
   /* push last buffer to make client fds ready for reading */
   for (i = 9; i < 10; i++) {
index 591a98a..8e41600 100644 (file)
@@ -28,6 +28,8 @@
 #include <gio/gio.h>
 #include <gst/check/gstcheck.h>
 
+#include "gst/tcp/gstmultisocketsink.h"
+
 static GstPad *mysrcpad;
 
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
@@ -528,9 +530,9 @@ GST_START_TEST (test_burst_client_bytes)
 
   /* now add the clients */
   g_signal_emit_by_name (sink, "add", socket[0]);
-  g_signal_emit_by_name (sink, "add_full", socket[2], 3,
+  g_signal_emit_by_name (sink, "add_full", socket[2], GST_SYNC_METHOD_BURST,
       GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 200);
-  g_signal_emit_by_name (sink, "add_full", socket[4], 3,
+  g_signal_emit_by_name (sink, "add_full", socket[4], GST_SYNC_METHOD_BURST,
       GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
 
   /* push last buffer to make client fds ready for reading */
@@ -616,10 +618,12 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
 
   /* now add the clients */
   g_signal_emit_by_name (sink, "add", socket[0]);
-  g_signal_emit_by_name (sink, "add_full", socket[2], 4,
-      GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 90);
-  g_signal_emit_by_name (sink, "add_full", socket[4], 4,
-      GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
+  g_signal_emit_by_name (sink, "add_full", socket[2],
+      GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+      GST_FORMAT_BYTES, (guint64) 90);
+  g_signal_emit_by_name (sink, "add_full", socket[4],
+      GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+      GST_FORMAT_BYTES, (guint64) 50);
 
   /* push last buffer to make client fds ready for reading */
   for (i = 9; i < 10; i++) {
@@ -708,10 +712,12 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
 
   /* now add the clients */
   g_signal_emit_by_name (sink, "add", socket[0]);
-  g_signal_emit_by_name (sink, "add_full", socket[2], 5,
-      GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 90);
-  g_signal_emit_by_name (sink, "add_full", socket[4], 5,
-      GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
+  g_signal_emit_by_name (sink, "add_full", socket[2],
+      GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+      GST_FORMAT_BYTES, (guint64) 90);
+  g_signal_emit_by_name (sink, "add_full", socket[4],
+      GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+      GST_FORMAT_BYTES, (guint64) 50);
 
   /* push last buffer to make client fds ready for reading */
   for (i = 9; i < 10; i++) {