From b26d44501c2e7e720cd4108aec7b835fe8555dab Mon Sep 17 00:00:00 2001 From: George Kiagiadakis Date: Mon, 24 Apr 2017 16:55:22 +0300 Subject: [PATCH] shout2send: use non-blocking I/O and a configurable network operations timeout This allows timing out on network errors much earlier (currently it takes ~15min to timeout) and we can still unlock and change state in the meantime. https://bugzilla.gnome.org/show_bug.cgi?id=571722 --- ext/shout2/gstshout2.c | 128 +++++++++++++++++++++++++++++++++++++++++++++++-- ext/shout2/gstshout2.h | 7 +++ 2 files changed, 131 insertions(+), 4 deletions(-) diff --git a/ext/shout2/gstshout2.c b/ext/shout2/gstshout2.c index e28155f..64be764 100644 --- a/ext/shout2/gstshout2.c +++ b/ext/shout2/gstshout2.c @@ -70,7 +70,10 @@ enum ARG_PROTOCOL, /* Protocol to connect with */ ARG_MOUNT, /* mountpoint of stream (icecast only) */ - ARG_URL /* the stream's homepage URL */ + ARG_URL, /* the stream's homepage URL */ + + ARG_TIMEOUT /* The max amount of time to wait for + network activity */ }; #define DEFAULT_IP "127.0.0.1" @@ -84,6 +87,7 @@ enum #define DEFAULT_MOUNT "" #define DEFAULT_URL "" #define DEFAULT_PROTOCOL SHOUT2SEND_PROTOCOL_HTTP +#define DEFAULT_TIMEOUT 10000 #ifdef SHOUT_FORMAT_WEBM #define WEBM_CAPS "; video/webm; audio/webm" @@ -207,6 +211,12 @@ gst_shout2send_class_init (GstShout2sendClass * klass) g_param_spec_string ("url", "url", "the stream's homepage URL", DEFAULT_URL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT, + g_param_spec_uint ("timeout", "timeout", + "Max amount of time to wait for network activity, in milliseconds", + 1, G_MAXUINT, DEFAULT_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /* signals */ gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM] = g_signal_new ("connection-problem", G_TYPE_FROM_CLASS (klass), @@ -253,6 +263,7 @@ gst_shout2send_init (GstShout2send * shout2send) shout2send->url = g_strdup (DEFAULT_URL); shout2send->protocol = DEFAULT_PROTOCOL; shout2send->ispublic = DEFAULT_PUBLIC; + shout2send->timeout = DEFAULT_TIMEOUT; shout2send->format = -1; shout2send->tags = gst_tag_list_new_empty (); @@ -518,20 +529,53 @@ set_failed: static GstFlowReturn gst_shout2send_connect (GstShout2send * sink) { + GstFlowReturn fret = GST_FLOW_OK; + gint ret; + GstClockTime start_ts; + GST_DEBUG_OBJECT (sink, "Connection format is: %d", sink->format); if (sink->format == -1) goto no_caps; + if (shout_set_nonblocking (sink->conn, 1) != SHOUTERR_SUCCESS) + goto could_not_set_nonblocking; + if (shout_set_format (sink->conn, sink->format) != SHOUTERR_SUCCESS) goto could_not_set_format; - if (shout_open (sink->conn) != SHOUTERR_SUCCESS) + GST_DEBUG_OBJECT (sink, "connecting"); + + start_ts = gst_util_get_timestamp (); + ret = shout_open (sink->conn); + + /* wait for connection or timeout */ + while (ret == SHOUTERR_BUSY) { + if (gst_util_get_timestamp () - start_ts > sink->timeout * GST_MSECOND) { + goto connection_timeout; + } + if (gst_poll_wait (sink->timer, 10 * GST_MSECOND) == -1) { + GST_LOG_OBJECT (sink, "unlocked"); + + fret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink)); + if (fret != GST_FLOW_OK) + goto done; + } + ret = shout_get_connected (sink->conn); + } + + if (ret != SHOUTERR_CONNECTED && ret != SHOUTERR_SUCCESS) goto could_not_connect; GST_DEBUG_OBJECT (sink, "connected to server"); sink->connected = TRUE; + /* initialize sending rate monitoring */ + sink->prev_queuelen = 0; + sink->data_sent = 0; + sink->stalled = TRUE; + sink->datasent_reset_ts = sink->stalled_ts = gst_util_get_timestamp (); + /* let's set metadata */ if (sink->songmetadata) { shout_metadata_t *pmetadata; @@ -543,7 +587,8 @@ gst_shout2send_connect (GstShout2send * sink) shout_metadata_free (pmetadata); } - return GST_FLOW_OK; +done: + return fret; /* ERRORS */ no_caps: @@ -553,6 +598,14 @@ no_caps: return GST_FLOW_NOT_NEGOTIATED; } +could_not_set_nonblocking: + { + GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL), + ("Error configuring libshout to use non-blocking i/o: %s", + shout_get_error (sink->conn))); + return GST_FLOW_ERROR; + } + could_not_set_format: { GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL), @@ -569,6 +622,15 @@ could_not_connect: shout_get_errno (sink->conn)); return GST_FLOW_ERROR; } + +connection_timeout: + { + GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, + (_("Could not connect to server")), ("connection timed out")); + g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0, + shout_get_errno (sink->conn)); + return GST_FLOW_ERROR; + } } static gboolean @@ -628,6 +690,8 @@ gst_shout2send_render (GstBaseSink * basesink, GstBuffer * buf) gint delay; GstFlowReturn fret = GST_FLOW_OK; GstMapInfo map; + GstClockTime now; + ssize_t queuelen; sink = GST_SHOUT2SEND (basesink); @@ -655,13 +719,54 @@ gst_shout2send_render (GstBaseSink * basesink, GstBuffer * buf) GST_LOG_OBJECT (sink, "we're %d msec late", -delay); } + /* accumulate how much data have actually been sent + * to the network since the last call to shout_send() */ + queuelen = shout_queuelen (sink->conn); + if (sink->prev_queuelen > 0) + sink->data_sent += sink->prev_queuelen - queuelen; + gst_buffer_map (buf, &map, GST_MAP_READ); - GST_LOG_OBJECT (sink, "sending %u bytes of data", (guint) map.size); + + /* add map.size instead of re-reading the queue length because + * the data may actually be sent immediately */ + sink->prev_queuelen = queuelen + map.size; + + GST_LOG_OBJECT (sink, "sending %u bytes of data, queue length now is %" + G_GUINT64_FORMAT, (guint) map.size, sink->prev_queuelen); + ret = shout_send (sink->conn, map.data, map.size); + gst_buffer_unmap (buf, &map); if (ret != SHOUTERR_SUCCESS) goto send_error; + now = gst_util_get_timestamp (); + if (now - sink->datasent_reset_ts >= 500 * GST_MSECOND) { + guint64 send_rate; + + send_rate = gst_util_uint64_scale (sink->data_sent, GST_SECOND, + now - sink->datasent_reset_ts); + + if (send_rate == 0 && !sink->stalled) { + sink->stalled = TRUE; + sink->stalled_ts = now; + } else if (send_rate > 0 && sink->stalled) { + sink->stalled = FALSE; + } + + sink->data_sent = 0; + sink->datasent_reset_ts = now; + + GST_DEBUG_OBJECT (sink, "sending rate is %" G_GUINT64_FORMAT " bps, " + "stalled %d, stalled_ts %" GST_TIME_FORMAT, send_rate, sink->stalled, + GST_TIME_ARGS (sink->stalled_ts)); + + if (sink->stalled && now - sink->stalled_ts >= sink->timeout * GST_MSECOND) { + GST_WARNING_OBJECT (sink, "network send queue is stalled for too long"); + goto network_error; + } + } + done: return fret; @@ -675,6 +780,15 @@ send_error: shout_get_errno (sink->conn)); return GST_FLOW_ERROR; } + +network_error: + { + GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), + ("network timeout reached")); + g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0, + SHOUTERR_BUSY); + return GST_FLOW_ERROR; + } } static void @@ -727,6 +841,9 @@ gst_shout2send_set_property (GObject * object, guint prop_id, g_free (shout2send->url); shout2send->url = g_strdup (g_value_get_string (value)); break; + case ARG_TIMEOUT: + shout2send->timeout = g_value_get_uint (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -775,6 +892,9 @@ gst_shout2send_get_property (GObject * object, guint prop_id, case ARG_URL: /* the stream's homepage URL */ g_value_set_string (value, shout2send->url); break; + case ARG_TIMEOUT: + g_value_set_uint (value, shout2send->timeout); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; diff --git a/ext/shout2/gstshout2.h b/ext/shout2/gstshout2.h index 2222bac..eaed730 100644 --- a/ext/shout2/gstshout2.h +++ b/ext/shout2/gstshout2.h @@ -46,6 +46,12 @@ struct _GstShout2send { shout_t *conn; + guint64 prev_queuelen; + guint64 data_sent; + GstClockTime datasent_reset_ts; + gboolean stalled; + GstClockTime stalled_ts; + gchar *ip; guint port; gchar *password; @@ -61,6 +67,7 @@ struct _GstShout2send { gchar *songartist; gchar *songtitle; int format; + uint timeout; GstTagList* tags; }; -- 2.7.4