multihandlesink: further refactoring
authorThomas Vander Stichele <thomas (at) apestaart (dot) org>
Fri, 27 Jan 2012 14:46:31 +0000 (15:46 +0100)
committerThomas Vander Stichele <thomas (at) apestaart (dot) org>
Sun, 12 Feb 2012 21:23:44 +0000 (22:23 +0100)
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
gst/tcp/gstmultihandlesink.c
gst/tcp/gstmultihandlesink.h
gst/tcp/gstmultisocketsink.c
gst/tcp/gstmultisocketsink.h
tests/check/elements/multifdsink.c
tests/check/elements/multisocketsink.c

index b78a99b..c510e82 100644 (file)
 
 #define NOT_IMPLEMENTED 0
 
-static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
-    GST_PAD_SINK,
-    GST_PAD_ALWAYS,
-    GST_STATIC_CAPS_ANY);
-
 GST_DEBUG_CATEGORY_STATIC (multifdsink_debug);
 #define GST_CAT_DEFAULT (multifdsink_debug)
 
@@ -154,16 +149,11 @@ enum
 
 /* this is really arbitrarily chosen */
 #define DEFAULT_MODE                    1
-#define DEFAULT_BUFFERS_MAX             -1
-#define DEFAULT_BUFFERS_SOFT_MAX        -1
-#define DEFAULT_UNIT_TYPE               GST_TCP_UNIT_TYPE_BUFFERS
-#define DEFAULT_UNITS_MAX               -1
-#define DEFAULT_UNITS_SOFT_MAX          -1
+#define DEFAULT_UNIT_FORMAT               GST_FORMAT_BUFFERS
 
-#define DEFAULT_BURST_UNIT              GST_TCP_UNIT_TYPE_UNDEFINED
+#define DEFAULT_BURST_FORMAT              GST_FORMAT_UNDEFINED
 #define DEFAULT_BURST_VALUE             0
 
-#define DEFAULT_QOS_DSCP                -1
 #define DEFAULT_HANDLE_READ             TRUE
 
 enum
@@ -171,18 +161,11 @@ enum
   PROP_0,
   PROP_MODE,
 
-  PROP_UNIT_TYPE,
-  PROP_UNITS_MAX,
-  PROP_UNITS_SOFT_MAX,
-
-  PROP_BUFFERS_MAX,
-  PROP_BUFFERS_SOFT_MAX,
+  PROP_UNIT_FORMAT,
 
-  PROP_BURST_UNIT,
+  PROP_BURST_FORMAT,
   PROP_BURST_VALUE,
 
-  PROP_QOS_DSCP,
-
   PROP_HANDLE_READ,
 
   PROP_NUM_FDS,
@@ -210,23 +193,23 @@ gst_fdset_mode_get_type (void)
   return fdset_mode_type;
 }
 
-#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
+#define GST_TYPE_UNIT_FORMAT (gst_unit_format_get_type())
 static GType
-gst_unit_type_get_type (void)
+gst_unit_format_get_type (void)
 {
-  static GType unit_type_type = 0;
-  static const GEnumValue unit_type[] = {
-    {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
-    {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
-    {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"},
-    {GST_TCP_UNIT_TYPE_TIME, "Time", "time"},
+  static GType unit_format_type = 0;
+  static const GEnumValue unit_format[] = {
+    {GST_TCP_UNIT_FORMAT_UNDEFINED, "Undefined", "undefined"},
+    {GST_TCP_UNIT_FORMAT_BYTES, "Bytes", "bytes"},
+    {GST_TCP_UNIT_FORMAT_TIME, "Time", "time"},
+    {GST_TCP_UNIT_FORMAT_BUFFERS, "Buffers", "buffers"},
     {0, NULL, NULL},
   };
 
-  if (!unit_type_type) {
-    unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
+  if (!unit_format_type) {
+    unit_format_type = g_enum_register_static ("GstTCPUnitType", unit_format);
   }
-  return unit_type_type;
+  return unit_format_type;
 }
 
 static void gst_multi_fd_sink_finalize (GObject * object);
@@ -236,13 +219,15 @@ static void gst_multi_fd_sink_stop_pre (GstMultiHandleSink * mhsink);
 static void gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink);
 static gboolean gst_multi_fd_sink_start_pre (GstMultiHandleSink * mhsink);
 static gpointer gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink);
+static void gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink,
+    GstBuffer * buffer);
+static gboolean gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink *
+    mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
+static int gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client);
 
 static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink,
     GList * link);
 
-static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink,
-    GstBuffer * buf);
-
 static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id,
@@ -258,12 +243,10 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
-  GstBaseSinkClass *gstbasesink_class;
   GstMultiHandleSinkClass *gstmultihandlesink_class;
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
-  gstbasesink_class = (GstBaseSinkClass *) klass;
   gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
 
   gobject_class->set_property = gst_multi_fd_sink_set_property;
@@ -284,47 +267,22 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
           GST_TYPE_FDSET_MODE, DEFAULT_MODE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
-      g_param_spec_int ("buffers-max", "Buffers max",
-          "max number of buffers to queue for a client (-1 = no limit)", -1,
-          G_MAXINT, DEFAULT_BUFFERS_MAX,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
-      g_param_spec_int ("buffers-soft-max", "Buffers soft max",
-          "Recover client when going over this limit (-1 = no limit)", -1,
-          G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
-      g_param_spec_enum ("unit-type", "Units type",
+  g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
+      g_param_spec_enum ("unit-format", "Units format",
           "The unit to measure the max/soft-max/queued properties",
-          GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
-      g_param_spec_int64 ("units-max", "Units max",
-          "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
-          DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
-      g_param_spec_int64 ("units-soft-max", "Units soft max",
-          "Recover client when going over this limit (-1 = no limit)", -1,
-          G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
+          GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
-      g_param_spec_enum ("burst-unit", "Burst unit",
+  g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
+      g_param_spec_enum ("burst-format", "Burst format",
           "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
-          GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT,
+          GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
       g_param_spec_uint64 ("burst-value", "Burst value",
           "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
           DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
-      g_param_spec_int ("qos-dscp", "QoS diff srv code point",
-          "Quality of Service, differentiated services code point (-1 default)",
-          -1, 63, DEFAULT_QOS_DSCP,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   /**
    * GstMultiFdSink::handle-read
    *
@@ -359,12 +317,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
    * @gstmultifdsink: the multifdsink element to emit this signal on
    * @fd:             the file descriptor to add to multifdsink
    * @sync:           the sync method to use
-   * @unit_type_min:  the unit-type of @value_min
+   * @unit_format_min:  the unit-format of @value_min
    * @value_min:      the minimum amount of data to burst expressed in
-   *                  @unit_type_min units.
-   * @unit_type_max:  the unit-type of @value_max
+   *                  @unit_format_min units.
+   * @unit_format_max:  the unit-format of @value_max
    * @value_max:      the maximum amount of data to burst expressed in
-   *                  @unit_type_max units.
+   *                  @unit_format_max units.
    *
    * Hand the given open file descriptor to multifdsink to write to and
    * specify the burst parameters for the new connection.
@@ -374,8 +332,8 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
           add_full), NULL, NULL,
       gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64,
-      G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE,
-      G_TYPE_UINT64, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
+      G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_FORMAT,
+      G_TYPE_UINT64, GST_TYPE_UNIT_FORMAT, G_TYPE_UINT64);
   /**
    * GstMultiFdSink::remove:
    * @gstmultifdsink: the multifdsink element to emit this signal on
@@ -477,17 +435,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
           client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
       G_TYPE_NONE, 1, G_TYPE_INT);
 
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
-
   gst_element_class_set_details_simple (gstelement_class,
       "Multi filedescriptor sink", "Sink/Network",
       "Send data to multiple filedescriptors",
       "Thomas Vander Stichele <thomas at apestaart dot org>, "
       "Wim Taymans <wim@fluendo.com>");
 
-  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render);
-
   gstmultihandlesink_class->clear_post =
       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear_post);
 
@@ -499,6 +452,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_start_pre);
   gstmultihandlesink_class->thread =
       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_thread);
+  gstmultihandlesink_class->queue_buffer =
+      GST_DEBUG_FUNCPTR (gst_multi_fd_sink_queue_buffer);
+  gstmultihandlesink_class->client_queue_buffer =
+      GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_queue_buffer);
+  gstmultihandlesink_class->client_get_fd =
+      GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_get_fd);
 
   gstmultihandlesink_class->remove_client_link =
       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_client_link);
@@ -520,17 +479,12 @@ gst_multi_fd_sink_init (GstMultiFdSink * this)
 
   this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
 
-  this->unit_type = DEFAULT_UNIT_TYPE;
-  this->units_max = DEFAULT_UNITS_MAX;
-  this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
+  this->unit_format = DEFAULT_UNIT_FORMAT;
 
-  this->def_burst_unit = DEFAULT_BURST_UNIT;
+  this->def_burst_format = DEFAULT_BURST_FORMAT;
   this->def_burst_value = DEFAULT_BURST_VALUE;
 
-  this->qos_dscp = DEFAULT_QOS_DSCP;
   this->handle_read = DEFAULT_HANDLE_READ;
-
-  this->header_flags = 0;
 }
 
 static void
@@ -543,90 +497,19 @@ gst_multi_fd_sink_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
-static gint
-setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client)
+static int
+gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client)
 {
-  gint tos;
-  gint ret;
-  union gst_sockaddr
-  {
-    struct sockaddr sa;
-    struct sockaddr_in6 sa_in6;
-    struct sockaddr_storage sa_stor;
-  } sa;
-  socklen_t slen = sizeof (sa);
-  gint af;
-
-  /* don't touch */
-  if (sink->qos_dscp < 0)
-    return 0;
-
-  if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) {
-    GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
-    return ret;
-  }
-
-  af = sa.sa.sa_family;
-
-  /* if this is an IPv4-mapped address then do IPv4 QoS */
-  if (af == AF_INET6) {
-
-    GST_DEBUG_OBJECT (sink, "check IP6 socket");
-    if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
-      GST_DEBUG_OBJECT (sink, "mapped to IPV4");
-      af = AF_INET;
-    }
-  }
-
-  /* extract and shift 6 bits of the DSCP */
-  tos = (sink->qos_dscp & 0x3f) << 2;
+  GstTCPClient *tclient = (GstTCPClient *) client;
 
-  switch (af) {
-    case AF_INET:
-      ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
-      break;
-    case AF_INET6:
-#ifdef IPV6_TCLASS
-      ret =
-          setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
-          sizeof (tos));
-      break;
-#endif
-    default:
-      ret = 0;
-      GST_ERROR_OBJECT (sink, "unsupported AF");
-      break;
-  }
-  if (ret)
-    GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
-
-  return ret;
-}
-
-
-static void
-setup_dscp (GstMultiFdSink * sink)
-{
-  GList *clients, *next;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  CLIENTS_LOCK (sink);
-  for (clients = mhsink->clients; clients; clients = next) {
-    GstTCPClient *client;
-
-    client = (GstTCPClient *) clients->data;
-    next = g_list_next (clients);
-
-    setup_dscp_client (sink, client);
-  }
-  CLIENTS_UNLOCK (sink);
+  return tclient->fd.fd;
 }
 
 /* "add-full" signal implementation */
 void
 gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
-    GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value,
-    GstTCPUnitType max_unit, guint64 max_value)
+    GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
+    GstFormat max_format, guint64 max_value)
 {
   GstTCPClient *client;
   GstMultiHandleClient *mhclient;
@@ -636,12 +519,12 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
 
   GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
-      "min_unit %d, min_value %" G_GUINT64_FORMAT
-      ", max_unit %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
-      min_unit, min_value, max_unit, max_value);
+      "min_format %d, min_value %" G_GUINT64_FORMAT
+      ", max_format %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
+      min_format, min_value, max_format, max_value);
 
   /* do limits check if we can */
-  if (min_unit == max_unit) {
+  if (min_format == max_format) {
     if (max_value != -1 && min_value != -1 && max_value < min_value)
       goto wrong_limits;
   }
@@ -653,9 +536,9 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
   g_snprintf (mhclient->debug, 30, "[fd %5d]", fd);
 
   client->fd.fd = fd;
-  client->burst_min_unit = min_unit;
+  client->burst_min_format = min_format;
   client->burst_min_value = min_value;
-  client->burst_max_unit = max_unit;
+  client->burst_max_format = max_format;
   client->burst_max_value = max_value;
 
   CLIENTS_LOCK (sink);
@@ -689,7 +572,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
   /* figure out the mode, can't use send() for non sockets */
   if (fstat (fd, &statbuf) == 0 && S_ISSOCK (statbuf.st_mode)) {
     client->is_socket = TRUE;
-    setup_dscp_client (sink, client);
+    gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
   }
 
   gst_poll_restart (sink->fdset);
@@ -707,7 +590,7 @@ wrong_limits:
     GST_WARNING_OBJECT (sink,
         "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
         G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
-        min_value, max_value, min_unit);
+        min_value, max_value, min_format);
     return;
   }
 duplicate:
@@ -731,7 +614,8 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
 
   mhsink = GST_MULTI_HANDLE_SINK (sink);
   gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method,
-      sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1);
+      sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
+      -1);
 }
 
 /* "remove" signal implementation */
@@ -1086,16 +970,16 @@ ioctl_failed:
 
 /* queue the given buffer for the given client */
 static gboolean
-gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
-    GstTCPClient * client, GstBuffer * buffer)
+gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
+    GstMultiHandleClient * mhclient, GstBuffer * buffer)
 {
   GstCaps *caps;
 
   /* TRUE: send them if the new caps have them */
   gboolean send_streamheader = FALSE;
   GstStructure *s;
-  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+  GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
+  GstTCPClient *client = (GstTCPClient *) mhclient;
 
   /* before we queue the buffer, we check if we need to queue streamheader
    * buffers (because it's a new client, or because they changed) */
@@ -1212,10 +1096,10 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max)
 {
   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
 
-  switch (sink->unit_type) {
-    case GST_TCP_UNIT_TYPE_BUFFERS:
+  switch (sink->unit_format) {
+    case GST_TCP_UNIT_FORMAT_BUFFERS:
       return max;
-    case GST_TCP_UNIT_TYPE_TIME:
+    case GST_TCP_UNIT_FORMAT_TIME:
     {
       GstBuffer *buf;
       int i;
@@ -1239,7 +1123,7 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max)
       }
       return len + 1;
     }
-    case GST_TCP_UNIT_TYPE_BYTES:
+    case GST_TCP_UNIT_FORMAT_BYTES:
     {
       GstBuffer *buf;
       int i;
@@ -1383,23 +1267,23 @@ find_limits (GstMultiFdSink * sink,
  * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
  */
 static gboolean
-assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers,
+assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
     GstClockTime * time)
 {
   gboolean res = TRUE;
 
   /* set only the limit of the given format to the given value */
-  switch (unit) {
-    case GST_TCP_UNIT_TYPE_BUFFERS:
+  switch (format) {
+    case GST_TCP_UNIT_FORMAT_BUFFERS:
       *buffers = (gint) value;
       break;
-    case GST_TCP_UNIT_TYPE_TIME:
+    case GST_TCP_UNIT_FORMAT_TIME:
       *time = value;
       break;
-    case GST_TCP_UNIT_TYPE_BYTES:
+    case GST_TCP_UNIT_FORMAT_BYTES:
       *bytes = (gint) value;
       break;
-    case GST_TCP_UNIT_TYPE_UNDEFINED:
+    case GST_TCP_UNIT_FORMAT_UNDEFINED:
     default:
       res = FALSE;
       break;
@@ -1416,16 +1300,16 @@ assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers,
  * function returns FALSE.
  */
 static gboolean
-count_burst_unit (GstMultiFdSink * sink, gint * min_idx,
-    GstTCPUnitType min_unit, guint64 min_value, gint * max_idx,
-    GstTCPUnitType max_unit, guint64 max_value)
+count_burst_format (GstMultiFdSink * sink, gint * min_idx,
+    GstFormat min_format, guint64 min_value, gint * max_idx,
+    GstFormat max_format, guint64 max_value)
 {
   gint bytes_min = -1, buffers_min = -1;
   gint bytes_max = -1, buffers_max = -1;
   GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
 
-  assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min);
-  assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max);
+  assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
+  assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
 
   return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
       max_idx, bytes_max, buffers_max, time_max);
@@ -1518,11 +1402,11 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
        * is not enough data, we just send what we have (which is in result).
        * We use the max value to limit the search
        */
-      ok = count_burst_unit (sink, &result, client->burst_min_unit,
-          client->burst_min_value, &max, client->burst_max_unit,
+      ok = count_burst_format (sink, &result, client->burst_min_format,
+          client->burst_min_value, &max, client->burst_max_format,
           client->burst_max_value);
       GST_DEBUG_OBJECT (sink,
-          "[fd %5d] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
+          "[fd %5d] SYNC_METHOD_BURST: burst_format returned %d, result %d",
           client->fd.fd, ok, result);
 
       GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
@@ -1549,8 +1433,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
        * NEXT_KEYFRAME.
        */
       /* gather burst limits */
-      count_burst_unit (sink, &min_idx, client->burst_min_unit,
-          client->burst_min_value, &max_idx, client->burst_max_unit,
+      count_burst_format (sink, &min_idx, client->burst_min_format,
+          client->burst_min_value, &max_idx, client->burst_max_format,
           client->burst_max_value);
 
       GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
@@ -1596,8 +1480,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
        * amount of data up 'till min.
        */
       /* gather enough data to burst */
-      count_burst_unit (sink, &min_idx, client->burst_min_unit,
-          client->burst_min_value, &max_idx, client->burst_max_unit,
+      count_burst_format (sink, &min_idx, client->burst_min_format,
+          client->burst_min_value, &max_idx, client->burst_max_format,
           client->burst_max_value);
 
       GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
@@ -1662,10 +1546,11 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
   gboolean flushing;
   GstClockTime now;
   GTimeVal nowtv;
-  GstMultiHandleSink *mhsink;
+  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+  GstMultiHandleSinkClass *mhsinkclass =
+      GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
 
-  mhsink = GST_MULTI_HANDLE_SINK (sink);
 
   g_get_current_time (&nowtv);
   now = GST_TIMEVAL_TO_TIME (nowtv);
@@ -1731,7 +1616,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
             fd, client, mhclient->bufpos);
 
         /* queueing a buffer will ref it */
-        gst_multi_fd_sink_client_queue_buffer (sink, client, buf);
+        mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
 
         /* need to start from the first byte for this new buffer */
         mhclient->bufoffset = 0;
@@ -1850,13 +1735,13 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
       break;
     case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
       /* move to beginning of soft max */
-      newbufpos = get_buffers_max (sink, sink->units_soft_max);
+      newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
       break;
     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
       /* find keyframe in buffers, we search backwards to find the
        * closest keyframe relative to what this client already received. */
       newbufpos = MIN (mhsink->bufqueue->len - 1,
-          get_buffers_max (sink, sink->units_soft_max) - 1);
+          get_buffers_max (sink, mhsink->units_soft_max) - 1);
 
       while (newbufpos >= 0) {
         GstBuffer *buf;
@@ -1871,7 +1756,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
       break;
     default:
       /* unknown recovery procedure */
-      newbufpos = get_buffers_max (sink, sink->units_soft_max);
+      newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
       break;
   }
   return newbufpos;
@@ -1896,7 +1781,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
  * the select thread that the fd_set changed.
  */
 static void
-gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
+gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, GstBuffer * buffer)
 {
   GList *clients, *next;
   gint queuelen;
@@ -1907,25 +1792,25 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
   GstClockTime now;
   gint max_buffers, soft_max_buffers;
   guint cookie;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+  GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
   GstMultiHandleSinkClass *mhsinkclass =
       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
 
   g_get_current_time (&nowtv);
   now = GST_TIMEVAL_TO_TIME (nowtv);
 
-  CLIENTS_LOCK (sink);
+  CLIENTS_LOCK (mhsink);
   /* add buffer to queue */
-  g_array_prepend_val (mhsink->bufqueue, buf);
+  g_array_prepend_val (mhsink->bufqueue, buffer);
   queuelen = mhsink->bufqueue->len;
 
-  if (sink->units_max > 0)
-    max_buffers = get_buffers_max (sink, sink->units_max);
+  if (mhsink->units_max > 0)
+    max_buffers = get_buffers_max (sink, mhsink->units_max);
   else
     max_buffers = -1;
 
-  if (sink->units_soft_max > 0)
-    soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
+  if (mhsink->units_soft_max > 0)
+    soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
   else
     soft_max_buffers = -1;
   GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
@@ -2260,116 +2145,6 @@ gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink)
   return NULL;
 }
 
-static GstFlowReturn
-gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
-{
-  GstMultiFdSink *sink;
-  gboolean in_caps;
-#if 0
-  GstCaps *bufcaps, *padcaps;
-#endif
-  GstMultiHandleSink *mhsink;
-
-  sink = GST_MULTI_FD_SINK (bsink);
-  mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
-          GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
-
-#if 0
-  /* since we check every buffer for streamheader caps, we need to make
-   * sure every buffer has caps set */
-  bufcaps = gst_buffer_get_caps (buf);
-  padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
-
-  /* make sure we have caps on the pad */
-  if (!padcaps && !bufcaps)
-    goto no_caps;
-#endif
-
-  /* get HEADER first, code below might mess with the flags */
-  in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
-
-#if 0
-  /* stamp the buffer with previous caps if no caps set */
-  if (!bufcaps) {
-    if (!gst_buffer_is_writable (buf)) {
-      /* metadata is not writable, copy will be made and original buffer
-       * will be unreffed so we need to ref so that we don't lose the
-       * buffer in the render method. */
-      gst_buffer_ref (buf);
-      /* the new buffer is ours only, we keep it out of the scope of this
-       * function */
-      buf = gst_buffer_make_writable (buf);
-    } else {
-      /* else the metadata is writable, we ref because we keep the buffer
-       * out of the scope of this method */
-      gst_buffer_ref (buf);
-    }
-    /* buffer metadata is writable now, set the caps */
-    gst_buffer_set_caps (buf, padcaps);
-  } else {
-    gst_caps_unref (bufcaps);
-
-    /* since we keep this buffer out of the scope of this method */
-    gst_buffer_ref (buf);
-  }
-#endif
-  gst_buffer_ref (buf);
-
-  GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
-      G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
-      ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
-      buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
-      GST_BUFFER_OFFSET_END (buf),
-      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
-      GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
-
-  /* if we get HEADER buffers, but the previous buffer was not HEADER,
-   * it means we're getting new streamheader buffers, and we should clear
-   * the old ones */
-  if (in_caps && sink->previous_buffer_in_caps == FALSE) {
-    GST_DEBUG_OBJECT (sink,
-        "receiving new HEADER buffers, clearing old streamheader");
-    g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
-    g_slist_free (mhsink->streamheader);
-    mhsink->streamheader = NULL;
-  }
-
-  /* save the current in_caps */
-  sink->previous_buffer_in_caps = in_caps;
-
-  /* if the incoming buffer is marked as IN CAPS, then we assume for now
-   * it's a streamheader that needs to be sent to each new client, so we
-   * put it on our internal list of streamheader buffers.
-   * FIXME: we could check if the buffer's contents are in fact part of the
-   * current streamheader.
-   *
-   * We don't send the buffer to the client, since streamheaders are sent
-   * separately when necessary. */
-  if (in_caps) {
-    GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
-        G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
-    mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
-  } else {
-    /* queue the buffer, this is a regular data buffer. */
-    gst_multi_fd_sink_queue_buffer (sink, buf);
-
-    mhsink->bytes_to_serve += gst_buffer_get_size (buf);
-  }
-  return GST_FLOW_OK;
-
-  /* ERRORS */
-#if 0
-no_caps:
-  {
-    GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
-        ("Received first buffer without caps set"));
-    return GST_FLOW_NOT_NEGOTIATED;
-  }
-#endif
-}
-
 static void
 gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
@@ -2382,31 +2157,15 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
     case PROP_MODE:
       multifdsink->mode = g_value_get_enum (value);
       break;
-    case PROP_BUFFERS_MAX:
-      multifdsink->units_max = g_value_get_int (value);
-      break;
-    case PROP_BUFFERS_SOFT_MAX:
-      multifdsink->units_soft_max = g_value_get_int (value);
-      break;
-    case PROP_UNIT_TYPE:
-      multifdsink->unit_type = g_value_get_enum (value);
-      break;
-    case PROP_UNITS_MAX:
-      multifdsink->units_max = g_value_get_int64 (value);
-      break;
-    case PROP_UNITS_SOFT_MAX:
-      multifdsink->units_soft_max = g_value_get_int64 (value);
+    case PROP_UNIT_FORMAT:
+      multifdsink->unit_format = g_value_get_enum (value);
       break;
-    case PROP_BURST_UNIT:
-      multifdsink->def_burst_unit = g_value_get_enum (value);
+    case PROP_BURST_FORMAT:
+      multifdsink->def_burst_format = g_value_get_enum (value);
       break;
     case PROP_BURST_VALUE:
       multifdsink->def_burst_value = g_value_get_uint64 (value);
       break;
-    case PROP_QOS_DSCP:
-      multifdsink->qos_dscp = g_value_get_int (value);
-      setup_dscp (multifdsink);
-      break;
     case PROP_HANDLE_READ:
       multifdsink->handle_read = g_value_get_boolean (value);
       break;
@@ -2429,30 +2188,15 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_MODE:
       g_value_set_enum (value, multifdsink->mode);
       break;
-    case PROP_BUFFERS_MAX:
-      g_value_set_int (value, multifdsink->units_max);
-      break;
-    case PROP_BUFFERS_SOFT_MAX:
-      g_value_set_int (value, multifdsink->units_soft_max);
-      break;
-    case PROP_UNIT_TYPE:
-      g_value_set_enum (value, multifdsink->unit_type);
+    case PROP_UNIT_FORMAT:
+      g_value_set_enum (value, multifdsink->unit_format);
       break;
-    case PROP_UNITS_MAX:
-      g_value_set_int64 (value, multifdsink->units_max);
-      break;
-    case PROP_UNITS_SOFT_MAX:
-      g_value_set_int64 (value, multifdsink->units_soft_max);
-      break;
-    case PROP_BURST_UNIT:
-      g_value_set_enum (value, multifdsink->def_burst_unit);
+    case PROP_BURST_FORMAT:
+      g_value_set_enum (value, multifdsink->def_burst_format);
       break;
     case PROP_BURST_VALUE:
       g_value_set_uint64 (value, multifdsink->def_burst_value);
       break;
-    case PROP_QOS_DSCP:
-      g_value_set_int (value, multifdsink->qos_dscp);
-      break;
     case PROP_HANDLE_READ:
       g_value_set_boolean (value, multifdsink->handle_read);
       break;
index a545189..89ec423 100644 (file)
@@ -49,19 +49,19 @@ typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass;
 
 /**
  * GstTCPUnitType:
- * @GST_TCP_UNIT_TYPE_UNDEFINED: undefined
- * @GST_TCP_UNIT_TYPE_BUFFERS  : buffers
- * @GST_TCP_UNIT_TYPE_TIME     : timeunits (in nanoseconds)
- * @GST_TCP_UNIT_TYPE_BYTES    : bytes
+ * @GST_TCP_UNIT_FORMAT_UNDEFINED: undefined
+ * @GST_TCP_UNIT_FORMAT_BUFFERS  : buffers
+ * @GST_TCP_UNIT_FORMAT_TIME     : timeunits (in nanoseconds)
+ * @GST_TCP_UNIT_FORMAT_BYTES    : bytes
  *
  * The units used to specify limits.
  */
 typedef enum
 {
-  GST_TCP_UNIT_TYPE_UNDEFINED,
-  GST_TCP_UNIT_TYPE_BUFFERS,
-  GST_TCP_UNIT_TYPE_TIME,
-  GST_TCP_UNIT_TYPE_BYTES
+  GST_TCP_UNIT_FORMAT_UNDEFINED,
+  GST_TCP_UNIT_FORMAT_BUFFERS,
+  GST_TCP_UNIT_FORMAT_TIME,
+  GST_TCP_UNIT_FORMAT_BYTES,
 } GstTCPUnitType;
 
 /* structure for a client
@@ -77,9 +77,9 @@ typedef struct {
 
   /* method to sync client when connecting */
   GstSyncMethod sync_method;
-  GstTCPUnitType   burst_min_unit;
+  GstFormat   burst_min_format;
   guint64          burst_min_value;
-  GstTCPUnitType   burst_max_unit;
+  GstFormat   burst_max_format;
   guint64          burst_max_value;
 } GstTCPClient;
 
@@ -97,19 +97,13 @@ struct _GstMultiFdSink {
   gint mode;
   GstPoll *fdset;
 
-  gboolean previous_buffer_in_caps;
-
-  guint mtu;
-  gint qos_dscp;
   gboolean handle_read;
 
   /* these values are used to check if a client is reading fast
    * enough and to control receovery */
-  GstTCPUnitType unit_type;/* the type of the units */
-  gint64 units_max;       /* max units to queue for a client */
-  gint64 units_soft_max;  /* max units a client can lag before recovery starts */
+  GstFormat unit_format;/* the type of the units */
 
-  GstTCPUnitType   def_burst_unit;
+  GstFormat   def_burst_format;
   guint64       def_burst_value;
 
   guint8 header_flags;
@@ -121,8 +115,8 @@ struct _GstMultiFdSinkClass {
   /* element methods */
   void          (*add)          (GstMultiFdSink *sink, int fd);
   void          (*add_full)     (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
-                                GstTCPUnitType format, guint64 value, 
-                                GstTCPUnitType max_unit, guint64 max_value);
+                                GstFormat format, guint64 value, 
+                                GstFormat max_format, guint64 max_value);
   void          (*remove)       (GstMultiFdSink *sink, int fd);
   void          (*remove_flush) (GstMultiFdSink *sink, int fd);
   void          (*clear)        (GstMultiFdSink *sink);
@@ -142,8 +136,8 @@ GType gst_multi_fd_sink_get_type (void);
 
 void          gst_multi_fd_sink_add          (GstMultiFdSink *sink, int fd);
 void          gst_multi_fd_sink_add_full     (GstMultiFdSink *sink, int fd, GstSyncMethod sync, 
-                                              GstTCPUnitType min_unit, guint64 min_value,
-                                              GstTCPUnitType max_unit, guint64 max_value);
+                                              GstFormat min_format, guint64 min_value,
+                                              GstFormat max_format, guint64 max_value);
 void          gst_multi_fd_sink_remove       (GstMultiFdSink *sink, int fd);
 void          gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd);
 void          gst_multi_fd_sink_clear        (GstMultiHandleSink *sink);
index 2dcaedf..bb0ffcb 100644 (file)
@@ -144,13 +144,12 @@ enum
 
 
 /* this is really arbitrarily chosen */
-#define DEFAULT_MODE                    1
 #define DEFAULT_BUFFERS_MAX             -1
 #define DEFAULT_BUFFERS_SOFT_MAX        -1
 #define DEFAULT_TIME_MIN                -1
 #define DEFAULT_BYTES_MIN               -1
 #define DEFAULT_BUFFERS_MIN             -1
-#define DEFAULT_UNIT_TYPE               GST_FORMAT_BUFFERS
+#define DEFAULT_UNIT_FORMAT               GST_FORMAT_BUFFERS
 #define DEFAULT_UNITS_MAX               -1
 #define DEFAULT_UNITS_SOFT_MAX          -1
 #define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
@@ -167,21 +166,18 @@ enum
 enum
 {
   PROP_0,
-#if 0
-  PROP_MODE,
-#endif
   PROP_BUFFERS_QUEUED,
   PROP_BYTES_QUEUED,
   PROP_TIME_QUEUED,
 
 #if 0
-  PROP_UNIT_TYPE,
+  PROP_UNIT_FORMAT,
+#endif
   PROP_UNITS_MAX,
   PROP_UNITS_SOFT_MAX,
 
   PROP_BUFFERS_MAX,
   PROP_BUFFERS_SOFT_MAX,
-#endif
 
   PROP_TIME_MIN,
   PROP_BYTES_MIN,
@@ -196,9 +192,9 @@ enum
 #if 0
   PROP_BURST_FORMAT,
   PROP_BURST_VALUE,
+#endif
 
   PROP_QOS_DSCP,
-#endif
 
   PROP_RESEND_STREAMHEADER,
 
@@ -304,9 +300,9 @@ static gboolean gst_multi_handle_sink_socket_condition (GSocket * socket,
     GIOCondition condition, GstMultiHandleSink * sink);
 #endif
 
-#if 0
 static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink,
     GstBuffer * buf);
+#if 0
 static gboolean gst_multi_handle_sink_unlock (GstBaseSink * bsink);
 static gboolean gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink);
 #endif
@@ -328,15 +324,11 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
-#if 0
   GstBaseSinkClass *gstbasesink_class;
-#endif
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
-#if 0
   gstbasesink_class = (GstBaseSinkClass *) klass;
-#endif
 
   gobject_class->set_property = gst_multi_handle_sink_set_property;
   gobject_class->get_property = gst_multi_handle_sink_get_property;
@@ -374,10 +366,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
 #if 0
-  g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
-      g_param_spec_enum ("unit-type", "Units type",
+  g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
+      g_param_spec_enum ("unit-format", "Units format",
           "The unit to measure the max/soft-max/queued properties",
-          GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE,
+          GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
       g_param_spec_int64 ("units-max", "Units max",
@@ -438,6 +430,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
       g_param_spec_uint64 ("burst-value", "Burst value",
           "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
           DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+#endif
 
   g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
@@ -445,6 +438,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
           -1, 63, DEFAULT_QOS_DSCP,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+#if 0
   /**
    * GstMultiHandleSink::handle-read
    *
@@ -638,9 +632,9 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
 
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state);
-#if 0
 
   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_render);
+#if 0
   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock);
   gstbasesink_class->unlock_stop =
       GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock_stop);
@@ -673,7 +667,7 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
 #endif
 
   this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
-  this->unit_type = DEFAULT_UNIT_TYPE;
+  this->unit_format = DEFAULT_UNIT_FORMAT;
   this->units_max = DEFAULT_UNITS_MAX;
   this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
   this->time_min = DEFAULT_TIME_MIN;
@@ -683,15 +677,15 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
 
   this->timeout = DEFAULT_TIMEOUT;
   this->def_sync_method = DEFAULT_SYNC_METHOD;
+
+#if 0
   this->def_burst_format = DEFAULT_BURST_FORMAT;
   this->def_burst_value = DEFAULT_BURST_VALUE;
+#endif
 
   this->qos_dscp = DEFAULT_QOS_DSCP;
 
   this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
-
-  this->header_flags = 0;
-  this->cancellable = g_cancellable_new ();
 }
 
 static void
@@ -707,9 +701,9 @@ gst_multi_handle_sink_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
-#if 0
-static gint
-setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client)
+gint
+gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink,
+    GstMultiHandleClient * client)
 {
 #ifndef IP_TOS
   return 0;
@@ -725,12 +719,13 @@ setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client)
   } sa;
   socklen_t slen = sizeof (sa);
   gint af;
+  GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
 
   /* don't touch */
   if (sink->qos_dscp < 0)
     return 0;
 
-  fd = g_socket_get_fd (client->socket);
+  fd = mhsinkclass->client_get_fd (client);
 
   if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
     GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
@@ -773,23 +768,6 @@ setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client)
 #endif
 }
 
-static void
-setup_dscp (GstMultiHandleSink * sink)
-{
-  GList *clients;
-
-  CLIENTS_LOCK (sink);
-  for (clients = sink->clients; clients; clients = clients->next) {
-    GstSocketClient *client;
-
-    client = clients->data;
-
-    setup_dscp_client (sink, client);
-  }
-  CLIENTS_UNLOCK (sink);
-}
-#endif
-
 void
 gst_multi_handle_sink_client_init (GstMultiHandleClient * client,
     GstSyncMethod sync_method)
@@ -818,6 +796,23 @@ gst_multi_handle_sink_client_init (GstMultiHandleClient * client,
   client->last_activity_time = client->connect_time;
 }
 
+void
+gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink)
+{
+  GList *clients;
+
+  CLIENTS_LOCK (mhsink);
+  for (clients = mhsink->clients; clients; clients = clients->next) {
+    GstMultiHandleClient *client;
+
+    client = clients->data;
+
+    gst_multi_handle_sink_setup_dscp_client (mhsink, client);
+  }
+  CLIENTS_UNLOCK (mhsink);
+}
+
+
 #if 0
 /* "add-full" signal implementation */
 void
@@ -1421,7 +1416,7 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction)
 static gint
 get_buffers_max (GstMultiHandleSink * sink, gint64 max)
 {
-  switch (sink->unit_type) {
+  switch (sink->unit_format) {
     case GST_FORMAT_BUFFERS:
       return max;
     case GST_FORMAT_TIME:
@@ -2409,20 +2404,19 @@ gst_multi_handle_sink_thread (GstMultiHandleSink * sink)
 }
 #endif
 
-#if 0
 static GstFlowReturn
 gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
-  GstMultiHandleSink *sink;
   gboolean in_caps;
 #if 0
   GstCaps *bufcaps, *padcaps;
 #endif
 
-  sink = GST_MULTI_HANDLE_SINK (bsink);
+  GstMultiHandleSink *sink = GST_MULTI_HANDLE_SINK (bsink);
+  GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
 
   g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
-          GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_WRONG_STATE);
+          GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
 
 #if 0
   /* since we check every buffer for streamheader caps, we need to make
@@ -2436,7 +2430,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 #endif
 
   /* get IN_CAPS first, code below might mess with the flags */
-  in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
+  in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
 
 #if 0
   /* stamp the buffer with previous caps if no caps set */
@@ -2463,6 +2457,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
     gst_buffer_ref (buf);
   }
 #endif
+  gst_buffer_ref (buf);
 
   GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
       G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
@@ -2477,7 +2472,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
    * the old ones */
   if (in_caps && sink->previous_buffer_in_caps == FALSE) {
     GST_DEBUG_OBJECT (sink,
-        "receiving new IN_CAPS buffers, clearing old streamheader");
+        "receiving new HEADER buffers, clearing old streamheader");
     g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
     g_slist_free (sink->streamheader);
     sink->streamheader = NULL;
@@ -2495,12 +2490,12 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
    * We don't send the buffer to the client, since streamheaders are sent
    * separately when necessary. */
   if (in_caps) {
-    GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %"
+    GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
         G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
     sink->streamheader = g_slist_append (sink->streamheader, buf);
   } else {
     /* queue the buffer, this is a regular data buffer. */
-    gst_multi_handle_sink_queue_buffer (sink, buf);
+    mhsinkclass->queue_buffer (sink, buf);
 
     sink->bytes_to_serve += gst_buffer_get_size (buf);
   }
@@ -2516,7 +2511,6 @@ no_caps:
   }
 #endif
 }
-#endif
 
 static void
 gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
@@ -2527,14 +2521,12 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
   multihandlesink = GST_MULTI_HANDLE_SINK (object);
 
   switch (prop_id) {
-#if 0
     case PROP_BUFFERS_MAX:
       multihandlesink->units_max = g_value_get_int (value);
       break;
     case PROP_BUFFERS_SOFT_MAX:
       multihandlesink->units_soft_max = g_value_get_int (value);
       break;
-#endif
     case PROP_TIME_MIN:
       multihandlesink->time_min = g_value_get_int64 (value);
       break;
@@ -2545,8 +2537,8 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
       multihandlesink->buffers_min = g_value_get_int (value);
       break;
 #if 0
-    case PROP_UNIT_TYPE:
-      multihandlesink->unit_type = g_value_get_enum (value);
+    case PROP_UNIT_FORMAT:
+      multihandlesink->unit_format = g_value_get_enum (value);
       break;
     case PROP_UNITS_MAX:
       multihandlesink->units_max = g_value_get_int64 (value);
@@ -2571,14 +2563,16 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
     case PROP_BURST_VALUE:
       multihandlesink->def_burst_value = g_value_get_uint64 (value);
       break;
+#endif
     case PROP_QOS_DSCP:
       multihandlesink->qos_dscp = g_value_get_int (value);
-      setup_dscp (multihandlesink);
+      gst_multi_handle_sink_setup_dscp (multihandlesink);
       break;
+
     case PROP_RESEND_STREAMHEADER:
       multihandlesink->resend_streamheader = g_value_get_boolean (value);
       break;
-#endif
+
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -2594,14 +2588,12 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
   multihandlesink = GST_MULTI_HANDLE_SINK (object);
 
   switch (prop_id) {
-#if 0
     case PROP_BUFFERS_MAX:
       g_value_set_int (value, multihandlesink->units_max);
       break;
     case PROP_BUFFERS_SOFT_MAX:
       g_value_set_int (value, multihandlesink->units_soft_max);
       break;
-#endif
     case PROP_TIME_MIN:
       g_value_set_int64 (value, multihandlesink->time_min);
       break;
@@ -2621,8 +2613,8 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
       g_value_set_uint64 (value, multihandlesink->time_queued);
       break;
 #if 0
-    case PROP_UNIT_TYPE:
-      g_value_set_enum (value, multihandlesink->unit_type);
+    case PROP_UNIT_FORMAT:
+      g_value_set_enum (value, multihandlesink->unit_format);
       break;
     case PROP_UNITS_MAX:
       g_value_set_int64 (value, multihandlesink->units_max);
@@ -2653,12 +2645,14 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
     case PROP_BURST_VALUE:
       g_value_set_uint64 (value, multihandlesink->def_burst_value);
       break;
+#endif
     case PROP_QOS_DSCP:
       g_value_set_int (value, multihandlesink->qos_dscp);
       break;
     case PROP_RESEND_STREAMHEADER:
       g_value_set_boolean (value, multihandlesink->resend_streamheader);
       break;
+#if 0
     case PROP_NUM_SOCKETS:
       g_value_set_uint (value,
           g_hash_table_size (multihandlesink->socket_hash));
index 87aafc5..cbd86c7 100644 (file)
@@ -177,6 +177,8 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction);
 gboolean is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer);
 gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink);
 gboolean gst_multi_handle_sink_start (GstBaseSink * bsink);
+void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink);
+gint gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink, GstMultiHandleClient * client);
 
 /**
  * GstMultiHandleSink:
@@ -200,7 +202,6 @@ struct _GstMultiHandleSink {
   GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
   gboolean previous_buffer_in_caps;
 
-  guint mtu;
   gint qos_dscp;
 
   GArray *bufqueue;     /* global queue of buffers */
@@ -210,7 +211,7 @@ struct _GstMultiHandleSink {
 
   /* these values are used to check if a client is reading fast
    * enough and to control receovery */
-  GstFormat unit_type;/* the format of the units */
+  GstFormat unit_format;/* the format of the units */
   gint64 units_max;       /* max units to queue for a client */
   gint64 units_soft_max;  /* max units a client can lag before recovery starts */
   GstRecoverPolicy recover_policy;
@@ -253,6 +254,15 @@ struct _GstMultiHandleSinkClass {
   void          (*stop_post)    (GstMultiHandleSink *sink);
   gboolean      (*start_pre)    (GstMultiHandleSink *sink);
   gpointer      (*thread)       (GstMultiHandleSink *sink);
+  void          (*queue_buffer) (GstMultiHandleSink *sink,
+                                 GstBuffer *buffer);
+  gboolean      (*client_queue_buffer)
+                                (GstMultiHandleSink *sink,
+                                 GstMultiHandleClient *client,
+                                 GstBuffer *buffer);
+  int           (*client_get_fd)
+                                (GstMultiHandleClient *client);
+
 
   GstStructure* (*get_stats)    (GstMultiHandleSink *sink, GSocket *socket);
   void          (*remove_client_link) (GstMultiHandleSink * sink, GList * link);
index 0414aea..f7b1840 100644 (file)
 
 #define NOT_IMPLEMENTED 0
 
-static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
-    GST_PAD_SINK,
-    GST_PAD_ALWAYS,
-    GST_STATIC_CAPS_ANY);
-
 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
 #define GST_CAT_DEFAULT (multisocketsink_debug)
 
@@ -143,35 +138,20 @@ enum
 
 
 /* this is really arbitrarily chosen */
-#define DEFAULT_MODE                    1
-#define DEFAULT_BUFFERS_MAX             -1
-#define DEFAULT_BUFFERS_SOFT_MAX        -1
-#define DEFAULT_UNIT_TYPE               GST_FORMAT_BUFFERS
-#define DEFAULT_UNITS_MAX               -1
-#define DEFAULT_UNITS_SOFT_MAX          -1
+#define DEFAULT_UNIT_FORMAT               GST_FORMAT_BUFFERS
 
 #define DEFAULT_BURST_FORMAT            GST_FORMAT_UNDEFINED
 #define DEFAULT_BURST_VALUE             0
 
-#define DEFAULT_QOS_DSCP                -1
-
 enum
 {
   PROP_0,
-  PROP_MODE,
 
-  PROP_UNIT_TYPE,
-  PROP_UNITS_MAX,
-  PROP_UNITS_SOFT_MAX,
-
-  PROP_BUFFERS_MAX,
-  PROP_BUFFERS_SOFT_MAX,
+  PROP_UNIT_FORMAT,
 
   PROP_BURST_FORMAT,
   PROP_BURST_VALUE,
 
-  PROP_QOS_DSCP,
-
   PROP_NUM_SOCKETS,
 
   PROP_LAST
@@ -183,14 +163,18 @@ static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
+static void gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink,
+    GstBuffer * buffer);
+static gboolean gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink *
+    mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
+static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
+
 
 static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
     GList * link);
 static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket,
     GIOCondition condition, GstMultiSocketSink * sink);
 
-static GstFlowReturn gst_multi_socket_sink_render (GstBaseSink * bsink,
-    GstBuffer * buf);
 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
 
@@ -222,30 +206,10 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
   gobject_class->get_property = gst_multi_socket_sink_get_property;
   gobject_class->finalize = gst_multi_socket_sink_finalize;
 
-  g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
-      g_param_spec_int ("buffers-max", "Buffers max",
-          "max number of buffers to queue for a client (-1 = no limit)", -1,
-          G_MAXINT, DEFAULT_BUFFERS_MAX,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
-      g_param_spec_int ("buffers-soft-max", "Buffers soft max",
-          "Recover client when going over this limit (-1 = no limit)", -1,
-          G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
-      g_param_spec_enum ("unit-type", "Units type",
+  g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
+      g_param_spec_enum ("unit-format", "Units format",
           "The unit to measure the max/soft-max/queued properties",
-          GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
-      g_param_spec_int64 ("units-max", "Units max",
-          "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
-          DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
-      g_param_spec_int64 ("units-soft-max", "Units soft max",
-          "Recover client when going over this limit (-1 = no limit)", -1,
-          G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
+          GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
@@ -258,12 +222,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
           "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
           DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
-      g_param_spec_int ("qos-dscp", "QoS diff srv code point",
-          "Quality of Service, differentiated services code point (-1 default)",
-          -1, 63, DEFAULT_QOS_DSCP,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
   g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
       g_param_spec_uint ("num-sockets", "Number of sockets",
           "The current number of client sockets",
@@ -403,9 +361,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
           client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
       G_TYPE_NONE, 1, G_TYPE_SOCKET);
 
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
-
   gst_element_class_set_details_simple (gstelement_class,
       "Multi socket sink", "Sink/Network",
       "Send data to multiple sockets",
@@ -413,8 +368,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
       "Wim Taymans <wim@fluendo.com>, "
       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
 
-  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_render);
-
   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
   gstbasesink_class->unlock_stop =
       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
@@ -427,6 +380,12 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
   gstmultihandlesink_class->thread =
       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
+  gstmultihandlesink_class->queue_buffer =
+      GST_DEBUG_FUNCPTR (gst_multi_socket_sink_queue_buffer);
+  gstmultihandlesink_class->client_queue_buffer =
+      GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_queue_buffer);
+  gstmultihandlesink_class->client_get_fd =
+      GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
 
   gstmultihandlesink_class->remove_client_link =
       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link);
@@ -446,16 +405,11 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this)
 {
   this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
 
-  this->unit_type = DEFAULT_UNIT_TYPE;
-  this->units_max = DEFAULT_UNITS_MAX;
-  this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
+  this->unit_format = DEFAULT_UNIT_FORMAT;
 
   this->def_burst_format = DEFAULT_BURST_FORMAT;
   this->def_burst_value = DEFAULT_BURST_VALUE;
 
-  this->qos_dscp = DEFAULT_QOS_DSCP;
-
-  this->header_flags = 0;
   this->cancellable = g_cancellable_new ();
 }
 
@@ -473,86 +427,12 @@ gst_multi_socket_sink_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
-static gint
-setup_dscp_client (GstMultiSocketSink * sink, GstSocketClient * client)
+static int
+gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
 {
-#ifndef IP_TOS
-  return 0;
-#else
-  gint tos;
-  gint ret;
-  int fd;
-  union gst_sockaddr
-  {
-    struct sockaddr sa;
-    struct sockaddr_in6 sa_in6;
-    struct sockaddr_storage sa_stor;
-  } sa;
-  socklen_t slen = sizeof (sa);
-  gint af;
-
-  /* don't touch */
-  if (sink->qos_dscp < 0)
-    return 0;
-
-  fd = g_socket_get_fd (client->socket);
-
-  if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
-    GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
-    return ret;
-  }
+  GstSocketClient *msclient = (GstSocketClient *) client;
 
-  af = sa.sa.sa_family;
-
-  /* if this is an IPv4-mapped address then do IPv4 QoS */
-  if (af == AF_INET6) {
-
-    GST_DEBUG_OBJECT (sink, "check IP6 socket");
-    if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
-      GST_DEBUG_OBJECT (sink, "mapped to IPV4");
-      af = AF_INET;
-    }
-  }
-
-  /* extract and shift 6 bits of the DSCP */
-  tos = (sink->qos_dscp & 0x3f) << 2;
-
-  switch (af) {
-    case AF_INET:
-      ret = setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
-      break;
-    case AF_INET6:
-#ifdef IPV6_TCLASS
-      ret = setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos));
-      break;
-#endif
-    default:
-      ret = 0;
-      GST_ERROR_OBJECT (sink, "unsupported AF");
-      break;
-  }
-  if (ret)
-    GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
-
-  return ret;
-#endif
-}
-
-static void
-setup_dscp (GstMultiSocketSink * sink)
-{
-  GList *clients;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  CLIENTS_LOCK (sink);
-  for (clients = mhsink->clients; clients; clients = clients->next) {
-    GstSocketClient *client;
-
-    client = clients->data;
-
-    setup_dscp_client (sink, client);
-  }
-  CLIENTS_UNLOCK (sink);
+  return g_socket_get_fd (msclient->socket);
 }
 
 /* "add-full" signal implementation */
@@ -615,7 +495,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
     g_source_attach (client->source, sink->main_context);
   }
 
-  setup_dscp_client (sink, client);
+  gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
 
   CLIENTS_UNLOCK (sink);
 
@@ -958,11 +838,11 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
 
 /* queue the given buffer for the given client */
 static gboolean
-gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink,
-    GstSocketClient * client, GstBuffer * buffer)
+gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
+    GstMultiHandleClient * mhclient, GstBuffer * buffer)
 {
-  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+  GstSocketClient *client = (GstSocketClient *) mhclient;
+  GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
   GstCaps *caps;
 
   /* TRUE: send them if the new caps have them */
@@ -1086,7 +966,7 @@ get_buffers_max (GstMultiSocketSink * sink, gint64 max)
 {
   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
 
-  switch (sink->unit_type) {
+  switch (sink->unit_format) {
     case GST_FORMAT_BUFFERS:
       return max;
     case GST_FORMAT_TIME:
@@ -1540,10 +1420,11 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
   GstClockTime now;
   GTimeVal nowtv;
   GError *err = NULL;
-  GstMultiHandleSink *mhsink;
+  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
+  GstMultiHandleSinkClass *mhsinkclass =
+      GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
 
-  mhsink = GST_MULTI_HANDLE_SINK (sink);
 
   g_get_current_time (&nowtv);
   now = GST_TIMEVAL_TO_TIME (nowtv);
@@ -1617,7 +1498,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
             socket, client, mhclient->bufpos);
 
         /* queueing a buffer will ref it */
-        gst_multi_socket_sink_client_queue_buffer (sink, client, buf);
+        mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
 
         /* need to start from the first byte for this new buffer */
         mhclient->bufoffset = 0;
@@ -1730,13 +1611,13 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
       break;
     case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
       /* move to beginning of soft max */
-      newbufpos = get_buffers_max (sink, sink->units_soft_max);
+      newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
       break;
     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
       /* find keyframe in buffers, we search backwards to find the
        * closest keyframe relative to what this client already received. */
       newbufpos = MIN (mhsink->bufqueue->len - 1,
-          get_buffers_max (sink, sink->units_soft_max) - 1);
+          get_buffers_max (sink, mhsink->units_soft_max) - 1);
 
       while (newbufpos >= 0) {
         GstBuffer *buf;
@@ -1751,7 +1632,7 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
       break;
     default:
       /* unknown recovery procedure */
-      newbufpos = get_buffers_max (sink, sink->units_soft_max);
+      newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
       break;
   }
   return newbufpos;
@@ -1776,7 +1657,8 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
  * the select thread that the fd_set changed.
  */
 static void
-gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
+gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink,
+    GstBuffer * buf)
 {
   GList *clients, *next;
   gint queuelen;
@@ -1786,7 +1668,7 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
   GstClockTime now;
   gint max_buffers, soft_max_buffers;
   guint cookie;
-  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+  GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
   GstMultiHandleSinkClass *mhsinkclass =
       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
 
@@ -1799,13 +1681,13 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
   g_array_prepend_val (mhsink->bufqueue, buf);
   queuelen = mhsink->bufqueue->len;
 
-  if (sink->units_max > 0)
-    max_buffers = get_buffers_max (sink, sink->units_max);
+  if (mhsink->units_max > 0)
+    max_buffers = get_buffers_max (sink, mhsink->units_max);
   else
     max_buffers = -1;
 
-  if (sink->units_soft_max > 0)
-    soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
+  if (mhsink->units_soft_max > 0)
+    soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
   else
     soft_max_buffers = -1;
   GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
@@ -2082,116 +1964,6 @@ gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
   return NULL;
 }
 
-static GstFlowReturn
-gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf)
-{
-  GstMultiSocketSink *sink;
-  gboolean in_caps;
-#if 0
-  GstCaps *bufcaps, *padcaps;
-#endif
-  GstMultiHandleSink *mhsink;
-
-  sink = GST_MULTI_SOCKET_SINK (bsink);
-  mhsink = GST_MULTI_HANDLE_SINK (sink);
-
-  g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
-          GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
-
-#if 0
-  /* since we check every buffer for streamheader caps, we need to make
-   * sure every buffer has caps set */
-  bufcaps = gst_buffer_get_caps (buf);
-  padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
-
-  /* make sure we have caps on the pad */
-  if (!padcaps && !bufcaps)
-    goto no_caps;
-#endif
-
-  /* get HEADER first, code below might mess with the flags */
-  in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
-
-#if 0
-  /* stamp the buffer with previous caps if no caps set */
-  if (!bufcaps) {
-    if (!gst_buffer_is_writable (buf)) {
-      /* metadata is not writable, copy will be made and original buffer
-       * will be unreffed so we need to ref so that we don't lose the
-       * buffer in the render method. */
-      gst_buffer_ref (buf);
-      /* the new buffer is ours only, we keep it out of the scope of this
-       * function */
-      buf = gst_buffer_make_writable (buf);
-    } else {
-      /* else the metadata is writable, we ref because we keep the buffer
-       * out of the scope of this method */
-      gst_buffer_ref (buf);
-    }
-    /* buffer metadata is writable now, set the caps */
-    gst_buffer_set_caps (buf, padcaps);
-  } else {
-    gst_caps_unref (bufcaps);
-
-    /* since we keep this buffer out of the scope of this method */
-    gst_buffer_ref (buf);
-  }
-#endif
-  gst_buffer_ref (buf);
-
-  GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
-      G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
-      ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
-      buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
-      GST_BUFFER_OFFSET_END (buf),
-      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
-      GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
-
-  /* if we get HEADER buffers, but the previous buffer was not HEADER,
-   * it means we're getting new streamheader buffers, and we should clear
-   * the old ones */
-  if (in_caps && sink->previous_buffer_in_caps == FALSE) {
-    GST_DEBUG_OBJECT (sink,
-        "receiving new HEADER buffers, clearing old streamheader");
-    g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
-    g_slist_free (mhsink->streamheader);
-    mhsink->streamheader = NULL;
-  }
-
-  /* save the current in_caps */
-  sink->previous_buffer_in_caps = in_caps;
-
-  /* if the incoming buffer is marked as IN CAPS, then we assume for now
-   * it's a streamheader that needs to be sent to each new client, so we
-   * put it on our internal list of streamheader buffers.
-   * FIXME: we could check if the buffer's contents are in fact part of the
-   * current streamheader.
-   *
-   * We don't send the buffer to the client, since streamheaders are sent
-   * separately when necessary. */
-  if (in_caps) {
-    GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
-        G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
-    mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
-  } else {
-    /* queue the buffer, this is a regular data buffer. */
-    gst_multi_socket_sink_queue_buffer (sink, buf);
-
-    mhsink->bytes_to_serve += gst_buffer_get_size (buf);
-  }
-  return GST_FLOW_OK;
-
-  /* ERRORS */
-#if 0
-no_caps:
-  {
-    GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
-        ("Received first buffer without caps set"));
-    return GST_FLOW_NOT_NEGOTIATED;
-  }
-#endif
-}
-
 static void
 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
@@ -2201,20 +1973,8 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
   multisocketsink = GST_MULTI_SOCKET_SINK (object);
 
   switch (prop_id) {
-    case PROP_BUFFERS_MAX:
-      multisocketsink->units_max = g_value_get_int (value);
-      break;
-    case PROP_BUFFERS_SOFT_MAX:
-      multisocketsink->units_soft_max = g_value_get_int (value);
-      break;
-    case PROP_UNIT_TYPE:
-      multisocketsink->unit_type = g_value_get_enum (value);
-      break;
-    case PROP_UNITS_MAX:
-      multisocketsink->units_max = g_value_get_int64 (value);
-      break;
-    case PROP_UNITS_SOFT_MAX:
-      multisocketsink->units_soft_max = g_value_get_int64 (value);
+    case PROP_UNIT_FORMAT:
+      multisocketsink->unit_format = g_value_get_enum (value);
       break;
     case PROP_BURST_FORMAT:
       multisocketsink->def_burst_format = g_value_get_enum (value);
@@ -2222,10 +1982,6 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
     case PROP_BURST_VALUE:
       multisocketsink->def_burst_value = g_value_get_uint64 (value);
       break;
-    case PROP_QOS_DSCP:
-      multisocketsink->qos_dscp = g_value_get_int (value);
-      setup_dscp (multisocketsink);
-      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -2242,20 +1998,8 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
   multisocketsink = GST_MULTI_SOCKET_SINK (object);
 
   switch (prop_id) {
-    case PROP_BUFFERS_MAX:
-      g_value_set_int (value, multisocketsink->units_max);
-      break;
-    case PROP_BUFFERS_SOFT_MAX:
-      g_value_set_int (value, multisocketsink->units_soft_max);
-      break;
-    case PROP_UNIT_TYPE:
-      g_value_set_enum (value, multisocketsink->unit_type);
-      break;
-    case PROP_UNITS_MAX:
-      g_value_set_int64 (value, multisocketsink->units_max);
-      break;
-    case PROP_UNITS_SOFT_MAX:
-      g_value_set_int64 (value, multisocketsink->units_soft_max);
+    case PROP_UNIT_FORMAT:
+      g_value_set_enum (value, multisocketsink->unit_format);
       break;
     case PROP_BURST_FORMAT:
       g_value_set_enum (value, multisocketsink->def_burst_format);
@@ -2263,9 +2007,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
     case PROP_BURST_VALUE:
       g_value_set_uint64 (value, multisocketsink->def_burst_value);
       break;
-    case PROP_QOS_DSCP:
-      g_value_set_int (value, multisocketsink->qos_dscp);
-      break;
     case PROP_NUM_SOCKETS:
       g_value_set_uint (value,
           g_hash_table_size (multisocketsink->socket_hash));
index fa8030a..d3e40d8 100644 (file)
@@ -83,13 +83,10 @@ struct _GstMultiSocketSink {
   gboolean previous_buffer_in_caps;
 
   guint mtu;
-  gint qos_dscp;
 
   /* these values are used to check if a client is reading fast
    * enough and to control receovery */
-  GstFormat unit_type;/* the format of the units */
-  gint64 units_max;       /* max units to queue for a client */
-  gint64 units_soft_max;  /* max units a client can lag before recovery starts */
+  GstFormat unit_format;/* the format of the units */
 
   GstFormat     def_burst_format;
   guint64       def_burst_value;
@@ -103,7 +100,7 @@ struct _GstMultiSocketSinkClass {
   /* element methods */
   void          (*add)          (GstMultiSocketSink *sink, GSocket *socket);
   void          (*add_full)     (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
-                                GstFormat format, guint64 value, 
+                                GstFormat format, guint64 value,
                                 GstFormat max_format, guint64 max_value);
   void          (*remove)       (GstMultiSocketSink *sink, GSocket *socket);
   void          (*remove_flush) (GstMultiSocketSink *sink, GSocket *socket);
index ace3c49..37cc5b1 100644 (file)
@@ -26,6 +26,9 @@
 
 #include <gst/check/gstcheck.h>
 
+/* FIXME: remove this header once formats are refactored */
+#include "gst/tcp/gstmultifdsink.h"
+
 static GstPad *mysrcpad;
 
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
@@ -457,7 +460,6 @@ GST_START_TEST (test_burst_client_bytes)
   int pfd1[2];
   int pfd2[2];
   int pfd3[2];
-  gchar data[16];
   gint i;
   guint buffers_queued;
 
@@ -465,7 +467,7 @@ GST_START_TEST (test_burst_client_bytes)
   /* make sure we keep at least 100 bytes at all times */
   g_object_set (sink, "bytes-min", 100, NULL);
   g_object_set (sink, "sync-method", 3, NULL);  /* 3 = burst */
-  g_object_set (sink, "burst-unit", 3, NULL);   /* 3 = bytes */
+  g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
   g_object_set (sink, "burst-value", (guint64) 80, NULL);
 
   fail_if (pipe (pfd1) == -1);
@@ -505,38 +507,26 @@ GST_START_TEST (test_burst_client_bytes)
 
   /* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */
   GST_DEBUG ("Reading from client 1");
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
 
   /* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since
    * the max alows it) */
   GST_DEBUG ("Reading from client 2");
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000006");
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000007");
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
 
   /* third client only bursts 50 bytes = 4 buffers, we can't send
    * more than 50 bytes so we only get 3 buffers (48 bytes). */
   GST_DEBUG ("Reading from client 3");
-  fail_if (read (pfd3[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
-  fail_if (read (pfd3[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd3[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000007");
+  fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
+  fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
 
   GST_DEBUG ("cleaning up multifdsink");
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
@@ -556,15 +546,14 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
   int pfd1[2];
   int pfd2[2];
   int pfd3[2];
-  gchar data[16];
   gint i;
   guint buffers_queued;
 
   sink = setup_multifdsink ();
   /* make sure we keep at least 100 bytes at all times */
   g_object_set (sink, "bytes-min", 100, NULL);
-  g_object_set (sink, "sync-method", 4, NULL);  /* 3 = burst_keyframe */
-  g_object_set (sink, "burst-unit", 3, NULL);   /* 3 = bytes */
+  g_object_set (sink, "sync-method", 4, NULL);  /* 4 = burst_keyframe */
+  g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
   g_object_set (sink, "burst-value", (guint64) 80, NULL);
 
   fail_if (pipe (pfd1) == -1);
@@ -574,8 +563,8 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
 
   caps = gst_caps_from_string ("application/x-gst-check");
-  GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
   gst_pad_set_caps (mysrcpad, caps);
+  GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
 
   /* push buffers in, 9 * 16 bytes = 144 bytes */
   for (i = 0; i < 9; i++) {
@@ -611,34 +600,24 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
   /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
    * keyframe at buffer 4 */
   GST_DEBUG ("Reading from client 1");
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000004");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
 
   /* second client only bursts 50 bytes = 4 buffers, there is
    * no keyframe above min and below max, so get one below min */
   GST_DEBUG ("Reading from client 2");
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
 
   /* third client only bursts 50 bytes = 4 buffers, we can't send
    * more than 50 bytes so we only get 2 buffers (32 bytes). */
   GST_DEBUG ("Reading from client 3");
-  fail_if (read (pfd3[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd3[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
+  fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
 
   GST_DEBUG ("cleaning up multifdsink");
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
@@ -658,15 +637,14 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
   int pfd1[2];
   int pfd2[2];
   int pfd3[2];
-  gchar data[16];
   gint i;
   guint buffers_queued;
 
   sink = setup_multifdsink ();
   /* make sure we keep at least 100 bytes at all times */
   g_object_set (sink, "bytes-min", 100, NULL);
-  g_object_set (sink, "sync-method", 5, NULL);  /* 3 = burst_with_keyframe */
-  g_object_set (sink, "burst-unit", 3, NULL);   /* 3 = bytes */
+  g_object_set (sink, "sync-method", 5, NULL);  /* 5 = burst_with_keyframe */
+  g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
   g_object_set (sink, "burst-value", (guint64) 80, NULL);
 
   fail_if (pipe (pfd1) == -1);
@@ -713,40 +691,27 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
   /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
    * keyframe at buffer 4 */
   GST_DEBUG ("Reading from client 1");
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000004");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
 
   /* second client only bursts 50 bytes = 4 buffers, there is
    * no keyframe above min and below max, so send min */
   GST_DEBUG ("Reading from client 2");
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd2[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000006");
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000007");
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
+  fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
 
   /* third client only bursts 50 bytes = 4 buffers, we can't send
    * more than 50 bytes so we only get 3 buffers (48 bytes). */
   GST_DEBUG ("Reading from client 3");
-  fail_if (read (pfd3[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
-  fail_if (read (pfd3[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
-  fail_if (read (pfd3[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
+  fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000007");
+  fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
+  fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
 
   GST_DEBUG ("cleaning up multifdsink");
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
@@ -765,7 +730,6 @@ GST_START_TEST (test_client_next_keyframe)
   GstElement *sink;
   GstCaps *caps;
   int pfd1[2];
-  gchar data[16];
   gint i;
 
   sink = setup_multifdsink ();
@@ -793,10 +757,8 @@ GST_START_TEST (test_client_next_keyframe)
 
   /* now we should be able to read some data */
   GST_DEBUG ("Reading from client 1");
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000000", 16) == 0);
-  fail_if (read (pfd1[0], data, 16) < 16);
-  fail_unless (strncmp (data, "deadbee00000001", 16) == 0);
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000000");
+  fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000001");
 
   GST_DEBUG ("cleaning up multifdsink");
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
index 293d848..cbd9789 100644 (file)
@@ -596,7 +596,7 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
   sink = setup_multisocketsink ();
   /* make sure we keep at least 100 bytes at all times */
   g_object_set (sink, "bytes-min", 100, NULL);
-  g_object_set (sink, "sync-method", 4, NULL);  /* 3 = burst_keyframe */
+  g_object_set (sink, "sync-method", 4, NULL);  /* 4 = burst_keyframe */
   g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
   g_object_set (sink, "burst-value", (guint64) 80, NULL);
 
@@ -700,7 +700,7 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
 
   /* make sure we keep at least 100 bytes at all times */
   g_object_set (sink, "bytes-min", 100, NULL);
-  g_object_set (sink, "sync-method", 5, NULL);  /* 3 = burst_with_keyframe */
+  g_object_set (sink, "sync-method", 5, NULL);  /* 5 = burst_with_keyframe */
   g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
   g_object_set (sink, "burst-value", (guint64) 80, NULL);