From 17254bbd23c5e62c164628c4d02731d464bf75f9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 16 Jan 2012 10:08:24 +0100 Subject: [PATCH] tcpserversink: Port to GIO And change multifdsink to GIO too and rename it to multisocketsink because it only works on GSockets now, not generic fds. --- docs/plugins/Makefile.am | 2 +- gst/tcp/Makefile.am | 5 +- gst/tcp/{gstmultifdsink.c => gstmultisocketsink.c} | 1599 +++++++++----------- gst/tcp/{gstmultifdsink.h => gstmultisocketsink.h} | 149 +- gst/tcp/gsttcp-marshal.list | 7 +- gst/tcp/gsttcp.h | 1 - gst/tcp/gsttcpplugin.c | 6 +- gst/tcp/gsttcpplugin.h | 40 - gst/tcp/gsttcpserversink.c | 297 ++-- gst/tcp/gsttcpserversink.h | 28 +- 10 files changed, 945 insertions(+), 1189 deletions(-) rename gst/tcp/{gstmultifdsink.c => gstmultisocketsink.c} (59%) rename gst/tcp/{gstmultifdsink.h => gstmultisocketsink.h} (61%) delete mode 100644 gst/tcp/gsttcpplugin.h diff --git a/docs/plugins/Makefile.am b/docs/plugins/Makefile.am index 60111f3..1d4355d 100644 --- a/docs/plugins/Makefile.am +++ b/docs/plugins/Makefile.am @@ -82,7 +82,7 @@ EXTRA_HFILES = \ $(top_srcdir)/gst/playback/gstsubtitleoverlay.h \ $(top_srcdir)/gst/audiorate/gstaudiorate.h \ $(top_srcdir)/gst/audioresample/gstaudioresample.h \ - $(top_srcdir)/gst/tcp/gstmultifdsink.h \ + $(top_srcdir)/gst/tcp/gstmultisocketsink.h \ $(top_srcdir)/gst/tcp/gsttcpclientsrc.h \ $(top_srcdir)/gst/tcp/gsttcpclientsink.h \ $(top_srcdir)/gst/tcp/gsttcpserversrc.h \ diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am index d51f6ec..40e1913 100644 --- a/gst/tcp/Makefile.am +++ b/gst/tcp/Makefile.am @@ -16,8 +16,8 @@ BUILT_SOURCES = $(built_sources) $(built_headers) libgsttcp_la_SOURCES = \ gsttcpplugin.c \ gsttcp.c \ - gstmultifdsink.c \ gsttcpclientsrc.c gsttcpclientsink.c \ + gstmultisocketsink.c \ gsttcpserversrc.c gsttcpserversink.c nodist_libgsttcp_la_SOURCES = \ @@ -29,10 +29,9 @@ libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_LIBS) $(GIO_LIBS) libgsttcp_la_LIBTOOLFLAGS = --tag=disable-static noinst_HEADERS = \ - gsttcpplugin.h \ gsttcp.h \ - gstmultifdsink.h \ gsttcpclientsrc.h gsttcpclientsink.h \ + gstmultisocketsink.h \ gsttcpserversrc.h gsttcpserversink.h CLEANFILES = $(BUILT_SOURCES) diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultisocketsink.c similarity index 59% rename from gst/tcp/gstmultifdsink.c rename to gst/tcp/gstmultisocketsink.c index 6f72a08..157f332 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -2,6 +2,8 @@ * Copyright (C) <1999> Erik Walthinsen * Copyright (C) <2004> Thomas Vander Stichele * Copyright (C) 2006 Wim Taymans + * Copyright (C) <2011> Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -20,79 +22,79 @@ */ /** - * SECTION:element-multifdsink + * SECTION:element-multisocketsink * @see_also: tcpserversink * * This plugin writes incoming data to a set of file descriptors. The - * file descriptors can be added to multifdsink by emitting the #GstMultiFdSink::add signal. - * For each descriptor added, the #GstMultiFdSink::client-added signal will be called. + * file descriptors can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal. + * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called. * - * As of version 0.10.8, a client can also be added with the #GstMultiFdSink::add-full signal + * As of version 0.10.8, a client can also be added with the #GstMultiSocketSink::add-full signal * that allows for more control over what and how much data a client * initially receives. * - * Clients can be removed from multifdsink by emitting the #GstMultiFdSink::remove signal. For - * each descriptor removed, the #GstMultiFdSink::client-removed signal will be called. The - * #GstMultiFdSink::client-removed signal can also be fired when multifdsink decides that a + * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For + * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The + * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a * client is not active anymore or, depending on the value of the - * #GstMultiFdSink:recover-policy property, if the client is reading too slowly. - * In all cases, multifdsink will never close a file descriptor itself. - * The user of multifdsink is responsible for closing all file descriptors. - * This can for example be done in response to the #GstMultiFdSink::client-fd-removed signal. - * Note that multifdsink still has a reference to the file descriptor when the - * #GstMultiFdSink::client-removed signal is emitted, so that "get-stats" can be performed on + * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly. + * In all cases, multisocketsink will never close a file descriptor itself. + * The user of multisocketsink is responsible for closing all file descriptors. + * This can for example be done in response to the #GstMultiSocketSink::client-fd-removed signal. + * Note that multisocketsink still has a reference to the file descriptor when the + * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on * the descriptor; it is therefore not safe to close the file descriptor in - * the #GstMultiFdSink::client-removed signal handler, and you should use the - * #GstMultiFdSink::client-fd-removed signal to safely close the fd. + * the #GstMultiSocketSink::client-removed signal handler, and you should use the + * #GstMultiSocketSink::client-fd-removed signal to safely close the fd. * - * Multifdsink internally keeps a queue of the incoming buffers and uses a + * Multisocketsink internally keeps a queue of the incoming buffers and uses a * separate thread to send the buffers to the clients. This ensures that no * client write can block the pipeline and that clients can read with different * speeds. * - * When adding a client to multifdsink, the #GstMultiFdSink:sync-method property will define + * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define * which buffer in the queued buffers will be sent first to the client. Clients * can be sent the most recent buffer (which might not be decodable by the * client if it is not a keyframe), the next keyframe received in - * multifdsink (which can take some time depending on the keyframe rate), or the + * multisocketsink (which can take some time depending on the keyframe rate), or the * last received keyframe (which will cause a simple burst-on-connect). - * Multifdsink will always keep at least one keyframe in its internal buffers + * Multisocketsink will always keep at least one keyframe in its internal buffers * when the sync-mode is set to latest-keyframe. * - * As of version 0.10.8, there are additional values for the #GstMultiFdSink:sync-method + * As of version 0.10.8, there are additional values for the #GstMultiSocketSink:sync-method * property to allow finer control over burst-on-connect behaviour. By selecting * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe' * additionally requires that the burst begin with a keyframe, and * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will * prefer a minimum burst size even if it requires not starting with a keyframe. * - * Multifdsink can be instructed to keep at least a minimum amount of data + * Multisocketsink can be instructed to keep at least a minimum amount of data * expressed in time or byte units in its internal queues with the - * #GstMultiFdSink:time-min and #GstMultiFdSink:bytes-min properties respectively. + * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively. * These properties are useful if the application adds clients with the - * #GstMultiFdSink::add-full signal to make sure that a burst connect can + * #GstMultiSocketSink::add-full signal to make sure that a burst connect can * actually be honored. * * When streaming data, clients are allowed to read at a different rate than - * the rate at which multifdsink receives data. If the client is reading too - * fast, no data will be send to the client until multifdsink receives more + * the rate at which multisocketsink receives data. If the client is reading too + * fast, no data will be send to the client until multisocketsink receives more * data. If the client, however, reads too slowly, data for that client will be - * queued up in multifdsink. Two properties control the amount of data - * (buffers) that is queued in multifdsink: #GstMultiFdSink:buffers-max and - * #GstMultiFdSink:buffers-soft-max. A client that falls behind by - * #GstMultiFdSink:buffers-max is removed from multifdsink forcibly. + * queued up in multisocketsink. Two properties control the amount of data + * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and + * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by + * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly. * - * A client with a lag of at least #GstMultiFdSink:buffers-soft-max enters the recovery - * procedure which is controlled with the #GstMultiFdSink:recover-policy property. + * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery + * procedure which is controlled with the #GstMultiSocketSink:recover-policy property. * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT * positions the client to the soft limit in the buffer queue and * RESYNC_KEYFRAME positions the client at the most recent keyframe in the * buffer queue. * - * multifdsink will by default synchronize on the clock before serving the + * multisocketsink will by default synchronize on the clock before serving the * buffers to the clients. This behaviour can be disabled by setting the sync - * property to FALSE. Multifdsink will by default not do QoS and will never + * property to FALSE. Multisocketsink will by default not do QoS and will never * drop late buffers. * * Last reviewed on 2006-09-12 (0.10.10) @@ -108,23 +110,7 @@ #include -#include - -#ifdef HAVE_UNISTD_H -#include -#endif - -#include -#include -#include -#include -#include - -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include -#endif - -#include "gstmultifdsink.h" +#include "gstmultisocketsink.h" #include "gsttcp-marshal.h" #define NOT_IMPLEMENTED 0 @@ -134,10 +120,10 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); -GST_DEBUG_CATEGORY_STATIC (multifdsink_debug); -#define GST_CAT_DEFAULT (multifdsink_debug) +GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug); +#define GST_CAT_DEFAULT (multisocketsink_debug) -/* MultiFdSink signals and args */ +/* MultiSocketSink signals and args */ enum { /* methods */ @@ -151,7 +137,7 @@ enum /* signals */ SIGNAL_CLIENT_ADDED, SIGNAL_CLIENT_REMOVED, - SIGNAL_CLIENT_FD_REMOVED, + SIGNAL_CLIENT_SOCKET_REMOVED, LAST_SIGNAL }; @@ -164,17 +150,16 @@ enum #define DEFAULT_TIME_MIN -1 #define DEFAULT_BYTES_MIN -1 #define DEFAULT_BUFFERS_MIN -1 -#define DEFAULT_UNIT_TYPE GST_TCP_UNIT_TYPE_BUFFERS +#define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS #define DEFAULT_UNITS_MAX -1 #define DEFAULT_UNITS_SOFT_MAX -1 #define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE #define DEFAULT_TIMEOUT 0 #define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_LATEST -#define DEFAULT_BURST_UNIT GST_TCP_UNIT_TYPE_UNDEFINED +#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED #define DEFAULT_BURST_VALUE 0 -#define DEFAULT_QOS_DSCP -1 #define DEFAULT_HANDLE_READ TRUE #define DEFAULT_RESEND_STREAMHEADER TRUE @@ -204,40 +189,18 @@ enum PROP_BYTES_TO_SERVE, PROP_BYTES_SERVED, - PROP_BURST_UNIT, + PROP_BURST_FORMAT, PROP_BURST_VALUE, - PROP_QOS_DSCP, - PROP_HANDLE_READ, PROP_RESEND_STREAMHEADER, - PROP_NUM_FDS, + PROP_NUM_SOCKETS, PROP_LAST }; -/* For backward compat, we can't really select the poll mode anymore with - * GstPoll. */ -#define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type()) -static GType -gst_fdset_mode_get_type (void) -{ - static GType fdset_mode_type = 0; - static const GEnumValue fdset_mode[] = { - {0, "Select", "select"}, - {1, "Poll", "poll"}, - {2, "EPoll", "epoll"}, - {0, NULL, NULL}, - }; - - if (!fdset_mode_type) { - fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode); - } - return fdset_mode_type; -} - #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type()) static GType gst_recover_policy_get_type (void) @@ -291,25 +254,6 @@ gst_sync_method_get_type (void) return sync_method_type; } -#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type()) -static GType -gst_unit_type_get_type (void) -{ - static GType unit_type_type = 0; - static const GEnumValue unit_type[] = { - {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"}, - {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"}, - {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"}, - {GST_TCP_UNIT_TYPE_TIME, "Time", "time"}, - {0, NULL, NULL}, - }; - - if (!unit_type_type) { - unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type); - } - return unit_type_type; -} - #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type()) static GType gst_client_status_get_type (void) @@ -333,28 +277,32 @@ gst_client_status_get_type (void) return client_status_type; } -static void gst_multi_fd_sink_finalize (GObject * object); +static void gst_multi_socket_sink_finalize (GObject * object); -static void gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, +static void gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink, GList * link); +static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket, + GIOCondition condition, GstMultiSocketSink * sink); -static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink, +static GstFlowReturn gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf); -static GstStateChangeReturn gst_multi_fd_sink_change_state (GstElement * +static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink); +static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink); +static GstStateChangeReturn gst_multi_socket_sink_change_state (GstElement * element, GstStateChange transition); -static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id, +static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); -static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id, +static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -#define gst_multi_fd_sink_parent_class parent_class -G_DEFINE_TYPE (GstMultiFdSink, gst_multi_fd_sink, GST_TYPE_BASE_SINK); +#define gst_multi_socket_sink_parent_class parent_class +G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink, GST_TYPE_BASE_SINK); -static guint gst_multi_fd_sink_signals[LAST_SIGNAL] = { 0 }; +static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 }; static void -gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) +gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; @@ -364,23 +312,9 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) gstelement_class = (GstElementClass *) klass; gstbasesink_class = (GstBaseSinkClass *) klass; - gobject_class->set_property = gst_multi_fd_sink_set_property; - gobject_class->get_property = gst_multi_fd_sink_get_property; - gobject_class->finalize = gst_multi_fd_sink_finalize; - - /** - * GstMultiFdSink::mode - * - * The mode for selecting activity on the fds. - * - * This property is deprecated since 0.10.18, if will now automatically - * select and use the most optimal method. - */ - g_object_class_install_property (gobject_class, PROP_MODE, - g_param_spec_enum ("mode", "Mode", - "The mode for selecting activity on the fds (deprecated)", - GST_TYPE_FDSET_MODE, DEFAULT_MODE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gobject_class->set_property = gst_multi_socket_sink_set_property; + gobject_class->get_property = gst_multi_socket_sink_get_property; + gobject_class->finalize = gst_multi_socket_sink_finalize; g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX, g_param_spec_int ("buffers-max", "Buffers max", @@ -412,7 +346,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, g_param_spec_enum ("unit-type", "Units type", "The unit to measure the max/soft-max/queued properties", - GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, + GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_UNITS_MAX, g_param_spec_int64 ("units-max", "Units max", @@ -462,23 +396,18 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BURST_UNIT, - g_param_spec_enum ("burst-unit", "Burst unit", + g_object_class_install_property (gobject_class, PROP_BURST_FORMAT, + g_param_spec_enum ("burst-format", "Burst format", "The format of the burst units (when sync-method is burst[[-with]-keyframe])", - GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT, + GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_BURST_VALUE, g_param_spec_uint64 ("burst-value", "Burst value", "The amount of burst expressed in burst-unit", 0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_QOS_DSCP, - g_param_spec_int ("qos-dscp", "QoS diff srv code point", - "Quality of Service, differentiated services code point (-1 default)", - -1, 63, DEFAULT_QOS_DSCP, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** - * GstMultiFdSink::handle-read + * GstMultiSocketSink::handle-read * * Handle read requests from clients and discard the data. * @@ -489,7 +418,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) "Handle client reads and discard the data", DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** - * GstMultiFdSink::resend-streamheader + * GstMultiSocketSink::resend-streamheader * * Resend the streamheaders to existing clients when they change. * @@ -501,192 +430,195 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) DEFAULT_RESEND_STREAMHEADER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_NUM_FDS, - g_param_spec_uint ("num-fds", "Number of fds", - "The current number of client file descriptors.", + g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS, + g_param_spec_uint ("num-sockets", "Number of sockets", + "The current number of client sockets", 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); /** - * GstMultiFdSink::add: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to add to multifdsink + * GstMultiSocketSink::add: + * @gstmultisocketsink: the multisocketsink element to emit this signal on + * @socket: the socket to add to multisocketsink * - * Hand the given open file descriptor to multifdsink to write to. + * Hand the given open socket to multisocketsink to write to. */ - gst_multi_fd_sink_signals[SIGNAL_ADD] = + gst_multi_socket_sink_signals[SIGNAL_ADD] = g_signal_new ("add", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - add), NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, - G_TYPE_INT); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL, + g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); /** - * GstMultiFdSink::add-full: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to add to multifdsink + * GstMultiSocketSink::add-full: + * @gstmultisocketsink: the multisocketsink element to emit this signal on + * @socket: the socket to add to multisocketsink * @sync: the sync method to use - * @unit_type_min: the unit-type of @value_min + * @format_min: the format of @value_min * @value_min: the minimum amount of data to burst expressed in - * @unit_type_min units. - * @unit_type_max: the unit-type of @value_max + * @format_min units. + * @format_max: the format of @value_max * @value_max: the maximum amount of data to burst expressed in - * @unit_type_max units. + * @format_max units. * - * Hand the given open file descriptor to multifdsink to write to and + * Hand the given open socket to multisocketsink to write to and * specify the burst parameters for the new connection. */ - gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] = + gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] = g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - add_full), NULL, NULL, - gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64, G_TYPE_NONE, 6, - G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64, - GST_TYPE_UNIT_TYPE, G_TYPE_UINT64); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL, + gst_tcp_marshal_VOID__OBJECT_ENUM_ENUM_UINT64_ENUM_UINT64, G_TYPE_NONE, 6, + G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT, G_TYPE_UINT64, + GST_TYPE_FORMAT, G_TYPE_UINT64); /** - * GstMultiFdSink::remove: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to remove from multifdsink + * GstMultiSocketSink::remove: + * @gstmultisocketsink: the multisocketsink element to emit this signal on + * @socket: the socket to remove from multisocketsink * - * Remove the given open file descriptor from multifdsink. + * Remove the given open socket from multisocketsink. */ - gst_multi_fd_sink_signals[SIGNAL_REMOVE] = + gst_multi_socket_sink_signals[SIGNAL_REMOVE] = g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, - G_TYPE_INT); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL, + g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); /** - * GstMultiFdSink::remove-flush: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to remove from multifdsink + * GstMultiSocketSink::remove-flush: + * @gstmultisocketsink: the multisocketsink element to emit this signal on + * @socket: the socket to remove from multisocketsink * - * Remove the given open file descriptor from multifdsink after flushing all - * the pending data to the fd. + * Remove the given open socket from multisocketsink after flushing all + * the pending data to the socket. */ - gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] = + gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] = g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - remove_flush), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, - G_TYPE_INT); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL, + g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); /** - * GstMultiFdSink::clear: - * @gstmultifdsink: the multifdsink element to emit this signal on + * GstMultiSocketSink::clear: + * @gstmultisocketsink: the multisocketsink element to emit this signal on * - * Remove all file descriptors from multifdsink. Since multifdsink did not - * open fd's itself, it does not explicitly close the fd. The application - * should do so by connecting to the client-fd-removed callback. + * Remove all sockets from multisocketsink. Since multisocketsink did not + * open sockets itself, it does not explicitly close the sockets. The application + * should do so by connecting to the client-socket-removed callback. */ - gst_multi_fd_sink_signals[SIGNAL_CLEAR] = + gst_multi_socket_sink_signals[SIGNAL_CLEAR] = g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - clear), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstMultiSocketSinkClass, clear), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); /** - * GstMultiFdSink::get-stats: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to get stats of from multifdsink + * GstMultiSocketSink::get-stats: + * @gstmultisocketsink: the multisocketsink element to emit this signal on + * @socket: the socket to get stats of from multisocketsink * - * Get statistics about @fd. This function returns a GValueArray to ease - * automatic wrapping for bindings. + * Get statistics about @socket. This function returns a GstStructure. * - * Returns: a GValueArray with the statistics. The array contains guint64 - * values that represent respectively: total number of bytes sent, time + * Returns: a GstStructure with the statistics. The structure contains + * values that represent: total number of bytes sent, time * when the client was added, time when the client was * disconnected/removed, time the client is/was active, last activity * time (in epoch seconds), number of buffers dropped. * All times are expressed in nanoseconds (GstClockTime). - * The array can be 0-length if the client was not found. */ - gst_multi_fd_sink_signals[SIGNAL_GET_STATS] = + gst_multi_socket_sink_signals[SIGNAL_GET_STATS] = g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - get_stats), NULL, NULL, gst_tcp_marshal_BOXED__INT, - G_TYPE_VALUE_ARRAY, 1, G_TYPE_INT); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL, + gst_tcp_marshal_BOXED__OBJECT, GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET); /** - * GstMultiFdSink::client-added: - * @gstmultifdsink: the multifdsink element that emitted this signal - * @fd: the file descriptor that was added to multifdsink + * GstMultiSocketSink::client-added: + * @gstmultisocketsink: the multisocketsink element that emitted this signal + * @socket: the socket that was added to multisocketsink * - * The given file descriptor was added to multifdsink. This signal will + * The given socket was added to multisocketsink. This signal will * be emitted from the streaming thread so application should be prepared * for that. */ - gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED] = + gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] = g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added), - NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass, + client_added), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, + G_TYPE_NONE, 1, G_TYPE_OBJECT); /** - * GstMultiFdSink::client-removed: - * @gstmultifdsink: the multifdsink element that emitted this signal - * @fd: the file descriptor that is to be removed from multifdsink - * @status: the reason why the client was removed + * GstMultiSocketSink::client-removed: + * @gstmultisocketsink: the multisocketsink element that emitted this signal + * @socket: the socket that is to be removed from multisocketsink + * @status: the reason why the client was removed * - * The given file descriptor is about to be removed from multifdsink. This + * The given socket is about to be removed from multisocketsink. This * signal will be emitted from the streaming thread so applications should * be prepared for that. * - * @gstmultifdsink still holds a handle to @fd so it is possible to call + * @gstmultisocketsink still holds a handle to @socket so it is possible to call * the get-stats signal from this callback. For the same reason it is - * not safe to close() and reuse @fd in this callback. + * not safe to close() and reuse @socket in this callback. */ - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED] = + gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] = g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, - client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED, + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass, + client_removed), NULL, NULL, gst_tcp_marshal_VOID__OBJECT_ENUM, G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS); /** - * GstMultiFdSink::client-fd-removed: - * @gstmultifdsink: the multifdsink element that emitted this signal - * @fd: the file descriptor that was removed from multifdsink + * GstMultiSocketSink::client-socket-removed: + * @gstmultisocketsink: the multisocketsink element that emitted this signal + * @socket: the socket that was removed from multisocketsink * - * The given file descriptor was removed from multifdsink. This signal will + * The given socket was removed from multisocketsink. This signal will * be emitted from the streaming thread so applications should be prepared * for that. * - * In this callback, @gstmultifdsink has removed all the information - * associated with @fd and it is therefore not possible to call get-stats - * with @fd. It is however safe to close() and reuse @fd in the callback. + * In this callback, @gstmultisocketsink has removed all the information + * associated with @socket and it is therefore not possible to call get-stats + * with @socket. It is however safe to close() and reuse @fd in the callback. * * Since: 0.10.7 */ - gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED] = - g_signal_new ("client-fd-removed", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, - client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT, - G_TYPE_NONE, 1, G_TYPE_INT); + gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] = + g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass, + client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, + G_TYPE_NONE, 1, G_TYPE_SOCKET); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sinktemplate)); gst_element_class_set_details_simple (gstelement_class, - "Multi filedescriptor sink", "Sink/Network", - "Send data to multiple filedescriptors", + "Multi socket sink", "Sink/Network", + "Send data to multiple sockets", "Thomas Vander Stichele , " - "Wim Taymans "); + "Wim Taymans , " + "Sebastian Dröge "); gstelement_class->change_state = - GST_DEBUG_FUNCPTR (gst_multi_fd_sink_change_state); - - gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render); - - klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add); - klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full); - klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove); - klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush); - klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear); - klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats); - - GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink"); + GST_DEBUG_FUNCPTR (gst_multi_socket_sink_change_state); + + gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_render); + gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock); + gstbasesink_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop); + + klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add); + klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full); + klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove); + klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush); + klass->clear = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_clear); + klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats); + + GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0, + "Multi socket sink"); } static void -gst_multi_fd_sink_init (GstMultiFdSink * this) +gst_multi_socket_sink_init (GstMultiSocketSink * this) { - GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN); - - this->mode = DEFAULT_MODE; + GST_OBJECT_FLAG_UNSET (this, GST_MULTI_SOCKET_SINK_OPEN); CLIENTS_LOCK_INIT (this); this->clients = NULL; - this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal); + this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal); this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); this->unit_type = DEFAULT_UNIT_TYPE; @@ -699,135 +631,60 @@ gst_multi_fd_sink_init (GstMultiFdSink * this) this->timeout = DEFAULT_TIMEOUT; this->def_sync_method = DEFAULT_SYNC_METHOD; - this->def_burst_unit = DEFAULT_BURST_UNIT; + this->def_burst_format = DEFAULT_BURST_FORMAT; this->def_burst_value = DEFAULT_BURST_VALUE; - this->qos_dscp = DEFAULT_QOS_DSCP; this->handle_read = DEFAULT_HANDLE_READ; this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER; this->header_flags = 0; + this->cancellable = g_cancellable_new (); } static void -gst_multi_fd_sink_finalize (GObject * object) +gst_multi_socket_sink_finalize (GObject * object) { - GstMultiFdSink *this; + GstMultiSocketSink *this; - this = GST_MULTI_FD_SINK (object); + this = GST_MULTI_SOCKET_SINK (object); CLIENTS_LOCK_FREE (this); - g_hash_table_destroy (this->fd_hash); + g_hash_table_destroy (this->socket_hash); g_array_free (this->bufqueue, TRUE); - G_OBJECT_CLASS (parent_class)->finalize (object); -} - -static gint -setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client) -{ - gint tos; - gint ret; - union gst_sockaddr - { - struct sockaddr sa; - struct sockaddr_in6 sa_in6; - struct sockaddr_storage sa_stor; - } sa; - socklen_t slen = sizeof (sa); - gint af; - - /* don't touch */ - if (sink->qos_dscp < 0) - return 0; - - if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) { - GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno)); - return ret; - } - - af = sa.sa.sa_family; - - /* if this is an IPv4-mapped address then do IPv4 QoS */ - if (af == AF_INET6) { - - GST_DEBUG_OBJECT (sink, "check IP6 socket"); - if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) { - GST_DEBUG_OBJECT (sink, "mapped to IPV4"); - af = AF_INET; - } - } - - /* extract and shift 6 bits of the DSCP */ - tos = (sink->qos_dscp & 0x3f) << 2; - - switch (af) { - case AF_INET: - ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)); - break; - case AF_INET6: -#ifdef IPV6_TCLASS - ret = - setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, - sizeof (tos)); - break; -#endif - default: - ret = 0; - GST_ERROR_OBJECT (sink, "unsupported AF"); - break; + if (this->cancellable) { + g_object_unref (this->cancellable); + this->cancellable = NULL; } - if (ret) - GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno)); - - return ret; -} - -static void -setup_dscp (GstMultiFdSink * sink) -{ - GList *clients, *next; - - CLIENTS_LOCK (sink); - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; - - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - - setup_dscp_client (sink, client); - } - CLIENTS_UNLOCK (sink); + G_OBJECT_CLASS (parent_class)->finalize (object); } /* "add-full" signal implementation */ void -gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, - GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value, - GstTCPUnitType max_unit, guint64 max_value) +gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, + GstSyncMethod sync_method, GstFormat min_format, guint64 min_value, + GstFormat max_format, guint64 max_value) { - GstTCPClient *client; + GstSocketClient *client; GList *clink; GTimeVal now; - gint flags; - struct stat statbuf; - GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, " - "min_unit %d, min_value %" G_GUINT64_FORMAT - ", max_unit %d, max_value %" G_GUINT64_FORMAT, fd, sync_method, - min_unit, min_value, max_unit, max_value); + GST_DEBUG_OBJECT (sink, "[socket %p] adding client, sync_method %d, " + "min_format %d, min_value %" G_GUINT64_FORMAT + ", max_format %d, max_value %" G_GUINT64_FORMAT, socket, + sync_method, min_format, min_value, max_format, max_value); /* do limits check if we can */ - if (min_unit == max_unit) { + if (min_format == max_format) { if (max_value != -1 && min_value != -1 && max_value < min_value) goto wrong_limits; } /* create client datastructure */ - client = g_new0 (GstTCPClient, 1); - client->fd.fd = fd; + client = g_new0 (GstSocketClient, 1); + client->socket = G_SOCKET (g_object_ref (socket)); client->status = GST_CLIENT_STATUS_OK; client->bufpos = -1; client->flushcount = -1; @@ -839,9 +696,9 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, client->first_buffer_ts = GST_CLOCK_TIME_NONE; client->last_buffer_ts = GST_CLOCK_TIME_NONE; client->new_connection = TRUE; - client->burst_min_unit = min_unit; + client->burst_min_format = min_format; client->burst_min_value = min_value; - client->burst_max_unit = max_unit; + client->burst_max_format = max_format; client->burst_max_value = max_value; client->sync_method = sync_method; client->currently_removing = FALSE; @@ -856,43 +713,33 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, CLIENTS_LOCK (sink); /* check the hash to find a duplicate fd */ - clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd); + clink = g_hash_table_lookup (sink->socket_hash, socket); if (clink != NULL) goto duplicate; /* we can add the fd now */ clink = sink->clients = g_list_prepend (sink->clients, client); - g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink); + g_hash_table_insert (sink->socket_hash, socket, clink); sink->clients_cookie++; /* set the socket to non blocking */ - if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0) { - GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s", fd, - g_strerror (errno)); - } + g_socket_set_blocking (socket, FALSE); /* we always read from a client */ - gst_poll_add_fd (sink->fdset, &client->fd); - - /* we don't try to read from write only fds */ - if (sink->handle_read) { - flags = fcntl (fd, F_GETFL, 0); - if ((flags & O_ACCMODE) != O_WRONLY) { - gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE); - } + if (sink->main_context) { + client->source = + g_socket_create_source (client->socket, + G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable); + g_source_set_callback (client->source, + (GSourceFunc) gst_multi_socket_sink_socket_condition, + gst_object_ref (sink), (GDestroyNotify) gst_object_unref); + g_source_attach (client->source, sink->main_context); } - /* figure out the mode, can't use send() for non sockets */ - if (fstat (fd, &statbuf) == 0 && S_ISSOCK (statbuf.st_mode)) { - client->is_socket = TRUE; - setup_dscp_client (sink, client); - } - - gst_poll_restart (sink->fdset); CLIENTS_UNLOCK (sink); g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd); + gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, socket); return; @@ -900,18 +747,19 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, wrong_limits: { GST_WARNING_OBJECT (sink, - "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%" - G_GUINT64_FORMAT ", unit %d specified when adding client", fd, - min_value, max_value, min_unit); + "[socket %p] wrong values min =%" G_GUINT64_FORMAT ", max=%" + G_GUINT64_FORMAT ", format %d specified when adding client", socket, + min_value, max_value, min_format); return; } duplicate: { client->status = GST_CLIENT_STATUS_DUPLICATE; CLIENTS_UNLOCK (sink); - GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd); + GST_WARNING_OBJECT (sink, "[socket %p] duplicate client found, refusing", + socket); g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, + gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket, client->status); g_free (client); return; @@ -920,37 +768,38 @@ duplicate: /* "add" signal implementation */ void -gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) +gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket) { - gst_multi_fd_sink_add_full (sink, fd, sink->def_sync_method, - sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1); + gst_multi_socket_sink_add_full (sink, socket, sink->def_sync_method, + sink->def_burst_format, sink->def_burst_value, sink->def_burst_format, + -1); } /* "remove" signal implementation */ void -gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd) +gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket) { GList *clink; - GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd); + GST_DEBUG_OBJECT (sink, "[socket %p] removing client", socket); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); + clink = g_hash_table_lookup (sink->socket_hash, socket); if (clink != NULL) { - GstTCPClient *client = (GstTCPClient *) clink->data; + GstSocketClient *client = clink->data; if (client->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, - "[fd %5d] Client already disconnecting with status %d", - fd, client->status); + "[socket %p] Client already disconnecting with status %d", + socket, client->status); goto done; } client->status = GST_CLIENT_STATUS_REMOVED; - gst_multi_fd_sink_remove_client_link (sink, clink); - gst_poll_restart (sink->fdset); + gst_multi_socket_sink_remove_client_link (sink, clink); } else { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); + GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!", + socket); } done: @@ -959,21 +808,21 @@ done: /* "remove-flush" signal implementation */ void -gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd) +gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket) { GList *clink; - GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd); + GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", socket); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); + clink = g_hash_table_lookup (sink->socket_hash, socket); if (clink != NULL) { - GstTCPClient *client = (GstTCPClient *) clink->data; + GstSocketClient *client = clink->data; if (client->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, - "[fd %5d] Client already disconnecting with status %d", - fd, client->status); + "[socket %p] Client already disconnecting with status %d", + socket, client->status); goto done; } @@ -985,7 +834,8 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd) * it might have some buffers to flush in the ->sending queue. */ client->status = GST_CLIENT_STATUS_FLUSHING; } else { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); + GST_WARNING_OBJECT (sink, "[socket %p] no client with this fd found!", + socket); } done: CLIENTS_UNLOCK (sink); @@ -994,9 +844,9 @@ done: /* can be called both through the signal (i.e. from any thread) or when * stopping, after the writing thread has shut down */ void -gst_multi_fd_sink_clear (GstMultiFdSink * sink) +gst_multi_socket_sink_clear (GstMultiSocketSink * sink) { - GList *clients, *next; + GList *clients; guint32 cookie; GST_DEBUG_OBJECT (sink, "clearing all clients"); @@ -1004,63 +854,42 @@ gst_multi_fd_sink_clear (GstMultiFdSink * sink) CLIENTS_LOCK (sink); restart: cookie = sink->clients_cookie; - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; + for (clients = sink->clients; clients; clients = clients->next) { + GstSocketClient *client; if (cookie != sink->clients_cookie) { GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients"); goto restart; } - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - + client = clients->data; client->status = GST_CLIENT_STATUS_REMOVED; - gst_multi_fd_sink_remove_client_link (sink, clients); + gst_multi_socket_sink_remove_client_link (sink, clients); } - gst_poll_restart (sink->fdset); + CLIENTS_UNLOCK (sink); } /* "get-stats" signal implementation - * the array returned contains: - * - * guint64 : bytes_sent - * guint64 : connect time (in nanoseconds, since Epoch) - * guint64 : disconnect time (in nanoseconds, since Epoch) - * guint64 : time the client is/was connected (in nanoseconds) - * guint64 : last activity time (in nanoseconds, since Epoch) - * guint64 : buffers dropped due to recovery - * guint64 : timestamp of the first buffer sent (in nanoseconds) - * guint64 : timestamp of the last buffer sent (in nanoseconds) */ -GValueArray * -gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) +GstStructure * +gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket) { - GstTCPClient *client; - GValueArray *result = NULL; + GstSocketClient *client; + GstStructure *result = NULL; GList *clink; CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); + clink = g_hash_table_lookup (sink->socket_hash, socket); if (clink == NULL) goto noclient; - client = (GstTCPClient *) clink->data; + client = clink->data; if (client != NULL) { - GValue value = { 0 }; guint64 interval; - result = g_value_array_new (7); + result = gst_structure_new_empty ("multisocketsink-stats"); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->bytes_sent); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->connect_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); if (client->disconnect_time == 0) { GTimeVal nowtv; @@ -1070,29 +899,16 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) } else { interval = client->disconnect_time - client->connect_time; } - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->disconnect_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, interval); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->last_activity_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->dropped_buffers); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->first_buffer_ts); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->last_buffer_ts); - result = g_value_array_append (result, &value); + + gst_structure_set (result, + "bytes-sent", G_TYPE_UINT64, client->bytes_sent, + "connect-time", G_TYPE_UINT64, client->connect_time, + "disconnect-time", G_TYPE_UINT64, client->disconnect_time, + "connected-duration", G_TYPE_UINT64, interval, + "last-activatity-time", G_TYPE_UINT64, client->last_activity_time, + "dropped-buffers", G_TYPE_UINT64, client->dropped_buffers, + "first-buffer-ts", G_TYPE_UINT64, client->first_buffer_ts, + "last-buffer-ts", G_TYPE_UINT64, client->last_buffer_ts, NULL); } noclient: @@ -1100,8 +916,8 @@ noclient: /* python doesn't like a NULL pointer yet */ if (result == NULL) { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd); - result = g_value_array_new (0); + GST_WARNING_OBJECT (sink, "[socket %p] no client with this found!", socket); + result = gst_structure_new_empty ("multisocketsink-stats"); } return result; @@ -1113,19 +929,21 @@ noclient: * close the fd itself. */ static void -gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) +gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink, + GList * link) { - int fd; + GSocket *socket; GTimeVal now; - GstTCPClient *client = (GstTCPClient *) link->data; - GstMultiFdSinkClass *fclass; + GstSocketClient *client = link->data; + GstMultiSocketSinkClass *fclass; - fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); + fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (sink); - fd = client->fd.fd; + socket = client->socket; if (client->currently_removing) { - GST_WARNING_OBJECT (sink, "[fd %5d] client is already being removed", fd); + GST_WARNING_OBJECT (sink, "[socket %p] client is already being removed", + socket); return; } else { client->currently_removing = TRUE; @@ -1134,34 +952,40 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) /* FIXME: if we keep track of ip we can log it here and signal */ switch (client->status) { case GST_CLIENT_STATUS_OK: - GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason", - fd, client); + GST_WARNING_OBJECT (sink, "[socket %p] removing client %p for no reason", + socket, client); break; case GST_CLIENT_STATUS_CLOSED: - GST_DEBUG_OBJECT (sink, "[fd %5d] removing client %p because of close", - fd, client); + GST_DEBUG_OBJECT (sink, "[socket %p] removing client %p because of close", + socket, client); break; case GST_CLIENT_STATUS_REMOVED: GST_DEBUG_OBJECT (sink, - "[fd %5d] removing client %p because the app removed it", fd, client); + "[socket %p] removing client %p because the app removed it", socket, + client); break; case GST_CLIENT_STATUS_SLOW: GST_INFO_OBJECT (sink, - "[fd %5d] removing client %p because it was too slow", fd, client); + "[socket %p] removing client %p because it was too slow", socket, + client); break; case GST_CLIENT_STATUS_ERROR: GST_WARNING_OBJECT (sink, - "[fd %5d] removing client %p because of error", fd, client); + "[socket %p] removing client %p because of error", socket, client); break; case GST_CLIENT_STATUS_FLUSHING: default: GST_WARNING_OBJECT (sink, - "[fd %5d] removing client %p with invalid reason %d", fd, client, - client->status); + "[socket %p] removing client %p with invalid reason %d", socket, + client, client->status); break; } - gst_poll_remove_fd (sink->fdset, &client->fd); + if (client->source) { + g_source_destroy (client->source); + g_source_unref (client->source); + client->source = NULL; + } g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); @@ -1180,16 +1004,17 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) CLIENTS_UNLOCK (sink); g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status); + gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket, + client->status); /* lock again before we remove the client completely */ CLIENTS_LOCK (sink); /* fd cannot be reused in the above signal callback so we can safely * remove it from the hashtable here */ - if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) { + if (!g_hash_table_remove (sink->socket_hash, socket)) { GST_WARNING_OBJECT (sink, - "[fd %5d] error removing client %p from hash", client->fd.fd, client); + "[socket %p] error removing client %p from hash", socket, client); } /* after releasing the lock above, the link could be invalid, more * precisely, the next and prev pointers could point to invalid list @@ -1200,92 +1025,69 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) sink->clients_cookie++; if (fclass->removed) - fclass->removed (sink, client->fd.fd); + fclass->removed (sink, socket); g_free (client); CLIENTS_UNLOCK (sink); /* and the fd is really gone now */ g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED], 0, fd); + gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0, socket); + g_object_unref (socket); CLIENTS_LOCK (sink); } -/* handle a read on a client fd, +/* handle a read on a client socket, * which either indicates a close or should be ignored * returns FALSE if some error occured or the client closed. */ static gboolean -gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, - GstTCPClient * client) +gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, + GstSocketClient * client) { - int avail, fd; gboolean ret; + gchar dummy[256]; + gssize nread; + GError *err = NULL; + gboolean first = TRUE; - fd = client->fd.fd; - - if (ioctl (fd, FIONREAD, &avail) < 0) - goto ioctl_failed; - - GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes", - fd, avail); + GST_DEBUG_OBJECT (sink, "[socket %p] select reports client read", + client->socket); ret = TRUE; - if (avail == 0) { - /* client sent close, so remove it */ - GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd); - client->status = GST_CLIENT_STATUS_CLOSED; - ret = FALSE; - } else if (avail < 0) { - GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd); - client->status = GST_CLIENT_STATUS_ERROR; - ret = FALSE; - } else { - guint8 dummy[512]; - gint nread; - - /* just Read 'n' Drop, could also just drop the client as it's not supposed - * to write to us except for closing the socket, I guess it's because we - * like to listen to our customers. */ - do { - /* this is the maximum we can read */ - gint to_read = MIN (avail, 512); - - GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes", - fd, to_read); - - nread = read (fd, dummy, to_read); - if (nread < -1) { - GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)", - fd, to_read, g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; - ret = FALSE; - break; - } else if (nread == 0) { - GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd); - client->status = GST_CLIENT_STATUS_ERROR; - ret = FALSE; - break; - } - avail -= nread; + /* just Read 'n' Drop, could also just drop the client as it's not supposed + * to write to us except for closing the socket, I guess it's because we + * like to listen to our customers. */ + do { + GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read", + client->socket); + + nread = + g_socket_receive (client->socket, dummy, sizeof (dummy), + sink->cancellable, &err); + if (first && nread == 0) { + /* client sent close, so remove it */ + GST_DEBUG_OBJECT (sink, "[socket %p] client asked for close, removing", + client->socket); + client->status = GST_CLIENT_STATUS_CLOSED; + ret = FALSE; + } else if (nread < 0) { + GST_WARNING_OBJECT (sink, "[socket %p] could not read: %s", + client->socket, err->message); + client->status = GST_CLIENT_STATUS_ERROR; + ret = FALSE; + break; } - while (avail > 0); - } - return ret; + first = FALSE; + } while (nread > 0); + g_clear_error (&err); - /* ERRORS */ -ioctl_failed: - { - GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)", - fd, g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; - return FALSE; - } + return ret; } static gboolean -is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer) +is_sync_frame (GstMultiSocketSink * sink, GstBuffer * buffer) { if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { return FALSE; @@ -1298,8 +1100,8 @@ is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer) /* queue the given buffer for the given client */ static gboolean -gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, - GstTCPClient * client, GstBuffer * buffer) +gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink, + GstSocketClient * client, GstBuffer * buffer) { GstCaps *caps; @@ -1313,8 +1115,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, if (!client->caps) { GST_DEBUG_OBJECT (sink, - "[fd %5d] no previous caps for this client, send streamheader", - client->fd.fd); + "[socket %p] no previous caps for this client, send streamheader", + client->socket); send_streamheader = TRUE; client->caps = gst_caps_ref (caps); } else { @@ -1327,23 +1129,23 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, if (!gst_structure_has_field (s, "streamheader")) { /* no new streamheader, so nothing new to send */ GST_DEBUG_OBJECT (sink, - "[fd %5d] new caps do not have streamheader, not sending", - client->fd.fd); + "[socket %p] new caps do not have streamheader, not sending", + client->socket); } else { /* there is a new streamheader */ s = gst_caps_get_structure (client->caps, 0); if (!gst_structure_has_field (s, "streamheader")) { /* no previous streamheader, so send the new one */ GST_DEBUG_OBJECT (sink, - "[fd %5d] previous caps did not have streamheader, sending", - client->fd.fd); + "[socket %p] previous caps did not have streamheader, sending", + client->socket); send_streamheader = TRUE; } else { /* both old and new caps have streamheader set */ if (!sink->resend_streamheader) { GST_DEBUG_OBJECT (sink, - "[fd %5d] asked to not resend the streamheader, not sending", - client->fd.fd); + "[socket %p] asked to not resend the streamheader, not sending", + client->socket); send_streamheader = FALSE; } else { sh1 = gst_structure_get_value (s, "streamheader"); @@ -1351,8 +1153,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, sh2 = gst_structure_get_value (s, "streamheader"); if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) { GST_DEBUG_OBJECT (sink, - "[fd %5d] new streamheader different from old, sending", - client->fd.fd); + "[socket %p] new streamheader different from old, sending", + client->socket); send_streamheader = TRUE; } } @@ -1370,16 +1172,17 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, int i; GST_LOG_OBJECT (sink, - "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT, - client->fd.fd, caps); + "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT, + client->socket, caps); s = gst_caps_get_structure (caps, 0); if (!gst_structure_has_field (s, "streamheader")) { GST_DEBUG_OBJECT (sink, - "[fd %5d] no new streamheader, so nothing to send", client->fd.fd); + "[socket %p] no new streamheader, so nothing to send", + client->socket); } else { GST_LOG_OBJECT (sink, - "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT, - client->fd.fd, caps); + "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT, + client->socket, caps); sh = gst_structure_get_value (s, "streamheader"); g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY); buffers = g_value_peek_pointer (sh); @@ -1392,8 +1195,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER); buffer = g_value_peek_pointer (bufval); GST_DEBUG_OBJECT (sink, - "[fd %5d] queueing streamheader buffer of length %" G_GSIZE_FORMAT, - client->fd.fd, gst_buffer_get_size (buffer)); + "[socket %p] queueing streamheader buffer of length %" + G_GSIZE_FORMAT, client->socket, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); client->sending = g_slist_append (client->sending, buffer); @@ -1404,8 +1207,9 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, gst_caps_unref (caps); caps = NULL; - GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %" G_GSIZE_FORMAT, - client->fd.fd, gst_buffer_get_size (buffer)); + GST_LOG_OBJECT (sink, + "[socket %p] queueing buffer of length %" G_GSIZE_FORMAT, client->socket, + gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); client->sending = g_slist_append (client->sending, buffer); @@ -1419,7 +1223,7 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, * Returns: the index or -1 if there is no keyframe after idx. */ static gint -find_syncframe (GstMultiFdSink * sink, gint idx, gint direction) +find_syncframe (GstMultiSocketSink * sink, gint idx, gint direction) { gint i, len, result; @@ -1452,12 +1256,12 @@ find_syncframe (GstMultiFdSink * sink, gint idx, gint direction) * If units are not BUFFERS, and there are insufficient buffers in the * queue to satify the limit, return len(queue) + 1 */ static gint -get_buffers_max (GstMultiFdSink * sink, gint64 max) +get_buffers_max (GstMultiSocketSink * sink, gint64 max) { switch (sink->unit_type) { - case GST_TCP_UNIT_TYPE_BUFFERS: + case GST_FORMAT_BUFFERS: return max; - case GST_TCP_UNIT_TYPE_TIME: + case GST_FORMAT_TIME: { GstBuffer *buf; int i; @@ -1481,7 +1285,7 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max) } return len + 1; } - case GST_TCP_UNIT_TYPE_BYTES: + case GST_FORMAT_BYTES: { GstBuffer *buf; int i; @@ -1517,7 +1321,7 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max) * FIXME, this code might now work if any of the units is in buffers... */ static gboolean -find_limits (GstMultiFdSink * sink, +find_limits (GstMultiSocketSink * sink, gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min, gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max) { @@ -1624,23 +1428,23 @@ find_limits (GstMultiFdSink * sink, * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise. */ static gboolean -assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers, +assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers, GstClockTime * time) { gboolean res = TRUE; /* set only the limit of the given format to the given value */ - switch (unit) { - case GST_TCP_UNIT_TYPE_BUFFERS: + switch (format) { + case GST_FORMAT_BUFFERS: *buffers = (gint) value; break; - case GST_TCP_UNIT_TYPE_TIME: + case GST_FORMAT_TIME: *time = value; break; - case GST_TCP_UNIT_TYPE_BYTES: + case GST_FORMAT_BYTES: *bytes = (gint) value; break; - case GST_TCP_UNIT_TYPE_UNDEFINED: + case GST_FORMAT_UNDEFINED: default: res = FALSE; break; @@ -1657,16 +1461,16 @@ assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers, * function returns FALSE. */ static gboolean -count_burst_unit (GstMultiFdSink * sink, gint * min_idx, - GstTCPUnitType min_unit, guint64 min_value, gint * max_idx, - GstTCPUnitType max_unit, guint64 max_value) +count_burst_unit (GstMultiSocketSink * sink, gint * min_idx, + GstFormat min_format, guint64 min_value, gint * max_idx, + GstFormat max_format, guint64 max_value) { gint bytes_min = -1, buffers_min = -1; gint bytes_max = -1, buffers_max = -1; GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE; - assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min); - assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max); + assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min); + assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max); return find_limits (sink, min_idx, bytes_min, buffers_min, time_min, max_idx, bytes_max, buffers_max, time_max); @@ -1681,12 +1485,14 @@ count_burst_unit (GstMultiFdSink * sink, gint * min_idx, * when more buffers have arrived. */ static gint -gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) +gst_multi_socket_sink_new_client (GstMultiSocketSink * sink, + GstSocketClient * client) { gint result; GST_DEBUG_OBJECT (sink, - "[fd %5d] new client, deciding where to start in queue", client->fd.fd); + "[socket %p] new client, deciding where to start in queue", + client->socket); GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long", sink->bufqueue->len); switch (client->sync_method) { @@ -1694,36 +1500,37 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) /* no syncing, we are happy with whatever the client is going to get */ result = client->bufpos; GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST, position %d", client->fd.fd, result); + "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket, + result); break; case GST_SYNC_METHOD_NEXT_KEYFRAME: { /* if one of the new buffers (between client->bufpos and 0) in the queue * is a sync point, we can proceed, otherwise we need to keep waiting */ GST_LOG_OBJECT (sink, - "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd, - client->bufpos); + "[socket %p] new client, bufpos %d, waiting for keyframe", + client->socket, client->bufpos); result = find_prev_syncframe (sink, client->bufpos); if (result != -1) { GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d", - client->fd.fd, result); + "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d", + client->socket, result); break; } /* client is not on a syncbuffer, need to skip these buffers and * wait some more */ GST_LOG_OBJECT (sink, - "[fd %5d] new client, skipping buffer(s), no syncpoint found", - client->fd.fd); + "[socket %p] new client, skipping buffer(s), no syncpoint found", + client->socket); client->bufpos = -1; break; } case GST_SYNC_METHOD_LATEST_KEYFRAME: { GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME", client->fd.fd); + "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket); /* for new clients we initially scan the complete buffer queue for * a sync point when a buffer is added. If we don't find a keyframe, @@ -1733,14 +1540,14 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) result = find_next_syncframe (sink, 0); if (result != -1) { GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: result %d", client->fd.fd, - result); + "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d", + client->socket, result); break; } GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " - "switching to SYNC_METHOD_NEXT_KEYFRAME", client->fd.fd); + "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " + "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket); /* throw client to the waiting state */ client->bufpos = -1; /* and make client sync to next keyframe */ @@ -1757,12 +1564,12 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) * is not enough data, we just send what we have (which is in result). * We use the max value to limit the search */ - ok = count_burst_unit (sink, &result, client->burst_min_unit, - client->burst_min_value, &max, client->burst_max_unit, + ok = count_burst_unit (sink, &result, client->burst_min_format, + client->burst_min_value, &max, client->burst_max_format, client->burst_max_value); GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_BURST: burst_unit returned %d, result %d", - client->fd.fd, ok, result); + "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d", + client->socket, ok, result); GST_LOG_OBJECT (sink, "min %d, max %d", result, max); @@ -1770,8 +1577,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) if (max != -1 && max <= result) { result = MAX (max - 1, 0); GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_BURST: result above max, taken down to %d", - client->fd.fd, result); + "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d", + client->socket, result); } break; } @@ -1788,8 +1595,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) * NEXT_KEYFRAME. */ /* gather burst limits */ - count_burst_unit (sink, &min_idx, client->burst_min_unit, - client->burst_min_value, &max_idx, client->burst_max_unit, + count_burst_unit (sink, &min_idx, client->burst_min_format, + client->burst_min_value, &max_idx, client->burst_max_format, client->burst_max_value); GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); @@ -1835,8 +1642,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) * amount of data up 'till min. */ /* gather enough data to burst */ - count_burst_unit (sink, &min_idx, client->burst_min_unit, - client->burst_min_value, &max_idx, client->burst_max_unit, + count_burst_unit (sink, &min_idx, client->burst_min_format, + client->burst_min_value, &max_idx, client->burst_max_format, client->burst_max_value); GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); @@ -1893,14 +1700,15 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) * This functions returns FALSE if some error occured. */ static gboolean -gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, - GstTCPClient * client) +gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, + GstSocketClient * client) { - int fd = client->fd.fd; + GSocket *socket = client->socket; gboolean more; gboolean flushing; GstClockTime now; GTimeVal nowtv; + GError *err = NULL; g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); @@ -1916,7 +1724,11 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, if (client->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ - gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); + if (client->source) { + g_source_destroy (client->source); + g_source_unref (client->source); + client->source = NULL; + } /* if we flushed out all of the client buffers, we can stop */ if (client->flushcount == 0) goto flushed; @@ -1930,7 +1742,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* for new connections, we need to find a good spot in the * bufqueue to start streaming from */ if (client->new_connection && !flushing) { - gint position = gst_multi_fd_sink_new_client (sink, client); + gint position = gst_multi_socket_sink_new_client (sink, client); if (position >= 0) { /* we got a valid spot in the queue */ @@ -1938,7 +1750,11 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, client->bufpos = position; } else { /* cannot send data to this client yet */ - gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); + if (client->source) { + g_source_destroy (client->source); + g_source_unref (client->source); + client->source = NULL; + } return TRUE; } } @@ -1962,11 +1778,11 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, if (client->flushcount != -1) client->flushcount--; - GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", - fd, client, client->bufpos); + GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d", + socket, client, client->bufpos); /* queueing a buffer will ref it */ - gst_multi_fd_sink_client_queue_buffer (sink, client, buf); + gst_multi_socket_sink_client_queue_buffer (sink, client, buf); /* need to start from the first byte for this new buffer */ client->bufoffset = 0; @@ -1975,7 +1791,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* see if we need to send something */ if (client->sending) { - ssize_t wrote; + gssize wrote; GstBuffer *head; guint8 *data; gsize size; @@ -1987,24 +1803,15 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, maxsize = size - client->bufoffset; /* try to write the complete buffer */ -#ifdef MSG_NOSIGNAL -#define FLAGS MSG_NOSIGNAL -#else -#define FLAGS 0 -#endif - if (client->is_socket) { - wrote = send (fd, data + client->bufoffset, maxsize, FLAGS); - } else { - wrote = write (fd, data + client->bufoffset, maxsize); - } + + wrote = + g_socket_send (socket, (gchar *) data + client->bufoffset, maxsize, + sink->cancellable, &err); gst_buffer_unmap (head, data, size); if (wrote < 0) { /* hmm error.. */ - if (errno == EAGAIN) { - /* nothing serious, resource was unavailable, try again later */ - more = FALSE; - } else if (errno == ECONNRESET) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) { goto connection_reset; } else { goto write_error; @@ -2014,7 +1821,8 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* partial write means that the client cannot read more and we should * stop sending more */ GST_LOG_OBJECT (sink, - "partial write on %d of %" G_GSSIZE_FORMAT " bytes", fd, wrote); + "partial write on %p of %" G_GSSIZE_FORMAT " bytes", socket, + wrote); client->bufoffset += wrote; more = FALSE; } else { @@ -2037,21 +1845,24 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* ERRORS */ flushed: { - GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd); + GST_DEBUG_OBJECT (sink, "[socket %p] flushed, removing", socket); client->status = GST_CLIENT_STATUS_REMOVED; return FALSE; } connection_reset: { - GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd); + GST_DEBUG_OBJECT (sink, "[socket %p] connection reset by peer, removing", + socket); client->status = GST_CLIENT_STATUS_CLOSED; + g_clear_error (&err); return FALSE; } write_error: { GST_WARNING_OBJECT (sink, - "[fd %5d] could not write, removing client: %s (%d)", fd, - g_strerror (errno), errno); + "[socket %p] could not write, removing client: %s", socket, + err->message); + g_clear_error (&err); client->status = GST_CLIENT_STATUS_ERROR; return FALSE; } @@ -2062,13 +1873,14 @@ write_error: * position. */ static gint -gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) +gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink, + GstSocketClient * client) { gint newbufpos; GST_WARNING_OBJECT (sink, - "[fd %5d] client %p is lagging at %d, recover using policy %d", - client->fd.fd, client, client->bufpos, sink->recover_policy); + "[socket %p] client %p is lagging at %d, recover using policy %d", + client->socket, client, client->bufpos, sink->recover_policy); switch (sink->recover_policy) { case GST_RECOVER_POLICY_NONE: @@ -2128,11 +1940,10 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) * the select thread that the fd_set changed. */ static void -gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) +gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf) { GList *clients, *next; gint queuelen; - gboolean need_signal = FALSE; gint max_buffer_usage; gint i; GTimeVal nowtv; @@ -2145,6 +1956,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) CLIENTS_LOCK (sink); /* add buffer to queue */ + gst_buffer_ref (buf); g_array_prepend_val (sink->bufqueue, buf); queuelen = sink->bufqueue->len; @@ -2166,34 +1978,34 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) restart: cookie = sink->clients_cookie; for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; + GstSocketClient *client; if (cookie != sink->clients_cookie) { GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting"); goto restart; } - client = (GstTCPClient *) clients->data; + client = clients->data; next = g_list_next (clients); client->bufpos++; - GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", - client->fd.fd, client, client->bufpos); + GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d", + client->socket, client, client->bufpos); /* check soft max if needed, recover client */ if (soft_max_buffers > 0 && client->bufpos >= soft_max_buffers) { gint newpos; - newpos = gst_multi_fd_sink_recover_client (sink, client); + newpos = gst_multi_socket_sink_recover_client (sink, client); if (newpos != client->bufpos) { client->dropped_buffers += client->bufpos - newpos; client->bufpos = newpos; client->discont = TRUE; - GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d", - client->fd.fd, client, client->bufpos); + GST_INFO_OBJECT (sink, "[socket %p] client %p position reset to %d", + client->socket, client, client->bufpos); } else { GST_INFO_OBJECT (sink, - "[fd %5d] client %p not recovering position", - client->fd.fd, client); + "[socket %p] client %p not recovering position", + client->socket, client); } } /* check hard max and timeout, remove client */ @@ -2201,21 +2013,28 @@ restart: (sink->timeout > 0 && now - client->last_activity_time > sink->timeout)) { /* remove client */ - GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing", - client->fd.fd, client); + GST_WARNING_OBJECT (sink, "[socket %p] client %p is too slow, removing", + client->socket, client); /* remove the client, the fd set will be cleared and the select thread * will be signaled */ client->status = GST_CLIENT_STATUS_SLOW; /* set client to invalid position while being removed */ client->bufpos = -1; - gst_multi_fd_sink_remove_client_link (sink, clients); - need_signal = TRUE; + gst_multi_socket_sink_remove_client_link (sink, clients); continue; } else if (client->bufpos == 0 || client->new_connection) { /* can send data to this client now. need to signal the select thread that * the fd_set changed */ - gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE); - need_signal = TRUE; + if (!client->source) { + client->source = + g_socket_create_source (client->socket, + G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, + sink->cancellable); + g_source_set_callback (client->source, + (GSourceFunc) gst_multi_socket_sink_socket_condition, + gst_object_ref (sink), (GDestroyNotify) gst_object_unref); + g_source_attach (client->source, sink->main_context); + } } /* keep track of maximum buffer usage */ if (client->bufpos > max_buffer_usage) { @@ -2289,203 +2108,139 @@ restart: /* save for stats */ sink->buffers_queued = max_buffer_usage; CLIENTS_UNLOCK (sink); - - /* and send a signal to thread if fd_set changed */ - if (need_signal) { - gst_poll_restart (sink->fdset); - } } -/* Handle the clients. Basically does a blocking select for one - * of the client fds to become read or writable. We also have a - * filedescriptor to receive commands on that we need to check. - * - * After going out of the select call, we read and write to all - * clients that can do so. Badly behaving clients are put on a +/* Handle the clients. This is called when a socket becomes ready + * to read or writable. Badly behaving clients are put on a * garbage list and removed. */ -static void -gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) +static gboolean +gst_multi_socket_sink_socket_condition (GSocket * socket, + GIOCondition condition, GstMultiSocketSink * sink) { - int result; - GList *clients, *next; - gboolean try_again; - GstMultiFdSinkClass *fclass; - guint cookie; - - fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); - - do { - try_again = FALSE; - - /* check for: - * - server socket input (ie, new client connections) - * - client socket input (ie, clients saying goodbye) - * - client socket output (ie, client reads) */ - GST_LOG_OBJECT (sink, "waiting on action on fdset"); - - result = gst_poll_wait (sink->fdset, sink->timeout != 0 ? sink->timeout : - GST_CLOCK_TIME_NONE); - - /* Handle the special case in which the sink is not receiving more buffers - * and will not disconnect inactive client in the streaming thread. */ - if (G_UNLIKELY (result == 0)) { - GstClockTime now; - GTimeVal nowtv; + GList *clink; + GstSocketClient *client; + gboolean ret = TRUE; - g_get_current_time (&nowtv); - now = GST_TIMEVAL_TO_TIME (nowtv); - - CLIENTS_LOCK (sink); - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; - - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - if (sink->timeout > 0 - && now - client->last_activity_time > sink->timeout) { - client->status = GST_CLIENT_STATUS_SLOW; - gst_multi_fd_sink_remove_client_link (sink, clients); - } - } - CLIENTS_UNLOCK (sink); - return; - } else if (result < 0) { - GST_WARNING_OBJECT (sink, "wait failed: %s (%d)", g_strerror (errno), - errno); - if (errno == EBADF) { - /* ok, so one or more of the fds is invalid. We loop over them to find - * the ones that give an error to the F_GETFL fcntl. */ - CLIENTS_LOCK (sink); - restart: - cookie = sink->clients_cookie; - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; - int fd; - long flags; - int res; - - if (cookie != sink->clients_cookie) { - GST_DEBUG_OBJECT (sink, "Cookie changed finding bad fd"); - goto restart; - } + CLIENTS_LOCK (sink); + clink = g_hash_table_lookup (sink->socket_hash, socket); + if (clink == NULL) { + ret = FALSE; + goto done; + } - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); + client = clink->data; - fd = client->fd.fd; + if (client->status != GST_CLIENT_STATUS_FLUSHING + && client->status != GST_CLIENT_STATUS_OK) { + gst_multi_socket_sink_remove_client_link (sink, clink); + ret = FALSE; + goto done; + } - res = fcntl (fd, F_GETFL, &flags); - if (res == -1) { - GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)", - fd, g_strerror (errno), errno); - if (errno == EBADF) { - client->status = GST_CLIENT_STATUS_ERROR; - /* releases the CLIENTS lock */ - gst_multi_fd_sink_remove_client_link (sink, clients); - } - } - } - CLIENTS_UNLOCK (sink); - /* after this, go back in the select loop as the read/writefds - * are not valid */ - try_again = TRUE; - } else if (errno == EINTR) { - /* interrupted system call, just redo the wait */ - try_again = TRUE; - } else if (errno == EBUSY) { - /* the call to gst_poll_wait() was flushed */ - return; - } else { - /* this is quite bad... */ - GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), - ("select failed: %s (%d)", g_strerror (errno), errno)); - return; - } - } else { - GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result); + if ((condition & G_IO_ERR)) { + GST_WARNING_OBJECT (sink, "Socket %p has error", client->socket); + client->status = GST_CLIENT_STATUS_ERROR; + gst_multi_socket_sink_remove_client_link (sink, clink); + ret = FALSE; + goto done; + } else if ((condition & G_IO_HUP)) { + client->status = GST_CLIENT_STATUS_CLOSED; + gst_multi_socket_sink_remove_client_link (sink, clink); + ret = FALSE; + goto done; + } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) { + /* handle client read */ + if (!gst_multi_socket_sink_handle_client_read (sink, client)) { + gst_multi_socket_sink_remove_client_link (sink, clink); + ret = FALSE; + goto done; } - } while (try_again); - - /* subclasses can check fdset with this virtual function */ - if (fclass->wait) - fclass->wait (sink, sink->fdset); + } else if ((condition & G_IO_OUT)) { + /* handle client write */ + if (!gst_multi_socket_sink_handle_client_write (sink, client)) { + gst_multi_socket_sink_remove_client_link (sink, clink); + ret = FALSE; + goto done; + } + } - /* Check the clients */ - CLIENTS_LOCK (sink); +done: + CLIENTS_UNLOCK (sink); -restart2: - cookie = sink->clients_cookie; - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; + return ret; +} - if (sink->clients_cookie != cookie) { - GST_DEBUG_OBJECT (sink, "Restarting loop, cookie out of date"); - goto restart2; - } +static gboolean +gst_multi_socket_sink_timeout (GstMultiSocketSink * sink) +{ + GstClockTime now; + GTimeVal nowtv; + GList *clients; - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); + g_get_current_time (&nowtv); + now = GST_TIMEVAL_TO_TIME (nowtv); - if (client->status != GST_CLIENT_STATUS_FLUSHING - && client->status != GST_CLIENT_STATUS_OK) { - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } + CLIENTS_LOCK (sink); + for (clients = sink->clients; clients; clients = clients->next) { + GstSocketClient *client; - if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) { - client->status = GST_CLIENT_STATUS_CLOSED; - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } - if (gst_poll_fd_has_error (sink->fdset, &client->fd)) { - GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd); - client->status = GST_CLIENT_STATUS_ERROR; - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } - if (gst_poll_fd_can_read (sink->fdset, &client->fd)) { - /* handle client read */ - if (!gst_multi_fd_sink_handle_client_read (sink, client)) { - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } - } - if (gst_poll_fd_can_write (sink->fdset, &client->fd)) { - /* handle client write */ - if (!gst_multi_fd_sink_handle_client_write (sink, client)) { - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } + client = clients->data; + if (sink->timeout > 0 && now - client->last_activity_time > sink->timeout) { + client->status = GST_CLIENT_STATUS_SLOW; + gst_multi_socket_sink_remove_client_link (sink, clients); } } CLIENTS_UNLOCK (sink); + + return FALSE; } /* we handle the client communication in another thread so that we do not block * the gstreamer thread while we select() on the client fds */ static gpointer -gst_multi_fd_sink_thread (GstMultiFdSink * sink) +gst_multi_socket_sink_thread (GstMultiSocketSink * sink) { + GSource *timeout = NULL; + while (sink->running) { - gst_multi_fd_sink_handle_clients (sink); + if (sink->timeout > 0) { + timeout = g_timeout_source_new (sink->timeout / GST_MSECOND); + + g_source_set_callback (timeout, + (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink), + (GDestroyNotify) gst_object_unref); + g_source_attach (timeout, sink->main_context); + } + + /* Returns after handling all pending events or when + * _wakeup() was called. In any case we have to add + * a new timeout because something happened. + */ + g_main_context_iteration (sink->main_context, TRUE); + + if (timeout) { + g_source_destroy (timeout); + g_source_unref (timeout); + } } + return NULL; } static GstFlowReturn -gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) +gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf) { - GstMultiFdSink *sink; + GstMultiSocketSink *sink; gboolean in_caps; #if 0 GstCaps *bufcaps, *padcaps; #endif - sink = GST_MULTI_FD_SINK (bsink); + sink = GST_MULTI_SOCKET_SINK (bsink); - g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_FD_SINK_OPEN), - GST_FLOW_WRONG_STATE); + g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, + GST_MULTI_SOCKET_SINK_OPEN), GST_FLOW_WRONG_STATE); #if 0 /* since we check every buffer for streamheader caps, we need to make @@ -2563,7 +2318,7 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) sink->streamheader = g_slist_append (sink->streamheader, buf); } else { /* queue the buffer, this is a regular data buffer. */ - gst_multi_fd_sink_queue_buffer (sink, buf); + gst_multi_socket_sink_queue_buffer (sink, buf); sink->bytes_to_serve += gst_buffer_get_size (buf); } @@ -2581,65 +2336,58 @@ no_caps: } static void -gst_multi_fd_sink_set_property (GObject * object, guint prop_id, +gst_multi_socket_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstMultiFdSink *multifdsink; + GstMultiSocketSink *multisocketsink; - multifdsink = GST_MULTI_FD_SINK (object); + multisocketsink = GST_MULTI_SOCKET_SINK (object); switch (prop_id) { - case PROP_MODE: - multifdsink->mode = g_value_get_enum (value); - break; case PROP_BUFFERS_MAX: - multifdsink->units_max = g_value_get_int (value); + multisocketsink->units_max = g_value_get_int (value); break; case PROP_BUFFERS_SOFT_MAX: - multifdsink->units_soft_max = g_value_get_int (value); + multisocketsink->units_soft_max = g_value_get_int (value); break; case PROP_TIME_MIN: - multifdsink->time_min = g_value_get_int64 (value); + multisocketsink->time_min = g_value_get_int64 (value); break; case PROP_BYTES_MIN: - multifdsink->bytes_min = g_value_get_int (value); + multisocketsink->bytes_min = g_value_get_int (value); break; case PROP_BUFFERS_MIN: - multifdsink->buffers_min = g_value_get_int (value); + multisocketsink->buffers_min = g_value_get_int (value); break; case PROP_UNIT_TYPE: - multifdsink->unit_type = g_value_get_enum (value); + multisocketsink->unit_type = g_value_get_enum (value); break; case PROP_UNITS_MAX: - multifdsink->units_max = g_value_get_int64 (value); + multisocketsink->units_max = g_value_get_int64 (value); break; case PROP_UNITS_SOFT_MAX: - multifdsink->units_soft_max = g_value_get_int64 (value); + multisocketsink->units_soft_max = g_value_get_int64 (value); break; case PROP_RECOVER_POLICY: - multifdsink->recover_policy = g_value_get_enum (value); + multisocketsink->recover_policy = g_value_get_enum (value); break; case PROP_TIMEOUT: - multifdsink->timeout = g_value_get_uint64 (value); + multisocketsink->timeout = g_value_get_uint64 (value); break; case PROP_SYNC_METHOD: - multifdsink->def_sync_method = g_value_get_enum (value); + multisocketsink->def_sync_method = g_value_get_enum (value); break; - case PROP_BURST_UNIT: - multifdsink->def_burst_unit = g_value_get_enum (value); + case PROP_BURST_FORMAT: + multisocketsink->def_burst_format = g_value_get_enum (value); break; case PROP_BURST_VALUE: - multifdsink->def_burst_value = g_value_get_uint64 (value); - break; - case PROP_QOS_DSCP: - multifdsink->qos_dscp = g_value_get_int (value); - setup_dscp (multifdsink); + multisocketsink->def_burst_value = g_value_get_uint64 (value); break; case PROP_HANDLE_READ: - multifdsink->handle_read = g_value_get_boolean (value); + multisocketsink->handle_read = g_value_get_boolean (value); break; case PROP_RESEND_STREAMHEADER: - multifdsink->resend_streamheader = g_value_get_boolean (value); + multisocketsink->resend_streamheader = g_value_get_boolean (value); break; default: @@ -2649,82 +2397,77 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id, } static void -gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, - GParamSpec * pspec) +gst_multi_socket_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) { - GstMultiFdSink *multifdsink; + GstMultiSocketSink *multisocketsink; - multifdsink = GST_MULTI_FD_SINK (object); + multisocketsink = GST_MULTI_SOCKET_SINK (object); switch (prop_id) { - case PROP_MODE: - g_value_set_enum (value, multifdsink->mode); - break; case PROP_BUFFERS_MAX: - g_value_set_int (value, multifdsink->units_max); + g_value_set_int (value, multisocketsink->units_max); break; case PROP_BUFFERS_SOFT_MAX: - g_value_set_int (value, multifdsink->units_soft_max); + g_value_set_int (value, multisocketsink->units_soft_max); break; case PROP_TIME_MIN: - g_value_set_int64 (value, multifdsink->time_min); + g_value_set_int64 (value, multisocketsink->time_min); break; case PROP_BYTES_MIN: - g_value_set_int (value, multifdsink->bytes_min); + g_value_set_int (value, multisocketsink->bytes_min); break; case PROP_BUFFERS_MIN: - g_value_set_int (value, multifdsink->buffers_min); + g_value_set_int (value, multisocketsink->buffers_min); break; case PROP_BUFFERS_QUEUED: - g_value_set_uint (value, multifdsink->buffers_queued); + g_value_set_uint (value, multisocketsink->buffers_queued); break; case PROP_BYTES_QUEUED: - g_value_set_uint (value, multifdsink->bytes_queued); + g_value_set_uint (value, multisocketsink->bytes_queued); break; case PROP_TIME_QUEUED: - g_value_set_uint64 (value, multifdsink->time_queued); + g_value_set_uint64 (value, multisocketsink->time_queued); break; case PROP_UNIT_TYPE: - g_value_set_enum (value, multifdsink->unit_type); + g_value_set_enum (value, multisocketsink->unit_type); break; case PROP_UNITS_MAX: - g_value_set_int64 (value, multifdsink->units_max); + g_value_set_int64 (value, multisocketsink->units_max); break; case PROP_UNITS_SOFT_MAX: - g_value_set_int64 (value, multifdsink->units_soft_max); + g_value_set_int64 (value, multisocketsink->units_soft_max); break; case PROP_RECOVER_POLICY: - g_value_set_enum (value, multifdsink->recover_policy); + g_value_set_enum (value, multisocketsink->recover_policy); break; case PROP_TIMEOUT: - g_value_set_uint64 (value, multifdsink->timeout); + g_value_set_uint64 (value, multisocketsink->timeout); break; case PROP_SYNC_METHOD: - g_value_set_enum (value, multifdsink->def_sync_method); + g_value_set_enum (value, multisocketsink->def_sync_method); break; case PROP_BYTES_TO_SERVE: - g_value_set_uint64 (value, multifdsink->bytes_to_serve); + g_value_set_uint64 (value, multisocketsink->bytes_to_serve); break; case PROP_BYTES_SERVED: - g_value_set_uint64 (value, multifdsink->bytes_served); + g_value_set_uint64 (value, multisocketsink->bytes_served); break; - case PROP_BURST_UNIT: - g_value_set_enum (value, multifdsink->def_burst_unit); + case PROP_BURST_FORMAT: + g_value_set_enum (value, multisocketsink->def_burst_format); break; case PROP_BURST_VALUE: - g_value_set_uint64 (value, multifdsink->def_burst_value); - break; - case PROP_QOS_DSCP: - g_value_set_int (value, multifdsink->qos_dscp); + g_value_set_uint64 (value, multisocketsink->def_burst_value); break; case PROP_HANDLE_READ: - g_value_set_boolean (value, multifdsink->handle_read); + g_value_set_boolean (value, multisocketsink->handle_read); break; case PROP_RESEND_STREAMHEADER: - g_value_set_boolean (value, multifdsink->resend_streamheader); + g_value_set_boolean (value, multisocketsink->resend_streamheader); break; - case PROP_NUM_FDS: - g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash)); + case PROP_NUM_SOCKETS: + g_value_set_uint (value, + g_hash_table_size (multisocketsink->socket_hash)); break; default: @@ -2736,20 +2479,38 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, /* create a socket for sending to remote machine */ static gboolean -gst_multi_fd_sink_start (GstBaseSink * bsink) +gst_multi_socket_sink_start (GstBaseSink * bsink) { - GstMultiFdSinkClass *fclass; - GstMultiFdSink *this; + GstMultiSocketSinkClass *fclass; + GstMultiSocketSink *this; + GList *clients; - if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN)) + if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_SOCKET_SINK_OPEN)) return TRUE; - this = GST_MULTI_FD_SINK (bsink); - fclass = GST_MULTI_FD_SINK_GET_CLASS (this); + this = GST_MULTI_SOCKET_SINK (bsink); + fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (this); + + GST_INFO_OBJECT (this, "starting"); + + this->main_context = g_main_context_new (); - GST_INFO_OBJECT (this, "starting in mode %d", this->mode); - if ((this->fdset = gst_poll_new (TRUE)) == NULL) - goto socket_pair; + CLIENTS_LOCK (this); + for (clients = this->clients; clients; clients = clients->next) { + GstSocketClient *client; + + client = clients->data; + if (client->source) + continue; + client->source = + g_socket_create_source (client->socket, + G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, this->cancellable); + g_source_set_callback (client->source, + (GSourceFunc) gst_multi_socket_sink_socket_condition, + gst_object_ref (this), (GDestroyNotify) gst_object_unref); + g_source_attach (client->source, this->main_context); + } + CLIENTS_UNLOCK (this); this->streamheader = NULL; this->bytes_to_serve = 0; @@ -2762,49 +2523,43 @@ gst_multi_fd_sink_start (GstBaseSink * bsink) this->running = TRUE; #if !GLIB_CHECK_VERSION (2, 31, 0) - this->thread = g_thread_create ((GThreadFunc) gst_multi_fd_sink_thread, + this->thread = g_thread_create ((GThreadFunc) gst_multi_socket_sink_thread, this, TRUE, NULL); #else - this->thread = g_thread_new ("multifdsink", - (GThreadFunc) gst_multi_fd_sink_thread, this); + this->thread = g_thread_new ("multisocketsink", + (GThreadFunc) gst_multi_socket_sink_thread, this); #endif - GST_OBJECT_FLAG_SET (this, GST_MULTI_FD_SINK_OPEN); + GST_OBJECT_FLAG_SET (this, GST_MULTI_SOCKET_SINK_OPEN); return TRUE; - - /* ERRORS */ -socket_pair: - { - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL), - GST_ERROR_SYSTEM); - return FALSE; - } } static gboolean -multifdsink_hash_remove (gpointer key, gpointer value, gpointer data) +multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data) { return TRUE; } static gboolean -gst_multi_fd_sink_stop (GstBaseSink * bsink) +gst_multi_socket_sink_stop (GstBaseSink * bsink) { - GstMultiFdSinkClass *fclass; - GstMultiFdSink *this; + GstMultiSocketSinkClass *fclass; + GstMultiSocketSink *this; GstBuffer *buf; - int i; + gint i; - this = GST_MULTI_FD_SINK (bsink); - fclass = GST_MULTI_FD_SINK_GET_CLASS (this); + this = GST_MULTI_SOCKET_SINK (bsink); + fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (this); - if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN)) + if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_SOCKET_SINK_OPEN)) return TRUE; this->running = FALSE; - gst_poll_set_flushing (this->fdset, TRUE); + if (this->main_context) + g_main_context_wakeup (this->main_context); + if (this->thread) { GST_DEBUG_OBJECT (this, "joining thread"); g_thread_join (this->thread); @@ -2813,7 +2568,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink) } /* free the clients */ - gst_multi_fd_sink_clear (this); + gst_multi_socket_sink_clear (this); if (this->streamheader) { g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL); @@ -2824,11 +2579,13 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink) if (fclass->close) fclass->close (this); - if (this->fdset) { - gst_poll_free (this->fdset); - this->fdset = NULL; + if (this->main_context) { + g_main_context_unref (this->main_context); + this->main_context = NULL; } - g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this); + + g_hash_table_foreach_remove (this->socket_hash, multisocketsink_hash_remove, + this); /* remove all queued buffers */ if (this->bufqueue) { @@ -2843,18 +2600,19 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink) } /* freeing the array is done in _finalize */ } - GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN); + GST_OBJECT_FLAG_UNSET (this, GST_MULTI_SOCKET_SINK_OPEN); return TRUE; } static GstStateChangeReturn -gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition) +gst_multi_socket_sink_change_state (GstElement * element, + GstStateChange transition) { - GstMultiFdSink *sink; + GstMultiSocketSink *sink; GstStateChangeReturn ret; - sink = GST_MULTI_FD_SINK (element); + sink = GST_MULTI_SOCKET_SINK (element); /* we disallow changing the state from the streaming thread */ if (g_thread_self () == sink->thread) @@ -2863,7 +2621,7 @@ gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: - if (!gst_multi_fd_sink_start (GST_BASE_SINK (sink))) + if (!gst_multi_socket_sink_start (GST_BASE_SINK (sink))) goto start_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: @@ -2882,7 +2640,7 @@ gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PAUSED_TO_READY: break; case GST_STATE_CHANGE_READY_TO_NULL: - gst_multi_fd_sink_stop (GST_BASE_SINK (sink)); + gst_multi_socket_sink_stop (GST_BASE_SINK (sink)); break; default: break; @@ -2896,3 +2654,32 @@ start_failed: return GST_STATE_CHANGE_FAILURE; } } + +static gboolean +gst_multi_socket_sink_unlock (GstBaseSink * bsink) +{ + GstMultiSocketSink *sink; + + sink = GST_MULTI_SOCKET_SINK (bsink); + + GST_DEBUG_OBJECT (sink, "set to flushing"); + g_cancellable_cancel (sink->cancellable); + if (sink->main_context) + g_main_context_wakeup (sink->main_context); + + return TRUE; +} + +/* will be called only between calls to start() and stop() */ +static gboolean +gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink) +{ + GstMultiSocketSink *sink; + + sink = GST_MULTI_SOCKET_SINK (bsink); + + GST_DEBUG_OBJECT (sink, "unset flushing"); + g_cancellable_reset (sink->cancellable); + + return TRUE; +} diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultisocketsink.h similarity index 61% rename from gst/tcp/gstmultifdsink.h rename to gst/tcp/gstmultisocketsink.h index b0465af..051b4d5 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultisocketsink.h @@ -1,6 +1,8 @@ /* GStreamer * Copyright (C) <1999> Erik Walthinsen * Copyright (C) <2004> Thomas Vander Stichele + * Copyright (C) <2011> Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -19,38 +21,37 @@ */ -#ifndef __GST_MULTI_FD_SINK_H__ -#define __GST_MULTI_FD_SINK_H__ +#ifndef __GST_MULTI_SOCKET_SINK_H__ +#define __GST_MULTI_SOCKET_SINK_H__ #include #include +#include G_BEGIN_DECLS -#include "gsttcp.h" +#define GST_TYPE_MULTI_SOCKET_SINK \ + (gst_multi_socket_sink_get_type()) +#define GST_MULTI_SOCKET_SINK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTI_SOCKET_SINK,GstMultiSocketSink)) +#define GST_MULTI_SOCKET_SINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTI_SOCKET_SINK,GstMultiSocketSinkClass)) +#define GST_IS_MULTI_SOCKET_SINK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTI_SOCKET_SINK)) +#define GST_IS_MULTI_SOCKET_SINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTI_SOCKET_SINK)) +#define GST_MULTI_SOCKET_SINK_GET_CLASS(klass) \ + (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_MULTI_SOCKET_SINK, GstMultiSocketSinkClass)) -#define GST_TYPE_MULTI_FD_SINK \ - (gst_multi_fd_sink_get_type()) -#define GST_MULTI_FD_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTI_FD_SINK,GstMultiFdSink)) -#define GST_MULTI_FD_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTI_FD_SINK,GstMultiFdSinkClass)) -#define GST_IS_MULTI_FD_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTI_FD_SINK)) -#define GST_IS_MULTI_FD_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTI_FD_SINK)) -#define GST_MULTI_FD_SINK_GET_CLASS(klass) \ - (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_MULTI_FD_SINK, GstMultiFdSinkClass)) - -typedef struct _GstMultiFdSink GstMultiFdSink; -typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; +typedef struct _GstMultiSocketSink GstMultiSocketSink; +typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass; typedef enum { - GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), + GST_MULTI_SOCKET_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) -} GstMultiFdSinkFlags; + GST_MULTI_SOCKET_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) +} GstMultiSocketSinkFlags; /** * GstRecoverPolicy: @@ -96,23 +97,6 @@ typedef enum } GstSyncMethod; /** - * GstTCPUnitType: - * @GST_TCP_UNIT_TYPE_UNDEFINED: undefined - * @GST_TCP_UNIT_TYPE_BUFFERS : buffers - * @GST_TCP_UNIT_TYPE_TIME : timeunits (in nanoseconds) - * @GST_TCP_UNIT_TYPE_BYTES : bytes - * - * The units used to specify limits. - */ -typedef enum -{ - GST_TCP_UNIT_TYPE_UNDEFINED, - GST_TCP_UNIT_TYPE_BUFFERS, - GST_TCP_UNIT_TYPE_TIME, - GST_TCP_UNIT_TYPE_BYTES -} GstTCPUnitType; - -/** * GstClientStatus: * @GST_CLIENT_STATUS_OK : client is ok * @GST_CLIENT_STATUS_CLOSED : client closed the socket @@ -123,7 +107,7 @@ typedef enum * @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers. * * This specifies the reason why a client was removed from - * multifdsink and is received in the "client-removed" signal. + * multisocketsink and is received in the "client-removed" signal. */ typedef enum { @@ -139,30 +123,29 @@ typedef enum /* structure for a client */ typedef struct { - GstPollFD fd; + GSocket *socket; + GSource *source; gint bufpos; /* position of this client in the global queue */ gint flushcount; /* the remaining number of buffers to flush out or -1 if the client is not flushing. */ GstClientStatus status; - gboolean is_socket; GSList *sending; /* the buffers we need to send */ gint bufoffset; /* offset in the first buffer */ gboolean discont; - gboolean caps_sent; gboolean new_connection; gboolean currently_removing; /* method to sync client when connecting */ GstSyncMethod sync_method; - GstTCPUnitType burst_min_unit; + GstFormat burst_min_format; guint64 burst_min_value; - GstTCPUnitType burst_max_unit; + GstFormat burst_max_format; guint64 burst_max_value; GstCaps *caps; /* caps of last queued buffer */ @@ -176,19 +159,19 @@ typedef struct { guint64 avg_queue_size; guint64 first_buffer_ts; guint64 last_buffer_ts; -} GstTCPClient; +} GstSocketClient; -#define CLIENTS_LOCK_INIT(fdsink) (g_static_rec_mutex_init(&fdsink->clientslock)) -#define CLIENTS_LOCK_FREE(fdsink) (g_static_rec_mutex_free(&fdsink->clientslock)) -#define CLIENTS_LOCK(fdsink) (g_static_rec_mutex_lock(&fdsink->clientslock)) -#define CLIENTS_UNLOCK(fdsink) (g_static_rec_mutex_unlock(&fdsink->clientslock)) +#define CLIENTS_LOCK_INIT(socketsink) (g_static_rec_mutex_init(&socketsink->clientslock)) +#define CLIENTS_LOCK_FREE(socketsink) (g_static_rec_mutex_free(&socketsink->clientslock)) +#define CLIENTS_LOCK(socketsink) (g_static_rec_mutex_lock(&socketsink->clientslock)) +#define CLIENTS_UNLOCK(socketsink) (g_static_rec_mutex_unlock(&socketsink->clientslock)) /** - * GstMultiFdSink: + * GstMultiSocketSink: * - * The multifdsink object structure. + * The multisocketsink object structure. */ -struct _GstMultiFdSink { +struct _GstMultiSocketSink { GstBaseSink element; /*< private >*/ @@ -197,17 +180,16 @@ struct _GstMultiFdSink { GStaticRecMutex clientslock; /* lock to protect the clients list */ GList *clients; /* list of clients we are serving */ - GHashTable *fd_hash; /* index on fd to client */ + GHashTable *socket_hash; /* index on socket to client */ guint clients_cookie; /* Cookie to detect changes to the clients list */ - gint mode; - GstPoll *fdset; + GMainContext *main_context; + GCancellable *cancellable; GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ gboolean previous_buffer_in_caps; guint mtu; - gint qos_dscp; gboolean handle_read; GArray *bufqueue; /* global queue of buffers */ @@ -217,14 +199,14 @@ struct _GstMultiFdSink { /* these values are used to check if a client is reading fast * enough and to control receovery */ - GstTCPUnitType unit_type;/* the type of the units */ + GstFormat unit_type;/* the format of the units */ gint64 units_max; /* max units to queue for a client */ gint64 units_soft_max; /* max units a client can lag before recovery starts */ GstRecoverPolicy recover_policy; GstClockTime timeout; /* max amount of nanoseconds to remain idle */ GstSyncMethod def_sync_method; /* what method to use for connecting clients */ - GstTCPUnitType def_burst_unit; + GstFormat def_burst_format; guint64 def_burst_value; /* these values are used to control the amount of data @@ -244,42 +226,41 @@ struct _GstMultiFdSink { guint8 header_flags; }; -struct _GstMultiFdSinkClass { +struct _GstMultiSocketSinkClass { GstBaseSinkClass parent_class; /* element methods */ - void (*add) (GstMultiFdSink *sink, int fd); - void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync, - GstTCPUnitType format, guint64 value, - GstTCPUnitType max_unit, guint64 max_value); - void (*remove) (GstMultiFdSink *sink, int fd); - void (*remove_flush) (GstMultiFdSink *sink, int fd); - void (*clear) (GstMultiFdSink *sink); - GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd); + void (*add) (GstMultiSocketSink *sink, GSocket *socket); + void (*add_full) (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync, + GstFormat format, guint64 value, + GstFormat max_format, guint64 max_value); + void (*remove) (GstMultiSocketSink *sink, GSocket *socket); + void (*remove_flush) (GstMultiSocketSink *sink, GSocket *socket); + void (*clear) (GstMultiSocketSink *sink); + GstStructure* (*get_stats) (GstMultiSocketSink *sink, GSocket *socket); /* vtable */ - gboolean (*init) (GstMultiFdSink *sink); - gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set); - gboolean (*close) (GstMultiFdSink *sink); - void (*removed) (GstMultiFdSink *sink, int fd); + gboolean (*init) (GstMultiSocketSink *sink); + gboolean (*close) (GstMultiSocketSink *sink); + void (*removed) (GstMultiSocketSink *sink, GSocket *socket); /* signals */ - void (*client_added) (GstElement *element, gint fd); - void (*client_removed) (GstElement *element, gint fd, GstClientStatus status); - void (*client_fd_removed) (GstElement *element, gint fd); + void (*client_added) (GstElement *element, GSocket *socket); + void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status); + void (*client_socket_removed) (GstElement *element, GSocket *socket); }; -GType gst_multi_fd_sink_get_type (void); +GType gst_multi_socket_sink_get_type (void); -void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, - GstTCPUnitType min_unit, guint64 min_value, - GstTCPUnitType max_unit, guint64 max_value); -void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_clear (GstMultiFdSink *sink); -GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); +void gst_multi_socket_sink_add (GstMultiSocketSink *sink, GSocket *socket); +void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync, + GstFormat min_format, guint64 min_value, + GstFormat max_format, guint64 max_value); +void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GSocket *socket); +void gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GSocket *socket); +void gst_multi_socket_sink_clear (GstMultiSocketSink *sink); +GstStructure* gst_multi_socket_sink_get_stats (GstMultiSocketSink *sink, GSocket *socket); G_END_DECLS -#endif /* __GST_MULTI_FD_SINK_H__ */ +#endif /* __GST_MULTI_SOCKET_SINK_H__ */ diff --git a/gst/tcp/gsttcp-marshal.list b/gst/tcp/gsttcp-marshal.list index 0d7208e..07ecb9e 100644 --- a/gst/tcp/gsttcp-marshal.list +++ b/gst/tcp/gsttcp-marshal.list @@ -1,5 +1,4 @@ VOID:STRING,UINT -VOID:INT -VOID:INT,BOXED -VOID:INT,ENUM,INT,UINT64,INT,UINT64 -BOXED:INT +VOID:OBJECT,ENUM +VOID:OBJECT,ENUM,ENUM,UINT64,ENUM,UINT64 +BOXED:OBJECT diff --git a/gst/tcp/gsttcp.h b/gst/tcp/gsttcp.h index c588daa..f785fec 100644 --- a/gst/tcp/gsttcp.h +++ b/gst/tcp/gsttcp.h @@ -25,7 +25,6 @@ #include "gsttcp-enumtypes.h" #include -#undef GST_DISABLE_DEPRECATED #define TCP_HIGHEST_PORT 65535 #define TCP_DEFAULT_HOST "localhost" diff --git a/gst/tcp/gsttcpplugin.c b/gst/tcp/gsttcpplugin.c index 02c71b3..d5b2866 100644 --- a/gst/tcp/gsttcpplugin.c +++ b/gst/tcp/gsttcpplugin.c @@ -25,7 +25,7 @@ #include "gsttcpclientsink.h" #include "gsttcpserversrc.h" #include "gsttcpserversink.h" -#include "gstmultifdsink.h" +#include "gstmultisocketsink.h" GST_DEBUG_CATEGORY (tcp_debug); @@ -44,8 +44,8 @@ plugin_init (GstPlugin * plugin) if (!gst_element_register (plugin, "tcpserversrc", GST_RANK_NONE, GST_TYPE_TCP_SERVER_SRC)) return FALSE; - if (!gst_element_register (plugin, "multifdsink", GST_RANK_NONE, - GST_TYPE_MULTI_FD_SINK)) + if (!gst_element_register (plugin, "multisocketsink", GST_RANK_NONE, + GST_TYPE_MULTI_SOCKET_SINK)) return FALSE; GST_DEBUG_CATEGORY_INIT (tcp_debug, "tcp", 0, "TCP calls"); diff --git a/gst/tcp/gsttcpplugin.h b/gst/tcp/gsttcpplugin.h deleted file mode 100644 index 38b91be..0000000 --- a/gst/tcp/gsttcpplugin.h +++ /dev/null @@ -1,40 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_TCP_H__ -#define __GST_TCP_H__ - -#ifdef __cplusplus -extern "C" -{ -#endif /* __cplusplus */ - - typedef enum - { - CONTROL_ZERO, - CONTROL_NONE, - CONTROL_TCP - } Gst_TCP_Control; - -#ifdef __cplusplus -} -#endif /* __cplusplus */ - -#endif /* __GST_TCP_H__ */ diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index a654b84..b3df834 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -41,10 +41,6 @@ #include -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include -#endif - #include "gsttcp.h" #include "gsttcpserversink.h" #include "gsttcp-marshal.h" @@ -56,18 +52,17 @@ GST_DEBUG_CATEGORY_STATIC (tcpserversink_debug); enum { - ARG_0, - ARG_HOST, - ARG_PORT, + PROP_0, + PROP_HOST, + PROP_PORT, }; static void gst_tcp_server_sink_finalize (GObject * gobject); -static gboolean gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, - GstPoll * set); -static gboolean gst_tcp_server_sink_init_send (GstMultiFdSink * this); -static gboolean gst_tcp_server_sink_close (GstMultiFdSink * this); -static void gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd); +static gboolean gst_tcp_server_sink_init_send (GstMultiSocketSink * this); +static gboolean gst_tcp_server_sink_close (GstMultiSocketSink * this); +static void gst_tcp_server_sink_removed (GstMultiSocketSink * sink, + GSocket * socket); static void gst_tcp_server_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -75,27 +70,28 @@ static void gst_tcp_server_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); #define gst_tcp_server_sink_parent_class parent_class -G_DEFINE_TYPE (GstTCPServerSink, gst_tcp_server_sink, GST_TYPE_MULTI_FD_SINK); +G_DEFINE_TYPE (GstTCPServerSink, gst_tcp_server_sink, + GST_TYPE_MULTI_SOCKET_SINK); static void gst_tcp_server_sink_class_init (GstTCPServerSinkClass * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; - GstMultiFdSinkClass *gstmultifdsink_class; + GstMultiSocketSinkClass *gstmultifdsink_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; - gstmultifdsink_class = (GstMultiFdSinkClass *) klass; + gstmultifdsink_class = (GstMultiSocketSinkClass *) klass; gobject_class->set_property = gst_tcp_server_sink_set_property; gobject_class->get_property = gst_tcp_server_sink_get_property; gobject_class->finalize = gst_tcp_server_sink_finalize; - g_object_class_install_property (gobject_class, ARG_HOST, + g_object_class_install_property (gobject_class, PROP_HOST, g_param_spec_string ("host", "host", "The host/IP to send the packets to", TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, ARG_PORT, + g_object_class_install_property (gobject_class, PROP_PORT, g_param_spec_int ("port", "port", "The port to send the packets to", 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); @@ -106,7 +102,6 @@ gst_tcp_server_sink_class_init (GstTCPServerSinkClass * klass) "Thomas Vander Stichele "); gstmultifdsink_class->init = gst_tcp_server_sink_init_send; - gstmultifdsink_class->wait = gst_tcp_server_sink_handle_wait; gstmultifdsink_class->close = gst_tcp_server_sink_close; gstmultifdsink_class->removed = gst_tcp_server_sink_removed; @@ -121,7 +116,7 @@ gst_tcp_server_sink_init (GstTCPServerSink * this) /* this->mtu = 1500; */ this->host = g_strdup (TCP_DEFAULT_HOST); - this->server_sock.fd = -1; + this->server_socket = NULL; } static void @@ -129,7 +124,11 @@ gst_tcp_server_sink_finalize (GObject * gobject) { GstTCPServerSink *this = GST_TCP_SERVER_SINK (gobject); + if (this->server_socket) + g_object_unref (this->server_socket); + this->server_socket = NULL; g_free (this->host); + this->host = NULL; G_OBJECT_CLASS (parent_class)->finalize (gobject); } @@ -139,26 +138,31 @@ gst_tcp_server_sink_finalize (GObject * gobject) static gboolean gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink) { - /* new client */ - int client_sock_fd; - struct sockaddr_in client_address; - socklen_t client_address_len; - - /* For some stupid reason, client_address and client_address_len has to be - * zeroed */ - memset (&client_address, 0, sizeof (client_address)); - client_address_len = 0; - - client_sock_fd = - accept (sink->server_sock.fd, (struct sockaddr *) &client_address, - &client_address_len); - if (client_sock_fd == -1) + GSocket *client_socket; + GError *err = NULL; + + /* wait on server socket for connections */ + client_socket = + g_socket_accept (sink->server_socket, sink->element.cancellable, &err); + if (!client_socket) goto accept_failed; - gst_multi_fd_sink_add (GST_MULTI_FD_SINK (sink), client_sock_fd); + gst_multi_socket_sink_add (GST_MULTI_SOCKET_SINK (sink), client_socket); + +#ifndef GST_DISABLE_GST_DEBUG + { + GInetSocketAddress *addr = + G_INET_SOCKET_ADDRESS (g_socket_get_remote_address (client_socket, + NULL)); + gchar *ip = + g_inet_address_to_string (g_inet_socket_address_get_address (addr)); - GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d", - inet_ntoa (client_address.sin_addr), client_sock_fd); + GST_DEBUG_OBJECT (sink, "added new client ip %s:%u with socket %p", + ip, g_inet_socket_address_get_port (addr), client_socket); + + g_free (ip); + } +#endif return TRUE; @@ -166,45 +170,47 @@ gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink) accept_failed: { GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL), - ("Could not accept client on server socket %d: %s (%d)", - sink->server_sock.fd, g_strerror (errno), errno)); + ("Could not accept client on server socket %p: %s", + sink->server_socket, err->message)); + g_clear_error (&err); return FALSE; } } static void -gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd) +gst_tcp_server_sink_removed (GstMultiSocketSink * sink, GSocket * socket) { #ifndef GST_DISABLE_GST_DEBUG GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink); #endif + GError *err = NULL; - GST_LOG_OBJECT (this, "closing fd %d", fd); - if (close (fd) < 0) { - GST_WARNING_OBJECT (this, "error closing fd %d: %s", fd, - g_strerror (errno)); + GST_DEBUG_OBJECT (this, "closing socket"); + + if (!g_socket_close (socket, &err)) { + GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message); + g_clear_error (&err); } } static gboolean -gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstPoll * set) +gst_tcp_server_sink_socket_condition (GSocket * socket, GIOCondition condition, + GstTCPServerSink * sink) { - GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink); - - if (gst_poll_fd_can_read (set, &this->server_sock)) { - /* handle new client connection on server socket */ - if (!gst_tcp_server_sink_handle_server_read (this)) - goto connection_failed; + if ((condition & G_IO_ERR)) { + goto error; + } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) { + if (!gst_tcp_server_sink_handle_server_read (sink)) + return FALSE; } + return TRUE; - /* ERRORS */ -connection_failed: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("client connection failed: %s", g_strerror (errno))); - return FALSE; - } +error: + GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), + ("client connection failed")); + + return FALSE; } static void @@ -217,7 +223,7 @@ gst_tcp_server_sink_set_property (GObject * object, guint prop_id, sink = GST_TCP_SERVER_SINK (object); switch (prop_id) { - case ARG_HOST: + case PROP_HOST: if (!g_value_get_string (value)) { g_warning ("host property cannot be NULL"); break; @@ -225,10 +231,9 @@ gst_tcp_server_sink_set_property (GObject * object, guint prop_id, g_free (sink->host); sink->host = g_strdup (g_value_get_string (value)); break; - case ARG_PORT: + case PROP_PORT: sink->server_port = g_value_get_int (value); break; - default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -245,13 +250,12 @@ gst_tcp_server_sink_get_property (GObject * object, guint prop_id, sink = GST_TCP_SERVER_SINK (object); switch (prop_id) { - case ARG_HOST: + case PROP_HOST: g_value_set_string (value, sink->host); break; - case ARG_PORT: + case PROP_PORT: g_value_set_int (value, sink->server_port); break; - default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -261,111 +265,152 @@ gst_tcp_server_sink_get_property (GObject * object, guint prop_id, /* create a socket for sending to remote machine */ static gboolean -gst_tcp_server_sink_init_send (GstMultiFdSink * parent) +gst_tcp_server_sink_init_send (GstMultiSocketSink * parent) { - int ret; GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent); - - /* create sending server socket */ - if ((this->server_sock.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) + GError *err = NULL; + GInetAddress *addr; + GSocketAddress *saddr; + + /* create the server listener socket */ + this->server_socket = + g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, &err); + if (!this->server_socket) goto no_socket; - GST_DEBUG_OBJECT (this, "opened sending server socket with fd %d", - this->server_sock.fd); + GST_DEBUG_OBJECT (this, "opened sending server socket with socket %p", + this->server_socket); + + /* look up name if we need to */ + addr = g_inet_address_new_from_string (this->host); + if (!addr) { + GResolver *resolver = g_resolver_get_default (); + GList *results; - /* make address reusable */ - ret = 1; - if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, - (void *) &ret, sizeof (ret)) < 0) - goto reuse_failed; + results = + g_resolver_lookup_by_name (resolver, this->host, + this->element.cancellable, &err); + if (!results) + goto name_resolve; + addr = G_INET_ADDRESS (g_object_ref (results->data)); - /* keep connection alive; avoids SIGPIPE during write */ - ret = 1; - if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, - (void *) &ret, sizeof (ret)) < 0) - goto keepalive_failed; + g_resolver_free_addresses (results); + g_object_unref (resolver); + } +#ifndef GST_DISABLE_GST_DEBUG + { + gchar *ip = g_inet_address_to_string (addr); + + GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip); + g_free (ip); + } +#endif - /* name the socket */ - memset (&this->server_sin, 0, sizeof (this->server_sin)); - this->server_sin.sin_family = AF_INET; /* network socket */ - this->server_sin.sin_port = htons (this->server_port); /* on port */ - this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); /* for hosts */ + g_socket_set_blocking (this->server_socket, FALSE); /* bind it */ + saddr = g_inet_socket_address_new (addr, this->server_port); GST_DEBUG_OBJECT (this, "binding server socket to address"); - ret = bind (this->server_sock.fd, (struct sockaddr *) &this->server_sin, - sizeof (this->server_sin)); - if (ret) + if (!g_socket_bind (this->server_socket, saddr, TRUE, &err)) goto bind_failed; - /* set the server socket to nonblocking */ - fcntl (this->server_sock.fd, F_SETFL, O_NONBLOCK); + GST_DEBUG_OBJECT (this, "listening on server socket"); + g_socket_set_listen_backlog (this->server_socket, TCP_BACKLOG); - GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", - this->server_sock.fd, TCP_BACKLOG); - if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) + if (!g_socket_listen (this->server_socket, &err)) goto listen_failed; GST_DEBUG_OBJECT (this, - "listened on server socket %d, returning from connection setup", - this->server_sock.fd); - - gst_poll_add_fd (parent->fdset, &this->server_sock); - gst_poll_fd_ctl_read (parent->fdset, &this->server_sock, TRUE); + "listened on server socket %p, returning from connection setup", + this->server_socket); + + this->server_source = + g_socket_create_source (this->server_socket, + G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, + this->element.cancellable); + g_source_set_callback (this->server_source, + (GSourceFunc) gst_tcp_server_sink_socket_condition, gst_object_ref (this), + (GDestroyNotify) gst_object_unref); + g_source_attach (this->server_source, this->element.main_context); return TRUE; /* ERRORS */ no_socket: { - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM); + GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), + ("Failed to create socket: %s", err->message)); + g_clear_error (&err); return FALSE; } -reuse_failed: +name_resolve: { - gst_tcp_socket_close (&this->server_sock); - GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), - ("Could not setsockopt: %s", g_strerror (errno))); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (this, "Cancelled name resolval"); + } else { + GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), + ("Failed to resolve host '%s': %s", this->host, err->message)); + } + g_clear_error (&err); + gst_tcp_server_sink_close (&this->element); return FALSE; } -keepalive_failed: +bind_failed: { - gst_tcp_socket_close (&this->server_sock); - GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), - ("Could not setsockopt: %s", g_strerror (errno))); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (this, "Cancelled binding"); + } else { + GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), + ("Failed to bind on host '%s:%d': %s", this->host, this->server_port, + err->message)); + } + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (addr); + gst_tcp_server_sink_close (&this->element); return FALSE; } listen_failed: { - gst_tcp_socket_close (&this->server_sock); - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), - ("Could not listen on server socket: %s", g_strerror (errno))); - return FALSE; - } -bind_failed: - { - gst_tcp_socket_close (&this->server_sock); - switch (errno) { - default: - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), - ("bind on port %d failed: %s", this->server_port, - g_strerror (errno))); - break; + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (this, "Cancelled listening"); + } else { + GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), + ("Failed to listen on host '%s:%d': %s", this->host, + this->server_port, err->message)); } + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (addr); + gst_tcp_server_sink_close (&this->element); return FALSE; } } static gboolean -gst_tcp_server_sink_close (GstMultiFdSink * parent) +gst_tcp_server_sink_close (GstMultiSocketSink * parent) { GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent); - if (this->server_sock.fd != -1) { - gst_poll_remove_fd (parent->fdset, &this->server_sock); + if (this->server_source) { + g_source_destroy (this->server_source); + g_source_unref (this->server_source); + this->server_source = NULL; + } + + if (this->server_socket) { + GError *err = NULL; + + GST_DEBUG_OBJECT (this, "closing socket"); - close (this->server_sock.fd); - this->server_sock.fd = -1; + if (!g_socket_close (this->server_socket, &err)) { + GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message); + g_clear_error (&err); + } + g_object_unref (this->server_socket); + this->server_socket = NULL; } + return TRUE; } diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h index ac8846d..0a67ea0 100644 --- a/gst/tcp/gsttcpserversink.h +++ b/gst/tcp/gsttcpserversink.h @@ -24,23 +24,11 @@ #include +#include G_BEGIN_DECLS -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "gstmultifdsink.h" +#include "gstmultisocketsink.h" #define GST_TYPE_TCP_SERVER_SINK \ (gst_tcp_server_sink_get_type()) @@ -68,19 +56,17 @@ typedef enum { * Opaque data structure. */ struct _GstTCPServerSink { - GstMultiFdSink element; + GstMultiSocketSink element; /* server information */ - int server_port; + gint server_port; gchar *host; - struct sockaddr_in server_sin; - - /* socket */ - GstPollFD server_sock; + GSocket *server_socket; + GSource *server_source; }; struct _GstTCPServerSinkClass { - GstMultiFdSinkClass parent_class; + GstMultiSocketSinkClass parent_class; }; GType gst_tcp_server_sink_get_type (void); -- 2.7.4