From: Thomas Vander Stichele Date: Fri, 27 Jan 2012 14:46:31 +0000 (+0100) Subject: multihandlesink: further refactoring X-Git-Tag: 1.19.3~511^2~6803 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=020739664a04a15e29436925610f65c1b40b20da;p=platform%2Fupstream%2Fgstreamer.git multihandlesink: further refactoring --- diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index b78a99b..c510e82 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -125,11 +125,6 @@ #define NOT_IMPLEMENTED 0 -static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - GST_DEBUG_CATEGORY_STATIC (multifdsink_debug); #define GST_CAT_DEFAULT (multifdsink_debug) @@ -154,16 +149,11 @@ enum /* this is really arbitrarily chosen */ #define DEFAULT_MODE 1 -#define DEFAULT_BUFFERS_MAX -1 -#define DEFAULT_BUFFERS_SOFT_MAX -1 -#define DEFAULT_UNIT_TYPE GST_TCP_UNIT_TYPE_BUFFERS -#define DEFAULT_UNITS_MAX -1 -#define DEFAULT_UNITS_SOFT_MAX -1 +#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS -#define DEFAULT_BURST_UNIT GST_TCP_UNIT_TYPE_UNDEFINED +#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED #define DEFAULT_BURST_VALUE 0 -#define DEFAULT_QOS_DSCP -1 #define DEFAULT_HANDLE_READ TRUE enum @@ -171,18 +161,11 @@ enum PROP_0, PROP_MODE, - PROP_UNIT_TYPE, - PROP_UNITS_MAX, - PROP_UNITS_SOFT_MAX, - - PROP_BUFFERS_MAX, - PROP_BUFFERS_SOFT_MAX, + PROP_UNIT_FORMAT, - PROP_BURST_UNIT, + PROP_BURST_FORMAT, PROP_BURST_VALUE, - PROP_QOS_DSCP, - PROP_HANDLE_READ, PROP_NUM_FDS, @@ -210,23 +193,23 @@ gst_fdset_mode_get_type (void) return fdset_mode_type; } -#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type()) +#define GST_TYPE_UNIT_FORMAT (gst_unit_format_get_type()) static GType -gst_unit_type_get_type (void) +gst_unit_format_get_type (void) { - static GType unit_type_type = 0; - static const GEnumValue unit_type[] = { - {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"}, - {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"}, - {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"}, - {GST_TCP_UNIT_TYPE_TIME, "Time", "time"}, + static GType unit_format_type = 0; + static const GEnumValue unit_format[] = { + {GST_TCP_UNIT_FORMAT_UNDEFINED, "Undefined", "undefined"}, + {GST_TCP_UNIT_FORMAT_BYTES, "Bytes", "bytes"}, + {GST_TCP_UNIT_FORMAT_TIME, "Time", "time"}, + {GST_TCP_UNIT_FORMAT_BUFFERS, "Buffers", "buffers"}, {0, NULL, NULL}, }; - if (!unit_type_type) { - unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type); + if (!unit_format_type) { + unit_format_type = g_enum_register_static ("GstTCPUnitType", unit_format); } - return unit_type_type; + return unit_format_type; } static void gst_multi_fd_sink_finalize (GObject * object); @@ -236,13 +219,15 @@ static void gst_multi_fd_sink_stop_pre (GstMultiHandleSink * mhsink); static void gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink); static gboolean gst_multi_fd_sink_start_pre (GstMultiHandleSink * mhsink); static gpointer gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink); +static void gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, + GstBuffer * buffer); +static gboolean gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * + mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer); +static int gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client); static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link); -static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink, - GstBuffer * buf); - static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id, @@ -258,12 +243,10 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; - GstBaseSinkClass *gstbasesink_class; GstMultiHandleSinkClass *gstmultihandlesink_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; - gstbasesink_class = (GstBaseSinkClass *) klass; gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass; gobject_class->set_property = gst_multi_fd_sink_set_property; @@ -284,47 +267,22 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) GST_TYPE_FDSET_MODE, DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX, - g_param_spec_int ("buffers-max", "Buffers max", - "max number of buffers to queue for a client (-1 = no limit)", -1, - G_MAXINT, DEFAULT_BUFFERS_MAX, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX, - g_param_spec_int ("buffers-soft-max", "Buffers soft max", - "Recover client when going over this limit (-1 = no limit)", -1, - G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, - g_param_spec_enum ("unit-type", "Units type", + g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT, + g_param_spec_enum ("unit-format", "Units format", "The unit to measure the max/soft-max/queued properties", - GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNITS_MAX, - g_param_spec_int64 ("units-max", "Units max", - "max number of units to queue (-1 = no limit)", -1, G_MAXINT64, - DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX, - g_param_spec_int64 ("units-soft-max", "Units soft max", - "Recover client when going over this limit (-1 = no limit)", -1, - G_MAXINT64, DEFAULT_UNITS_SOFT_MAX, + GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BURST_UNIT, - g_param_spec_enum ("burst-unit", "Burst unit", + g_object_class_install_property (gobject_class, PROP_BURST_FORMAT, + g_param_spec_enum ("burst-format", "Burst format", "The format of the burst units (when sync-method is burst[[-with]-keyframe])", - GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT, + GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_BURST_VALUE, g_param_spec_uint64 ("burst-value", "Burst value", "The amount of burst expressed in burst-unit", 0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_QOS_DSCP, - g_param_spec_int ("qos-dscp", "QoS diff srv code point", - "Quality of Service, differentiated services code point (-1 default)", - -1, 63, DEFAULT_QOS_DSCP, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstMultiFdSink::handle-read * @@ -359,12 +317,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) * @gstmultifdsink: the multifdsink element to emit this signal on * @fd: the file descriptor to add to multifdsink * @sync: the sync method to use - * @unit_type_min: the unit-type of @value_min + * @unit_format_min: the unit-format of @value_min * @value_min: the minimum amount of data to burst expressed in - * @unit_type_min units. - * @unit_type_max: the unit-type of @value_max + * @unit_format_min units. + * @unit_format_max: the unit-format of @value_max * @value_max: the maximum amount of data to burst expressed in - * @unit_type_max units. + * @unit_format_max units. * * Hand the given open file descriptor to multifdsink to write to and * specify the burst parameters for the new connection. @@ -374,8 +332,8 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, add_full), NULL, NULL, gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64, - G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE, - G_TYPE_UINT64, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64); + G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_FORMAT, + G_TYPE_UINT64, GST_TYPE_UNIT_FORMAT, G_TYPE_UINT64); /** * GstMultiFdSink::remove: * @gstmultifdsink: the multifdsink element to emit this signal on @@ -477,17 +435,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&sinktemplate)); - gst_element_class_set_details_simple (gstelement_class, "Multi filedescriptor sink", "Sink/Network", "Send data to multiple filedescriptors", "Thomas Vander Stichele , " "Wim Taymans "); - gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render); - gstmultihandlesink_class->clear_post = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear_post); @@ -499,6 +452,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) GST_DEBUG_FUNCPTR (gst_multi_fd_sink_start_pre); gstmultihandlesink_class->thread = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_thread); + gstmultihandlesink_class->queue_buffer = + GST_DEBUG_FUNCPTR (gst_multi_fd_sink_queue_buffer); + gstmultihandlesink_class->client_queue_buffer = + GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_queue_buffer); + gstmultihandlesink_class->client_get_fd = + GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_get_fd); gstmultihandlesink_class->remove_client_link = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_client_link); @@ -520,17 +479,12 @@ gst_multi_fd_sink_init (GstMultiFdSink * this) this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal); - this->unit_type = DEFAULT_UNIT_TYPE; - this->units_max = DEFAULT_UNITS_MAX; - this->units_soft_max = DEFAULT_UNITS_SOFT_MAX; + this->unit_format = DEFAULT_UNIT_FORMAT; - this->def_burst_unit = DEFAULT_BURST_UNIT; + this->def_burst_format = DEFAULT_BURST_FORMAT; this->def_burst_value = DEFAULT_BURST_VALUE; - this->qos_dscp = DEFAULT_QOS_DSCP; this->handle_read = DEFAULT_HANDLE_READ; - - this->header_flags = 0; } static void @@ -543,90 +497,19 @@ gst_multi_fd_sink_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } -static gint -setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client) +static int +gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client) { - gint tos; - gint ret; - union gst_sockaddr - { - struct sockaddr sa; - struct sockaddr_in6 sa_in6; - struct sockaddr_storage sa_stor; - } sa; - socklen_t slen = sizeof (sa); - gint af; - - /* don't touch */ - if (sink->qos_dscp < 0) - return 0; - - if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) { - GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno)); - return ret; - } - - af = sa.sa.sa_family; - - /* if this is an IPv4-mapped address then do IPv4 QoS */ - if (af == AF_INET6) { - - GST_DEBUG_OBJECT (sink, "check IP6 socket"); - if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) { - GST_DEBUG_OBJECT (sink, "mapped to IPV4"); - af = AF_INET; - } - } - - /* extract and shift 6 bits of the DSCP */ - tos = (sink->qos_dscp & 0x3f) << 2; + GstTCPClient *tclient = (GstTCPClient *) client; - switch (af) { - case AF_INET: - ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)); - break; - case AF_INET6: -#ifdef IPV6_TCLASS - ret = - setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, - sizeof (tos)); - break; -#endif - default: - ret = 0; - GST_ERROR_OBJECT (sink, "unsupported AF"); - break; - } - if (ret) - GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno)); - - return ret; -} - - -static void -setup_dscp (GstMultiFdSink * sink) -{ - GList *clients, *next; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - - CLIENTS_LOCK (sink); - for (clients = mhsink->clients; clients; clients = next) { - GstTCPClient *client; - - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - - setup_dscp_client (sink, client); - } - CLIENTS_UNLOCK (sink); + return tclient->fd.fd; } /* "add-full" signal implementation */ void gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, - GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value, - GstTCPUnitType max_unit, guint64 max_value) + GstSyncMethod sync_method, GstFormat min_format, guint64 min_value, + GstFormat max_format, guint64 max_value) { GstTCPClient *client; GstMultiHandleClient *mhclient; @@ -636,12 +519,12 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, " - "min_unit %d, min_value %" G_GUINT64_FORMAT - ", max_unit %d, max_value %" G_GUINT64_FORMAT, fd, sync_method, - min_unit, min_value, max_unit, max_value); + "min_format %d, min_value %" G_GUINT64_FORMAT + ", max_format %d, max_value %" G_GUINT64_FORMAT, fd, sync_method, + min_format, min_value, max_format, max_value); /* do limits check if we can */ - if (min_unit == max_unit) { + if (min_format == max_format) { if (max_value != -1 && min_value != -1 && max_value < min_value) goto wrong_limits; } @@ -653,9 +536,9 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, g_snprintf (mhclient->debug, 30, "[fd %5d]", fd); client->fd.fd = fd; - client->burst_min_unit = min_unit; + client->burst_min_format = min_format; client->burst_min_value = min_value; - client->burst_max_unit = max_unit; + client->burst_max_format = max_format; client->burst_max_value = max_value; CLIENTS_LOCK (sink); @@ -689,7 +572,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, /* figure out the mode, can't use send() for non sockets */ if (fstat (fd, &statbuf) == 0 && S_ISSOCK (statbuf.st_mode)) { client->is_socket = TRUE; - setup_dscp_client (sink, client); + gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient); } gst_poll_restart (sink->fdset); @@ -707,7 +590,7 @@ wrong_limits: GST_WARNING_OBJECT (sink, "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%" G_GUINT64_FORMAT ", unit %d specified when adding client", fd, - min_value, max_value, min_unit); + min_value, max_value, min_format); return; } duplicate: @@ -731,7 +614,8 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) mhsink = GST_MULTI_HANDLE_SINK (sink); gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method, - sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1); + sink->def_burst_format, sink->def_burst_value, sink->def_burst_format, + -1); } /* "remove" signal implementation */ @@ -1086,16 +970,16 @@ ioctl_failed: /* queue the given buffer for the given client */ static gboolean -gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, - GstTCPClient * client, GstBuffer * buffer) +gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, + GstMultiHandleClient * mhclient, GstBuffer * buffer) { GstCaps *caps; /* TRUE: send them if the new caps have them */ gboolean send_streamheader = FALSE; GstStructure *s; - GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink); + GstTCPClient *client = (GstTCPClient *) mhclient; /* before we queue the buffer, we check if we need to queue streamheader * buffers (because it's a new client, or because they changed) */ @@ -1212,10 +1096,10 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max) { GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - switch (sink->unit_type) { - case GST_TCP_UNIT_TYPE_BUFFERS: + switch (sink->unit_format) { + case GST_TCP_UNIT_FORMAT_BUFFERS: return max; - case GST_TCP_UNIT_TYPE_TIME: + case GST_TCP_UNIT_FORMAT_TIME: { GstBuffer *buf; int i; @@ -1239,7 +1123,7 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max) } return len + 1; } - case GST_TCP_UNIT_TYPE_BYTES: + case GST_TCP_UNIT_FORMAT_BYTES: { GstBuffer *buf; int i; @@ -1383,23 +1267,23 @@ find_limits (GstMultiFdSink * sink, * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise. */ static gboolean -assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers, +assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers, GstClockTime * time) { gboolean res = TRUE; /* set only the limit of the given format to the given value */ - switch (unit) { - case GST_TCP_UNIT_TYPE_BUFFERS: + switch (format) { + case GST_TCP_UNIT_FORMAT_BUFFERS: *buffers = (gint) value; break; - case GST_TCP_UNIT_TYPE_TIME: + case GST_TCP_UNIT_FORMAT_TIME: *time = value; break; - case GST_TCP_UNIT_TYPE_BYTES: + case GST_TCP_UNIT_FORMAT_BYTES: *bytes = (gint) value; break; - case GST_TCP_UNIT_TYPE_UNDEFINED: + case GST_TCP_UNIT_FORMAT_UNDEFINED: default: res = FALSE; break; @@ -1416,16 +1300,16 @@ assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers, * function returns FALSE. */ static gboolean -count_burst_unit (GstMultiFdSink * sink, gint * min_idx, - GstTCPUnitType min_unit, guint64 min_value, gint * max_idx, - GstTCPUnitType max_unit, guint64 max_value) +count_burst_format (GstMultiFdSink * sink, gint * min_idx, + GstFormat min_format, guint64 min_value, gint * max_idx, + GstFormat max_format, guint64 max_value) { gint bytes_min = -1, buffers_min = -1; gint bytes_max = -1, buffers_max = -1; GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE; - assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min); - assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max); + assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min); + assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max); return find_limits (sink, min_idx, bytes_min, buffers_min, time_min, max_idx, bytes_max, buffers_max, time_max); @@ -1518,11 +1402,11 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) * is not enough data, we just send what we have (which is in result). * We use the max value to limit the search */ - ok = count_burst_unit (sink, &result, client->burst_min_unit, - client->burst_min_value, &max, client->burst_max_unit, + ok = count_burst_format (sink, &result, client->burst_min_format, + client->burst_min_value, &max, client->burst_max_format, client->burst_max_value); GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_BURST: burst_unit returned %d, result %d", + "[fd %5d] SYNC_METHOD_BURST: burst_format returned %d, result %d", client->fd.fd, ok, result); GST_LOG_OBJECT (sink, "min %d, max %d", result, max); @@ -1549,8 +1433,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) * NEXT_KEYFRAME. */ /* gather burst limits */ - count_burst_unit (sink, &min_idx, client->burst_min_unit, - client->burst_min_value, &max_idx, client->burst_max_unit, + count_burst_format (sink, &min_idx, client->burst_min_format, + client->burst_min_value, &max_idx, client->burst_max_format, client->burst_max_value); GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); @@ -1596,8 +1480,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) * amount of data up 'till min. */ /* gather enough data to burst */ - count_burst_unit (sink, &min_idx, client->burst_min_unit, - client->burst_min_value, &max_idx, client->burst_max_unit, + count_burst_format (sink, &min_idx, client->burst_min_format, + client->burst_min_value, &max_idx, client->burst_max_format, client->burst_max_value); GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); @@ -1662,10 +1546,11 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, gboolean flushing; GstClockTime now; GTimeVal nowtv; - GstMultiHandleSink *mhsink; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiHandleSinkClass *mhsinkclass = + GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - mhsink = GST_MULTI_HANDLE_SINK (sink); g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); @@ -1731,7 +1616,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, fd, client, mhclient->bufpos); /* queueing a buffer will ref it */ - gst_multi_fd_sink_client_queue_buffer (sink, client, buf); + mhsinkclass->client_queue_buffer (mhsink, mhclient, buf); /* need to start from the first byte for this new buffer */ mhclient->bufoffset = 0; @@ -1850,13 +1735,13 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) break; case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: /* move to beginning of soft max */ - newbufpos = get_buffers_max (sink, sink->units_soft_max); + newbufpos = get_buffers_max (sink, mhsink->units_soft_max); break; case GST_RECOVER_POLICY_RESYNC_KEYFRAME: /* find keyframe in buffers, we search backwards to find the * closest keyframe relative to what this client already received. */ newbufpos = MIN (mhsink->bufqueue->len - 1, - get_buffers_max (sink, sink->units_soft_max) - 1); + get_buffers_max (sink, mhsink->units_soft_max) - 1); while (newbufpos >= 0) { GstBuffer *buf; @@ -1871,7 +1756,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) break; default: /* unknown recovery procedure */ - newbufpos = get_buffers_max (sink, sink->units_soft_max); + newbufpos = get_buffers_max (sink, mhsink->units_soft_max); break; } return newbufpos; @@ -1896,7 +1781,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) * the select thread that the fd_set changed. */ static void -gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) +gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, GstBuffer * buffer) { GList *clients, *next; gint queuelen; @@ -1907,25 +1792,25 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) GstClockTime now; gint max_buffers, soft_max_buffers; guint cookie; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink); GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); - CLIENTS_LOCK (sink); + CLIENTS_LOCK (mhsink); /* add buffer to queue */ - g_array_prepend_val (mhsink->bufqueue, buf); + g_array_prepend_val (mhsink->bufqueue, buffer); queuelen = mhsink->bufqueue->len; - if (sink->units_max > 0) - max_buffers = get_buffers_max (sink, sink->units_max); + if (mhsink->units_max > 0) + max_buffers = get_buffers_max (sink, mhsink->units_max); else max_buffers = -1; - if (sink->units_soft_max > 0) - soft_max_buffers = get_buffers_max (sink, sink->units_soft_max); + if (mhsink->units_soft_max > 0) + soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max); else soft_max_buffers = -1; GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers, @@ -2260,116 +2145,6 @@ gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink) return NULL; } -static GstFlowReturn -gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) -{ - GstMultiFdSink *sink; - gboolean in_caps; -#if 0 - GstCaps *bufcaps, *padcaps; -#endif - GstMultiHandleSink *mhsink; - - sink = GST_MULTI_FD_SINK (bsink); - mhsink = GST_MULTI_HANDLE_SINK (sink); - - g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, - GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING); - -#if 0 - /* since we check every buffer for streamheader caps, we need to make - * sure every buffer has caps set */ - bufcaps = gst_buffer_get_caps (buf); - padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink)); - - /* make sure we have caps on the pad */ - if (!padcaps && !bufcaps) - goto no_caps; -#endif - - /* get HEADER first, code below might mess with the flags */ - in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER); - -#if 0 - /* stamp the buffer with previous caps if no caps set */ - if (!bufcaps) { - if (!gst_buffer_is_writable (buf)) { - /* metadata is not writable, copy will be made and original buffer - * will be unreffed so we need to ref so that we don't lose the - * buffer in the render method. */ - gst_buffer_ref (buf); - /* the new buffer is ours only, we keep it out of the scope of this - * function */ - buf = gst_buffer_make_writable (buf); - } else { - /* else the metadata is writable, we ref because we keep the buffer - * out of the scope of this method */ - gst_buffer_ref (buf); - } - /* buffer metadata is writable now, set the caps */ - gst_buffer_set_caps (buf, padcaps); - } else { - gst_caps_unref (bufcaps); - - /* since we keep this buffer out of the scope of this method */ - gst_buffer_ref (buf); - } -#endif - gst_buffer_ref (buf); - - GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %" - G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT - ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT, - buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf), - GST_BUFFER_OFFSET_END (buf), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buf))); - - /* if we get HEADER buffers, but the previous buffer was not HEADER, - * it means we're getting new streamheader buffers, and we should clear - * the old ones */ - if (in_caps && sink->previous_buffer_in_caps == FALSE) { - GST_DEBUG_OBJECT (sink, - "receiving new HEADER buffers, clearing old streamheader"); - g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL); - g_slist_free (mhsink->streamheader); - mhsink->streamheader = NULL; - } - - /* save the current in_caps */ - sink->previous_buffer_in_caps = in_caps; - - /* if the incoming buffer is marked as IN CAPS, then we assume for now - * it's a streamheader that needs to be sent to each new client, so we - * put it on our internal list of streamheader buffers. - * FIXME: we could check if the buffer's contents are in fact part of the - * current streamheader. - * - * We don't send the buffer to the client, since streamheaders are sent - * separately when necessary. */ - if (in_caps) { - GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %" - G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf)); - mhsink->streamheader = g_slist_append (mhsink->streamheader, buf); - } else { - /* queue the buffer, this is a regular data buffer. */ - gst_multi_fd_sink_queue_buffer (sink, buf); - - mhsink->bytes_to_serve += gst_buffer_get_size (buf); - } - return GST_FLOW_OK; - - /* ERRORS */ -#if 0 -no_caps: - { - GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL), - ("Received first buffer without caps set")); - return GST_FLOW_NOT_NEGOTIATED; - } -#endif -} - static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -2382,31 +2157,15 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id, case PROP_MODE: multifdsink->mode = g_value_get_enum (value); break; - case PROP_BUFFERS_MAX: - multifdsink->units_max = g_value_get_int (value); - break; - case PROP_BUFFERS_SOFT_MAX: - multifdsink->units_soft_max = g_value_get_int (value); - break; - case PROP_UNIT_TYPE: - multifdsink->unit_type = g_value_get_enum (value); - break; - case PROP_UNITS_MAX: - multifdsink->units_max = g_value_get_int64 (value); - break; - case PROP_UNITS_SOFT_MAX: - multifdsink->units_soft_max = g_value_get_int64 (value); + case PROP_UNIT_FORMAT: + multifdsink->unit_format = g_value_get_enum (value); break; - case PROP_BURST_UNIT: - multifdsink->def_burst_unit = g_value_get_enum (value); + case PROP_BURST_FORMAT: + multifdsink->def_burst_format = g_value_get_enum (value); break; case PROP_BURST_VALUE: multifdsink->def_burst_value = g_value_get_uint64 (value); break; - case PROP_QOS_DSCP: - multifdsink->qos_dscp = g_value_get_int (value); - setup_dscp (multifdsink); - break; case PROP_HANDLE_READ: multifdsink->handle_read = g_value_get_boolean (value); break; @@ -2429,30 +2188,15 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_MODE: g_value_set_enum (value, multifdsink->mode); break; - case PROP_BUFFERS_MAX: - g_value_set_int (value, multifdsink->units_max); - break; - case PROP_BUFFERS_SOFT_MAX: - g_value_set_int (value, multifdsink->units_soft_max); - break; - case PROP_UNIT_TYPE: - g_value_set_enum (value, multifdsink->unit_type); + case PROP_UNIT_FORMAT: + g_value_set_enum (value, multifdsink->unit_format); break; - case PROP_UNITS_MAX: - g_value_set_int64 (value, multifdsink->units_max); - break; - case PROP_UNITS_SOFT_MAX: - g_value_set_int64 (value, multifdsink->units_soft_max); - break; - case PROP_BURST_UNIT: - g_value_set_enum (value, multifdsink->def_burst_unit); + case PROP_BURST_FORMAT: + g_value_set_enum (value, multifdsink->def_burst_format); break; case PROP_BURST_VALUE: g_value_set_uint64 (value, multifdsink->def_burst_value); break; - case PROP_QOS_DSCP: - g_value_set_int (value, multifdsink->qos_dscp); - break; case PROP_HANDLE_READ: g_value_set_boolean (value, multifdsink->handle_read); break; diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index a545189..89ec423 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -49,19 +49,19 @@ typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; /** * GstTCPUnitType: - * @GST_TCP_UNIT_TYPE_UNDEFINED: undefined - * @GST_TCP_UNIT_TYPE_BUFFERS : buffers - * @GST_TCP_UNIT_TYPE_TIME : timeunits (in nanoseconds) - * @GST_TCP_UNIT_TYPE_BYTES : bytes + * @GST_TCP_UNIT_FORMAT_UNDEFINED: undefined + * @GST_TCP_UNIT_FORMAT_BUFFERS : buffers + * @GST_TCP_UNIT_FORMAT_TIME : timeunits (in nanoseconds) + * @GST_TCP_UNIT_FORMAT_BYTES : bytes * * The units used to specify limits. */ typedef enum { - GST_TCP_UNIT_TYPE_UNDEFINED, - GST_TCP_UNIT_TYPE_BUFFERS, - GST_TCP_UNIT_TYPE_TIME, - GST_TCP_UNIT_TYPE_BYTES + GST_TCP_UNIT_FORMAT_UNDEFINED, + GST_TCP_UNIT_FORMAT_BUFFERS, + GST_TCP_UNIT_FORMAT_TIME, + GST_TCP_UNIT_FORMAT_BYTES, } GstTCPUnitType; /* structure for a client @@ -77,9 +77,9 @@ typedef struct { /* method to sync client when connecting */ GstSyncMethod sync_method; - GstTCPUnitType burst_min_unit; + GstFormat burst_min_format; guint64 burst_min_value; - GstTCPUnitType burst_max_unit; + GstFormat burst_max_format; guint64 burst_max_value; } GstTCPClient; @@ -97,19 +97,13 @@ struct _GstMultiFdSink { gint mode; GstPoll *fdset; - gboolean previous_buffer_in_caps; - - guint mtu; - gint qos_dscp; gboolean handle_read; /* these values are used to check if a client is reading fast * enough and to control receovery */ - GstTCPUnitType unit_type;/* the type of the units */ - gint64 units_max; /* max units to queue for a client */ - gint64 units_soft_max; /* max units a client can lag before recovery starts */ + GstFormat unit_format;/* the type of the units */ - GstTCPUnitType def_burst_unit; + GstFormat def_burst_format; guint64 def_burst_value; guint8 header_flags; @@ -121,8 +115,8 @@ struct _GstMultiFdSinkClass { /* element methods */ void (*add) (GstMultiFdSink *sink, int fd); void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync, - GstTCPUnitType format, guint64 value, - GstTCPUnitType max_unit, guint64 max_value); + GstFormat format, guint64 value, + GstFormat max_format, guint64 max_value); void (*remove) (GstMultiFdSink *sink, int fd); void (*remove_flush) (GstMultiFdSink *sink, int fd); void (*clear) (GstMultiFdSink *sink); @@ -142,8 +136,8 @@ GType gst_multi_fd_sink_get_type (void); void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, - GstTCPUnitType min_unit, guint64 min_value, - GstTCPUnitType max_unit, guint64 max_value); + GstFormat min_format, guint64 min_value, + GstFormat max_format, guint64 max_value); void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd); void gst_multi_fd_sink_clear (GstMultiHandleSink *sink); diff --git a/gst/tcp/gstmultihandlesink.c b/gst/tcp/gstmultihandlesink.c index 2dcaedf..bb0ffcb 100644 --- a/gst/tcp/gstmultihandlesink.c +++ b/gst/tcp/gstmultihandlesink.c @@ -144,13 +144,12 @@ enum /* this is really arbitrarily chosen */ -#define DEFAULT_MODE 1 #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 #define DEFAULT_TIME_MIN -1 #define DEFAULT_BYTES_MIN -1 #define DEFAULT_BUFFERS_MIN -1 -#define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS +#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS #define DEFAULT_UNITS_MAX -1 #define DEFAULT_UNITS_SOFT_MAX -1 #define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE @@ -167,21 +166,18 @@ enum enum { PROP_0, -#if 0 - PROP_MODE, -#endif PROP_BUFFERS_QUEUED, PROP_BYTES_QUEUED, PROP_TIME_QUEUED, #if 0 - PROP_UNIT_TYPE, + PROP_UNIT_FORMAT, +#endif PROP_UNITS_MAX, PROP_UNITS_SOFT_MAX, PROP_BUFFERS_MAX, PROP_BUFFERS_SOFT_MAX, -#endif PROP_TIME_MIN, PROP_BYTES_MIN, @@ -196,9 +192,9 @@ enum #if 0 PROP_BURST_FORMAT, PROP_BURST_VALUE, +#endif PROP_QOS_DSCP, -#endif PROP_RESEND_STREAMHEADER, @@ -304,9 +300,9 @@ static gboolean gst_multi_handle_sink_socket_condition (GSocket * socket, GIOCondition condition, GstMultiHandleSink * sink); #endif -#if 0 static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf); +#if 0 static gboolean gst_multi_handle_sink_unlock (GstBaseSink * bsink); static gboolean gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink); #endif @@ -328,15 +324,11 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; -#if 0 GstBaseSinkClass *gstbasesink_class; -#endif gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; -#if 0 gstbasesink_class = (GstBaseSinkClass *) klass; -#endif gobject_class->set_property = gst_multi_handle_sink_set_property; gobject_class->get_property = gst_multi_handle_sink_get_property; @@ -374,10 +366,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); #if 0 - g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, - g_param_spec_enum ("unit-type", "Units type", + g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT, + g_param_spec_enum ("unit-format", "Units format", "The unit to measure the max/soft-max/queued properties", - GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE, + GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_UNITS_MAX, g_param_spec_int64 ("units-max", "Units max", @@ -438,6 +430,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) g_param_spec_uint64 ("burst-value", "Burst value", "The amount of burst expressed in burst-unit", 0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +#endif g_object_class_install_property (gobject_class, PROP_QOS_DSCP, g_param_spec_int ("qos-dscp", "QoS diff srv code point", @@ -445,6 +438,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) -1, 63, DEFAULT_QOS_DSCP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +#if 0 /** * GstMultiHandleSink::handle-read * @@ -638,9 +632,9 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state); -#if 0 gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_render); +#if 0 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock); gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock_stop); @@ -673,7 +667,7 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this) #endif this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); - this->unit_type = DEFAULT_UNIT_TYPE; + this->unit_format = DEFAULT_UNIT_FORMAT; this->units_max = DEFAULT_UNITS_MAX; this->units_soft_max = DEFAULT_UNITS_SOFT_MAX; this->time_min = DEFAULT_TIME_MIN; @@ -683,15 +677,15 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this) this->timeout = DEFAULT_TIMEOUT; this->def_sync_method = DEFAULT_SYNC_METHOD; + +#if 0 this->def_burst_format = DEFAULT_BURST_FORMAT; this->def_burst_value = DEFAULT_BURST_VALUE; +#endif this->qos_dscp = DEFAULT_QOS_DSCP; this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER; - - this->header_flags = 0; - this->cancellable = g_cancellable_new (); } static void @@ -707,9 +701,9 @@ gst_multi_handle_sink_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } -#if 0 -static gint -setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client) +gint +gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink, + GstMultiHandleClient * client) { #ifndef IP_TOS return 0; @@ -725,12 +719,13 @@ setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client) } sa; socklen_t slen = sizeof (sa); gint af; + GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink); /* don't touch */ if (sink->qos_dscp < 0) return 0; - fd = g_socket_get_fd (client->socket); + fd = mhsinkclass->client_get_fd (client); if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) { GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno)); @@ -773,23 +768,6 @@ setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client) #endif } -static void -setup_dscp (GstMultiHandleSink * sink) -{ - GList *clients; - - CLIENTS_LOCK (sink); - for (clients = sink->clients; clients; clients = clients->next) { - GstSocketClient *client; - - client = clients->data; - - setup_dscp_client (sink, client); - } - CLIENTS_UNLOCK (sink); -} -#endif - void gst_multi_handle_sink_client_init (GstMultiHandleClient * client, GstSyncMethod sync_method) @@ -818,6 +796,23 @@ gst_multi_handle_sink_client_init (GstMultiHandleClient * client, client->last_activity_time = client->connect_time; } +void +gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink) +{ + GList *clients; + + CLIENTS_LOCK (mhsink); + for (clients = mhsink->clients; clients; clients = clients->next) { + GstMultiHandleClient *client; + + client = clients->data; + + gst_multi_handle_sink_setup_dscp_client (mhsink, client); + } + CLIENTS_UNLOCK (mhsink); +} + + #if 0 /* "add-full" signal implementation */ void @@ -1421,7 +1416,7 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction) static gint get_buffers_max (GstMultiHandleSink * sink, gint64 max) { - switch (sink->unit_type) { + switch (sink->unit_format) { case GST_FORMAT_BUFFERS: return max; case GST_FORMAT_TIME: @@ -2409,20 +2404,19 @@ gst_multi_handle_sink_thread (GstMultiHandleSink * sink) } #endif -#if 0 static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf) { - GstMultiHandleSink *sink; gboolean in_caps; #if 0 GstCaps *bufcaps, *padcaps; #endif - sink = GST_MULTI_HANDLE_SINK (bsink); + GstMultiHandleSink *sink = GST_MULTI_HANDLE_SINK (bsink); + GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink); g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, - GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_WRONG_STATE); + GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING); #if 0 /* since we check every buffer for streamheader caps, we need to make @@ -2436,7 +2430,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf) #endif /* get IN_CAPS first, code below might mess with the flags */ - in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS); + in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER); #if 0 /* stamp the buffer with previous caps if no caps set */ @@ -2463,6 +2457,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf) gst_buffer_ref (buf); } #endif + gst_buffer_ref (buf); GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT @@ -2477,7 +2472,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf) * the old ones */ if (in_caps && sink->previous_buffer_in_caps == FALSE) { GST_DEBUG_OBJECT (sink, - "receiving new IN_CAPS buffers, clearing old streamheader"); + "receiving new HEADER buffers, clearing old streamheader"); g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL); g_slist_free (sink->streamheader); sink->streamheader = NULL; @@ -2495,12 +2490,12 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf) * We don't send the buffer to the client, since streamheaders are sent * separately when necessary. */ if (in_caps) { - GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %" + GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %" G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf)); sink->streamheader = g_slist_append (sink->streamheader, buf); } else { /* queue the buffer, this is a regular data buffer. */ - gst_multi_handle_sink_queue_buffer (sink, buf); + mhsinkclass->queue_buffer (sink, buf); sink->bytes_to_serve += gst_buffer_get_size (buf); } @@ -2516,7 +2511,6 @@ no_caps: } #endif } -#endif static void gst_multi_handle_sink_set_property (GObject * object, guint prop_id, @@ -2527,14 +2521,12 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id, multihandlesink = GST_MULTI_HANDLE_SINK (object); switch (prop_id) { -#if 0 case PROP_BUFFERS_MAX: multihandlesink->units_max = g_value_get_int (value); break; case PROP_BUFFERS_SOFT_MAX: multihandlesink->units_soft_max = g_value_get_int (value); break; -#endif case PROP_TIME_MIN: multihandlesink->time_min = g_value_get_int64 (value); break; @@ -2545,8 +2537,8 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id, multihandlesink->buffers_min = g_value_get_int (value); break; #if 0 - case PROP_UNIT_TYPE: - multihandlesink->unit_type = g_value_get_enum (value); + case PROP_UNIT_FORMAT: + multihandlesink->unit_format = g_value_get_enum (value); break; case PROP_UNITS_MAX: multihandlesink->units_max = g_value_get_int64 (value); @@ -2571,14 +2563,16 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id, case PROP_BURST_VALUE: multihandlesink->def_burst_value = g_value_get_uint64 (value); break; +#endif case PROP_QOS_DSCP: multihandlesink->qos_dscp = g_value_get_int (value); - setup_dscp (multihandlesink); + gst_multi_handle_sink_setup_dscp (multihandlesink); break; + case PROP_RESEND_STREAMHEADER: multihandlesink->resend_streamheader = g_value_get_boolean (value); break; -#endif + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2594,14 +2588,12 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id, multihandlesink = GST_MULTI_HANDLE_SINK (object); switch (prop_id) { -#if 0 case PROP_BUFFERS_MAX: g_value_set_int (value, multihandlesink->units_max); break; case PROP_BUFFERS_SOFT_MAX: g_value_set_int (value, multihandlesink->units_soft_max); break; -#endif case PROP_TIME_MIN: g_value_set_int64 (value, multihandlesink->time_min); break; @@ -2621,8 +2613,8 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id, g_value_set_uint64 (value, multihandlesink->time_queued); break; #if 0 - case PROP_UNIT_TYPE: - g_value_set_enum (value, multihandlesink->unit_type); + case PROP_UNIT_FORMAT: + g_value_set_enum (value, multihandlesink->unit_format); break; case PROP_UNITS_MAX: g_value_set_int64 (value, multihandlesink->units_max); @@ -2653,12 +2645,14 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id, case PROP_BURST_VALUE: g_value_set_uint64 (value, multihandlesink->def_burst_value); break; +#endif case PROP_QOS_DSCP: g_value_set_int (value, multihandlesink->qos_dscp); break; case PROP_RESEND_STREAMHEADER: g_value_set_boolean (value, multihandlesink->resend_streamheader); break; +#if 0 case PROP_NUM_SOCKETS: g_value_set_uint (value, g_hash_table_size (multihandlesink->socket_hash)); diff --git a/gst/tcp/gstmultihandlesink.h b/gst/tcp/gstmultihandlesink.h index 87aafc5..cbd86c7 100644 --- a/gst/tcp/gstmultihandlesink.h +++ b/gst/tcp/gstmultihandlesink.h @@ -177,6 +177,8 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction); gboolean is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer); gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink); gboolean gst_multi_handle_sink_start (GstBaseSink * bsink); +void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink); +gint gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink, GstMultiHandleClient * client); /** * GstMultiHandleSink: @@ -200,7 +202,6 @@ struct _GstMultiHandleSink { GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ gboolean previous_buffer_in_caps; - guint mtu; gint qos_dscp; GArray *bufqueue; /* global queue of buffers */ @@ -210,7 +211,7 @@ struct _GstMultiHandleSink { /* these values are used to check if a client is reading fast * enough and to control receovery */ - GstFormat unit_type;/* the format of the units */ + GstFormat unit_format;/* the format of the units */ gint64 units_max; /* max units to queue for a client */ gint64 units_soft_max; /* max units a client can lag before recovery starts */ GstRecoverPolicy recover_policy; @@ -253,6 +254,15 @@ struct _GstMultiHandleSinkClass { void (*stop_post) (GstMultiHandleSink *sink); gboolean (*start_pre) (GstMultiHandleSink *sink); gpointer (*thread) (GstMultiHandleSink *sink); + void (*queue_buffer) (GstMultiHandleSink *sink, + GstBuffer *buffer); + gboolean (*client_queue_buffer) + (GstMultiHandleSink *sink, + GstMultiHandleClient *client, + GstBuffer *buffer); + int (*client_get_fd) + (GstMultiHandleClient *client); + GstStructure* (*get_stats) (GstMultiHandleSink *sink, GSocket *socket); void (*remove_client_link) (GstMultiHandleSink * sink, GList * link); diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index 0414aea..f7b1840 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -115,11 +115,6 @@ #define NOT_IMPLEMENTED 0 -static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug); #define GST_CAT_DEFAULT (multisocketsink_debug) @@ -143,35 +138,20 @@ enum /* this is really arbitrarily chosen */ -#define DEFAULT_MODE 1 -#define DEFAULT_BUFFERS_MAX -1 -#define DEFAULT_BUFFERS_SOFT_MAX -1 -#define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS -#define DEFAULT_UNITS_MAX -1 -#define DEFAULT_UNITS_SOFT_MAX -1 +#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS #define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED #define DEFAULT_BURST_VALUE 0 -#define DEFAULT_QOS_DSCP -1 - enum { PROP_0, - PROP_MODE, - PROP_UNIT_TYPE, - PROP_UNITS_MAX, - PROP_UNITS_SOFT_MAX, - - PROP_BUFFERS_MAX, - PROP_BUFFERS_SOFT_MAX, + PROP_UNIT_FORMAT, PROP_BURST_FORMAT, PROP_BURST_VALUE, - PROP_QOS_DSCP, - PROP_NUM_SOCKETS, PROP_LAST @@ -183,14 +163,18 @@ static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink); static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink); static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink); static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink); +static void gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink, + GstBuffer * buffer); +static gboolean gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * + mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer); +static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client); + static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, GList * link); static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket, GIOCondition condition, GstMultiSocketSink * sink); -static GstFlowReturn gst_multi_socket_sink_render (GstBaseSink * bsink, - GstBuffer * buf); static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink); static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink); @@ -222,30 +206,10 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) gobject_class->get_property = gst_multi_socket_sink_get_property; gobject_class->finalize = gst_multi_socket_sink_finalize; - g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX, - g_param_spec_int ("buffers-max", "Buffers max", - "max number of buffers to queue for a client (-1 = no limit)", -1, - G_MAXINT, DEFAULT_BUFFERS_MAX, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX, - g_param_spec_int ("buffers-soft-max", "Buffers soft max", - "Recover client when going over this limit (-1 = no limit)", -1, - G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, - g_param_spec_enum ("unit-type", "Units type", + g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT, + g_param_spec_enum ("unit-format", "Units format", "The unit to measure the max/soft-max/queued properties", - GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNITS_MAX, - g_param_spec_int64 ("units-max", "Units max", - "max number of units to queue (-1 = no limit)", -1, G_MAXINT64, - DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX, - g_param_spec_int64 ("units-soft-max", "Units soft max", - "Recover client when going over this limit (-1 = no limit)", -1, - G_MAXINT64, DEFAULT_UNITS_SOFT_MAX, + GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_BURST_FORMAT, @@ -258,12 +222,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) "The amount of burst expressed in burst-unit", 0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_QOS_DSCP, - g_param_spec_int ("qos-dscp", "QoS diff srv code point", - "Quality of Service, differentiated services code point (-1 default)", - -1, 63, DEFAULT_QOS_DSCP, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS, g_param_spec_uint ("num-sockets", "Number of sockets", "The current number of client sockets", @@ -403,9 +361,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&sinktemplate)); - gst_element_class_set_details_simple (gstelement_class, "Multi socket sink", "Sink/Network", "Send data to multiple sockets", @@ -413,8 +368,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) "Wim Taymans , " "Sebastian Dröge "); - gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_render); - gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock); gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop); @@ -427,6 +380,12 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre); gstmultihandlesink_class->thread = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread); + gstmultihandlesink_class->queue_buffer = + GST_DEBUG_FUNCPTR (gst_multi_socket_sink_queue_buffer); + gstmultihandlesink_class->client_queue_buffer = + GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_queue_buffer); + gstmultihandlesink_class->client_get_fd = + GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd); gstmultihandlesink_class->remove_client_link = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link); @@ -446,16 +405,11 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this) { this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal); - this->unit_type = DEFAULT_UNIT_TYPE; - this->units_max = DEFAULT_UNITS_MAX; - this->units_soft_max = DEFAULT_UNITS_SOFT_MAX; + this->unit_format = DEFAULT_UNIT_FORMAT; this->def_burst_format = DEFAULT_BURST_FORMAT; this->def_burst_value = DEFAULT_BURST_VALUE; - this->qos_dscp = DEFAULT_QOS_DSCP; - - this->header_flags = 0; this->cancellable = g_cancellable_new (); } @@ -473,86 +427,12 @@ gst_multi_socket_sink_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } -static gint -setup_dscp_client (GstMultiSocketSink * sink, GstSocketClient * client) +static int +gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client) { -#ifndef IP_TOS - return 0; -#else - gint tos; - gint ret; - int fd; - union gst_sockaddr - { - struct sockaddr sa; - struct sockaddr_in6 sa_in6; - struct sockaddr_storage sa_stor; - } sa; - socklen_t slen = sizeof (sa); - gint af; - - /* don't touch */ - if (sink->qos_dscp < 0) - return 0; - - fd = g_socket_get_fd (client->socket); - - if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) { - GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno)); - return ret; - } + GstSocketClient *msclient = (GstSocketClient *) client; - af = sa.sa.sa_family; - - /* if this is an IPv4-mapped address then do IPv4 QoS */ - if (af == AF_INET6) { - - GST_DEBUG_OBJECT (sink, "check IP6 socket"); - if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) { - GST_DEBUG_OBJECT (sink, "mapped to IPV4"); - af = AF_INET; - } - } - - /* extract and shift 6 bits of the DSCP */ - tos = (sink->qos_dscp & 0x3f) << 2; - - switch (af) { - case AF_INET: - ret = setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)); - break; - case AF_INET6: -#ifdef IPV6_TCLASS - ret = setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)); - break; -#endif - default: - ret = 0; - GST_ERROR_OBJECT (sink, "unsupported AF"); - break; - } - if (ret) - GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno)); - - return ret; -#endif -} - -static void -setup_dscp (GstMultiSocketSink * sink) -{ - GList *clients; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - - CLIENTS_LOCK (sink); - for (clients = mhsink->clients; clients; clients = clients->next) { - GstSocketClient *client; - - client = clients->data; - - setup_dscp_client (sink, client); - } - CLIENTS_UNLOCK (sink); + return g_socket_get_fd (msclient->socket); } /* "add-full" signal implementation */ @@ -615,7 +495,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, g_source_attach (client->source, sink->main_context); } - setup_dscp_client (sink, client); + gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient); CLIENTS_UNLOCK (sink); @@ -958,11 +838,11 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, /* queue the given buffer for the given client */ static gboolean -gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink, - GstSocketClient * client, GstBuffer * buffer) +gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, + GstMultiHandleClient * mhclient, GstBuffer * buffer) { - GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstSocketClient *client = (GstSocketClient *) mhclient; + GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink); GstCaps *caps; /* TRUE: send them if the new caps have them */ @@ -1086,7 +966,7 @@ get_buffers_max (GstMultiSocketSink * sink, gint64 max) { GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - switch (sink->unit_type) { + switch (sink->unit_format) { case GST_FORMAT_BUFFERS: return max; case GST_FORMAT_TIME: @@ -1540,10 +1420,11 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, GstClockTime now; GTimeVal nowtv; GError *err = NULL; - GstMultiHandleSink *mhsink; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; + GstMultiHandleSinkClass *mhsinkclass = + GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); - mhsink = GST_MULTI_HANDLE_SINK (sink); g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); @@ -1617,7 +1498,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, socket, client, mhclient->bufpos); /* queueing a buffer will ref it */ - gst_multi_socket_sink_client_queue_buffer (sink, client, buf); + mhsinkclass->client_queue_buffer (mhsink, mhclient, buf); /* need to start from the first byte for this new buffer */ mhclient->bufoffset = 0; @@ -1730,13 +1611,13 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink, break; case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: /* move to beginning of soft max */ - newbufpos = get_buffers_max (sink, sink->units_soft_max); + newbufpos = get_buffers_max (sink, mhsink->units_soft_max); break; case GST_RECOVER_POLICY_RESYNC_KEYFRAME: /* find keyframe in buffers, we search backwards to find the * closest keyframe relative to what this client already received. */ newbufpos = MIN (mhsink->bufqueue->len - 1, - get_buffers_max (sink, sink->units_soft_max) - 1); + get_buffers_max (sink, mhsink->units_soft_max) - 1); while (newbufpos >= 0) { GstBuffer *buf; @@ -1751,7 +1632,7 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink, break; default: /* unknown recovery procedure */ - newbufpos = get_buffers_max (sink, sink->units_soft_max); + newbufpos = get_buffers_max (sink, mhsink->units_soft_max); break; } return newbufpos; @@ -1776,7 +1657,8 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink, * the select thread that the fd_set changed. */ static void -gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf) +gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink, + GstBuffer * buf) { GList *clients, *next; gint queuelen; @@ -1786,7 +1668,7 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf) GstClockTime now; gint max_buffers, soft_max_buffers; guint cookie; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink); GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); @@ -1799,13 +1681,13 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf) g_array_prepend_val (mhsink->bufqueue, buf); queuelen = mhsink->bufqueue->len; - if (sink->units_max > 0) - max_buffers = get_buffers_max (sink, sink->units_max); + if (mhsink->units_max > 0) + max_buffers = get_buffers_max (sink, mhsink->units_max); else max_buffers = -1; - if (sink->units_soft_max > 0) - soft_max_buffers = get_buffers_max (sink, sink->units_soft_max); + if (mhsink->units_soft_max > 0) + soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max); else soft_max_buffers = -1; GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers, @@ -2082,116 +1964,6 @@ gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink) return NULL; } -static GstFlowReturn -gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf) -{ - GstMultiSocketSink *sink; - gboolean in_caps; -#if 0 - GstCaps *bufcaps, *padcaps; -#endif - GstMultiHandleSink *mhsink; - - sink = GST_MULTI_SOCKET_SINK (bsink); - mhsink = GST_MULTI_HANDLE_SINK (sink); - - g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, - GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING); - -#if 0 - /* since we check every buffer for streamheader caps, we need to make - * sure every buffer has caps set */ - bufcaps = gst_buffer_get_caps (buf); - padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink)); - - /* make sure we have caps on the pad */ - if (!padcaps && !bufcaps) - goto no_caps; -#endif - - /* get HEADER first, code below might mess with the flags */ - in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER); - -#if 0 - /* stamp the buffer with previous caps if no caps set */ - if (!bufcaps) { - if (!gst_buffer_is_writable (buf)) { - /* metadata is not writable, copy will be made and original buffer - * will be unreffed so we need to ref so that we don't lose the - * buffer in the render method. */ - gst_buffer_ref (buf); - /* the new buffer is ours only, we keep it out of the scope of this - * function */ - buf = gst_buffer_make_writable (buf); - } else { - /* else the metadata is writable, we ref because we keep the buffer - * out of the scope of this method */ - gst_buffer_ref (buf); - } - /* buffer metadata is writable now, set the caps */ - gst_buffer_set_caps (buf, padcaps); - } else { - gst_caps_unref (bufcaps); - - /* since we keep this buffer out of the scope of this method */ - gst_buffer_ref (buf); - } -#endif - gst_buffer_ref (buf); - - GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %" - G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT - ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT, - buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf), - GST_BUFFER_OFFSET_END (buf), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buf))); - - /* if we get HEADER buffers, but the previous buffer was not HEADER, - * it means we're getting new streamheader buffers, and we should clear - * the old ones */ - if (in_caps && sink->previous_buffer_in_caps == FALSE) { - GST_DEBUG_OBJECT (sink, - "receiving new HEADER buffers, clearing old streamheader"); - g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL); - g_slist_free (mhsink->streamheader); - mhsink->streamheader = NULL; - } - - /* save the current in_caps */ - sink->previous_buffer_in_caps = in_caps; - - /* if the incoming buffer is marked as IN CAPS, then we assume for now - * it's a streamheader that needs to be sent to each new client, so we - * put it on our internal list of streamheader buffers. - * FIXME: we could check if the buffer's contents are in fact part of the - * current streamheader. - * - * We don't send the buffer to the client, since streamheaders are sent - * separately when necessary. */ - if (in_caps) { - GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %" - G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf)); - mhsink->streamheader = g_slist_append (mhsink->streamheader, buf); - } else { - /* queue the buffer, this is a regular data buffer. */ - gst_multi_socket_sink_queue_buffer (sink, buf); - - mhsink->bytes_to_serve += gst_buffer_get_size (buf); - } - return GST_FLOW_OK; - - /* ERRORS */ -#if 0 -no_caps: - { - GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL), - ("Received first buffer without caps set")); - return GST_FLOW_NOT_NEGOTIATED; - } -#endif -} - static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -2201,20 +1973,8 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id, multisocketsink = GST_MULTI_SOCKET_SINK (object); switch (prop_id) { - case PROP_BUFFERS_MAX: - multisocketsink->units_max = g_value_get_int (value); - break; - case PROP_BUFFERS_SOFT_MAX: - multisocketsink->units_soft_max = g_value_get_int (value); - break; - case PROP_UNIT_TYPE: - multisocketsink->unit_type = g_value_get_enum (value); - break; - case PROP_UNITS_MAX: - multisocketsink->units_max = g_value_get_int64 (value); - break; - case PROP_UNITS_SOFT_MAX: - multisocketsink->units_soft_max = g_value_get_int64 (value); + case PROP_UNIT_FORMAT: + multisocketsink->unit_format = g_value_get_enum (value); break; case PROP_BURST_FORMAT: multisocketsink->def_burst_format = g_value_get_enum (value); @@ -2222,10 +1982,6 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id, case PROP_BURST_VALUE: multisocketsink->def_burst_value = g_value_get_uint64 (value); break; - case PROP_QOS_DSCP: - multisocketsink->qos_dscp = g_value_get_int (value); - setup_dscp (multisocketsink); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -2242,20 +1998,8 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id, multisocketsink = GST_MULTI_SOCKET_SINK (object); switch (prop_id) { - case PROP_BUFFERS_MAX: - g_value_set_int (value, multisocketsink->units_max); - break; - case PROP_BUFFERS_SOFT_MAX: - g_value_set_int (value, multisocketsink->units_soft_max); - break; - case PROP_UNIT_TYPE: - g_value_set_enum (value, multisocketsink->unit_type); - break; - case PROP_UNITS_MAX: - g_value_set_int64 (value, multisocketsink->units_max); - break; - case PROP_UNITS_SOFT_MAX: - g_value_set_int64 (value, multisocketsink->units_soft_max); + case PROP_UNIT_FORMAT: + g_value_set_enum (value, multisocketsink->unit_format); break; case PROP_BURST_FORMAT: g_value_set_enum (value, multisocketsink->def_burst_format); @@ -2263,9 +2007,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id, case PROP_BURST_VALUE: g_value_set_uint64 (value, multisocketsink->def_burst_value); break; - case PROP_QOS_DSCP: - g_value_set_int (value, multisocketsink->qos_dscp); - break; case PROP_NUM_SOCKETS: g_value_set_uint (value, g_hash_table_size (multisocketsink->socket_hash)); diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h index fa8030a..d3e40d8 100644 --- a/gst/tcp/gstmultisocketsink.h +++ b/gst/tcp/gstmultisocketsink.h @@ -83,13 +83,10 @@ struct _GstMultiSocketSink { gboolean previous_buffer_in_caps; guint mtu; - gint qos_dscp; /* these values are used to check if a client is reading fast * enough and to control receovery */ - GstFormat unit_type;/* the format of the units */ - gint64 units_max; /* max units to queue for a client */ - gint64 units_soft_max; /* max units a client can lag before recovery starts */ + GstFormat unit_format;/* the format of the units */ GstFormat def_burst_format; guint64 def_burst_value; @@ -103,7 +100,7 @@ struct _GstMultiSocketSinkClass { /* element methods */ void (*add) (GstMultiSocketSink *sink, GSocket *socket); void (*add_full) (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync, - GstFormat format, guint64 value, + GstFormat format, guint64 value, GstFormat max_format, guint64 max_value); void (*remove) (GstMultiSocketSink *sink, GSocket *socket); void (*remove_flush) (GstMultiSocketSink *sink, GSocket *socket); diff --git a/tests/check/elements/multifdsink.c b/tests/check/elements/multifdsink.c index ace3c49..37cc5b1 100644 --- a/tests/check/elements/multifdsink.c +++ b/tests/check/elements/multifdsink.c @@ -26,6 +26,9 @@ #include +/* FIXME: remove this header once formats are refactored */ +#include "gst/tcp/gstmultifdsink.h" + static GstPad *mysrcpad; static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", @@ -457,7 +460,6 @@ GST_START_TEST (test_burst_client_bytes) int pfd1[2]; int pfd2[2]; int pfd3[2]; - gchar data[16]; gint i; guint buffers_queued; @@ -465,7 +467,7 @@ GST_START_TEST (test_burst_client_bytes) /* make sure we keep at least 100 bytes at all times */ g_object_set (sink, "bytes-min", 100, NULL); g_object_set (sink, "sync-method", 3, NULL); /* 3 = burst */ - g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */ + g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL); g_object_set (sink, "burst-value", (guint64) 80, NULL); fail_if (pipe (pfd1) == -1); @@ -505,38 +507,26 @@ GST_START_TEST (test_burst_client_bytes) /* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */ GST_DEBUG ("Reading from client 1"); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000005", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000006", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000007", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009"); /* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since * the max alows it) */ GST_DEBUG ("Reading from client 2"); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000006", 16) == 0); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000007", 16) == 0); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000006"); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000007"); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008"); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009"); /* third client only bursts 50 bytes = 4 buffers, we can't send * more than 50 bytes so we only get 3 buffers (48 bytes). */ GST_DEBUG ("Reading from client 3"); - fail_if (read (pfd3[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000007", 16) == 0); - fail_if (read (pfd3[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd3[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000007"); + fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008"); + fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009"); GST_DEBUG ("cleaning up multifdsink"); ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); @@ -556,15 +546,14 @@ GST_START_TEST (test_burst_client_bytes_keyframe) int pfd1[2]; int pfd2[2]; int pfd3[2]; - gchar data[16]; gint i; guint buffers_queued; sink = setup_multifdsink (); /* make sure we keep at least 100 bytes at all times */ g_object_set (sink, "bytes-min", 100, NULL); - g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */ - g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */ + g_object_set (sink, "sync-method", 4, NULL); /* 4 = burst_keyframe */ + g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL); g_object_set (sink, "burst-value", (guint64) 80, NULL); fail_if (pipe (pfd1) == -1); @@ -574,8 +563,8 @@ GST_START_TEST (test_burst_client_bytes_keyframe) ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); caps = gst_caps_from_string ("application/x-gst-check"); - GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps); gst_pad_set_caps (mysrcpad, caps); + GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps); /* push buffers in, 9 * 16 bytes = 144 bytes */ for (i = 0; i < 9; i++) { @@ -611,34 +600,24 @@ GST_START_TEST (test_burst_client_bytes_keyframe) /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes), * keyframe at buffer 4 */ GST_DEBUG ("Reading from client 1"); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000004", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000005", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000006", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000007", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000004"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009"); /* second client only bursts 50 bytes = 4 buffers, there is * no keyframe above min and below max, so get one below min */ GST_DEBUG ("Reading from client 2"); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008"); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009"); /* third client only bursts 50 bytes = 4 buffers, we can't send * more than 50 bytes so we only get 2 buffers (32 bytes). */ GST_DEBUG ("Reading from client 3"); - fail_if (read (pfd3[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd3[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008"); + fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009"); GST_DEBUG ("cleaning up multifdsink"); ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); @@ -658,15 +637,14 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe) int pfd1[2]; int pfd2[2]; int pfd3[2]; - gchar data[16]; gint i; guint buffers_queued; sink = setup_multifdsink (); /* make sure we keep at least 100 bytes at all times */ g_object_set (sink, "bytes-min", 100, NULL); - g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */ - g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */ + g_object_set (sink, "sync-method", 5, NULL); /* 5 = burst_with_keyframe */ + g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL); g_object_set (sink, "burst-value", (guint64) 80, NULL); fail_if (pipe (pfd1) == -1); @@ -713,40 +691,27 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe) /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes), * keyframe at buffer 4 */ GST_DEBUG ("Reading from client 1"); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000004", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000005", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000006", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000007", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000004"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009"); /* second client only bursts 50 bytes = 4 buffers, there is * no keyframe above min and below max, so send min */ GST_DEBUG ("Reading from client 2"); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000006", 16) == 0); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000007", 16) == 0); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd2[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000006"); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000007"); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008"); + fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009"); /* third client only bursts 50 bytes = 4 buffers, we can't send * more than 50 bytes so we only get 3 buffers (48 bytes). */ GST_DEBUG ("Reading from client 3"); - fail_if (read (pfd3[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000007", 16) == 0); - fail_if (read (pfd3[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000008", 16) == 0); - fail_if (read (pfd3[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000007"); + fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008"); + fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009"); GST_DEBUG ("cleaning up multifdsink"); ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); @@ -765,7 +730,6 @@ GST_START_TEST (test_client_next_keyframe) GstElement *sink; GstCaps *caps; int pfd1[2]; - gchar data[16]; gint i; sink = setup_multifdsink (); @@ -793,10 +757,8 @@ GST_START_TEST (test_client_next_keyframe) /* now we should be able to read some data */ GST_DEBUG ("Reading from client 1"); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000000", 16) == 0); - fail_if (read (pfd1[0], data, 16) < 16); - fail_unless (strncmp (data, "deadbee00000001", 16) == 0); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000000"); + fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000001"); GST_DEBUG ("cleaning up multifdsink"); ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); diff --git a/tests/check/elements/multisocketsink.c b/tests/check/elements/multisocketsink.c index 293d848..cbd9789 100644 --- a/tests/check/elements/multisocketsink.c +++ b/tests/check/elements/multisocketsink.c @@ -596,7 +596,7 @@ GST_START_TEST (test_burst_client_bytes_keyframe) sink = setup_multisocketsink (); /* make sure we keep at least 100 bytes at all times */ g_object_set (sink, "bytes-min", 100, NULL); - g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */ + g_object_set (sink, "sync-method", 4, NULL); /* 4 = burst_keyframe */ g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL); g_object_set (sink, "burst-value", (guint64) 80, NULL); @@ -700,7 +700,7 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe) /* make sure we keep at least 100 bytes at all times */ g_object_set (sink, "bytes-min", 100, NULL); - g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */ + g_object_set (sink, "sync-method", 5, NULL); /* 5 = burst_with_keyframe */ g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL); g_object_set (sink, "burst-value", (guint64) 80, NULL);