ARG_PROTOCOL, /* Protocol to connect with */
ARG_MOUNT, /* mountpoint of stream (icecast only) */
- ARG_URL /* the stream's homepage URL */
+ ARG_URL, /* the stream's homepage URL */
+
+ ARG_TIMEOUT /* The max amount of time to wait for
+ network activity */
};
#define DEFAULT_IP "127.0.0.1"
#define DEFAULT_MOUNT ""
#define DEFAULT_URL ""
#define DEFAULT_PROTOCOL SHOUT2SEND_PROTOCOL_HTTP
+#define DEFAULT_TIMEOUT 10000
#ifdef SHOUT_FORMAT_WEBM
#define WEBM_CAPS "; video/webm; audio/webm"
g_param_spec_string ("url", "url", "the stream's homepage URL",
DEFAULT_URL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
+ g_param_spec_uint ("timeout", "timeout",
+ "Max amount of time to wait for network activity, in milliseconds",
+ 1, G_MAXUINT, DEFAULT_TIMEOUT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
/* signals */
gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM] =
g_signal_new ("connection-problem", G_TYPE_FROM_CLASS (klass),
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shout2send_event);
gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_shout2send_setcaps);
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sink_template));
+ gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
gst_element_class_set_static_metadata (gstelement_class,
"Icecast network sink",
{
gst_base_sink_set_sync (GST_BASE_SINK (shout2send), FALSE);
- shout2send->timer = gst_poll_new_timer ();
+ shout2send->timer = gst_poll_new (TRUE);
shout2send->ip = g_strdup (DEFAULT_IP);
shout2send->port = DEFAULT_PORT;
shout2send->url = g_strdup (DEFAULT_URL);
shout2send->protocol = DEFAULT_PROTOCOL;
shout2send->ispublic = DEFAULT_PUBLIC;
+ shout2send->timeout = DEFAULT_TIMEOUT;
shout2send->format = -1;
shout2send->tags = gst_tag_list_new_empty ();
goto set_failed;
cur_prop = "username";
- GST_DEBUG_OBJECT (sink, "setting %s: %s", cur_prop, "source");
+ GST_DEBUG_OBJECT (sink, "setting %s: %s", cur_prop, sink->username);
if (shout_set_user (sink->conn, sink->username) != SHOUTERR_SUCCESS)
goto set_failed;
static GstFlowReturn
gst_shout2send_connect (GstShout2send * sink)
{
+ GstFlowReturn fret = GST_FLOW_OK;
+ gint ret;
+ GstClockTime start_ts;
+
GST_DEBUG_OBJECT (sink, "Connection format is: %d", sink->format);
if (sink->format == -1)
goto no_caps;
+ if (shout_set_nonblocking (sink->conn, 1) != SHOUTERR_SUCCESS)
+ goto could_not_set_nonblocking;
+
if (shout_set_format (sink->conn, sink->format) != SHOUTERR_SUCCESS)
goto could_not_set_format;
- if (shout_open (sink->conn) != SHOUTERR_SUCCESS)
+ GST_DEBUG_OBJECT (sink, "connecting");
+
+ start_ts = gst_util_get_timestamp ();
+ ret = shout_open (sink->conn);
+
+ /* wait for connection or timeout */
+ while (ret == SHOUTERR_BUSY) {
+ if (gst_util_get_timestamp () - start_ts > sink->timeout * GST_MSECOND) {
+ goto connection_timeout;
+ }
+ if (gst_poll_wait (sink->timer, 10 * GST_MSECOND) == -1) {
+ GST_LOG_OBJECT (sink, "unlocked");
+
+ fret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
+ if (fret != GST_FLOW_OK)
+ goto done;
+ }
+ ret = shout_get_connected (sink->conn);
+ }
+
+ if (ret != SHOUTERR_CONNECTED && ret != SHOUTERR_SUCCESS)
goto could_not_connect;
GST_DEBUG_OBJECT (sink, "connected to server");
sink->connected = TRUE;
+ /* initialize sending rate monitoring */
+ sink->prev_queuelen = 0;
+ sink->data_sent = 0;
+ sink->stalled = TRUE;
+ sink->datasent_reset_ts = sink->stalled_ts = gst_util_get_timestamp ();
+
/* let's set metadata */
if (sink->songmetadata) {
shout_metadata_t *pmetadata;
shout_metadata_free (pmetadata);
}
- return GST_FLOW_OK;
+done:
+ return fret;
/* ERRORS */
no_caps:
return GST_FLOW_NOT_NEGOTIATED;
}
+could_not_set_nonblocking:
+ {
+ GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
+ ("Error configuring libshout to use non-blocking i/o: %s",
+ shout_get_error (sink->conn)));
+ return GST_FLOW_ERROR;
+ }
+
could_not_set_format:
{
GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
shout_get_errno (sink->conn));
return GST_FLOW_ERROR;
}
+
+connection_timeout:
+ {
+ GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
+ (_("Could not connect to server")), ("connection timed out"));
+ g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
+ shout_get_errno (sink->conn));
+ return GST_FLOW_ERROR;
+ }
}
static gboolean
gint delay;
GstFlowReturn fret = GST_FLOW_OK;
GstMapInfo map;
+ GstClockTime now;
+ ssize_t queuelen;
sink = GST_SHOUT2SEND (basesink);
GST_LOG_OBJECT (sink, "we're %d msec late", -delay);
}
+ /* accumulate how much data have actually been sent
+ * to the network since the last call to shout_send() */
+ queuelen = shout_queuelen (sink->conn);
+ if (sink->prev_queuelen > 0)
+ sink->data_sent += sink->prev_queuelen - queuelen;
+
gst_buffer_map (buf, &map, GST_MAP_READ);
- GST_LOG_OBJECT (sink, "sending %u bytes of data", (guint) map.size);
+
+ /* add map.size instead of re-reading the queue length because
+ * the data may actually be sent immediately */
+ sink->prev_queuelen = queuelen + map.size;
+
+ GST_LOG_OBJECT (sink, "sending %u bytes of data, queue length now is %"
+ G_GUINT64_FORMAT, (guint) map.size, sink->prev_queuelen);
+
ret = shout_send (sink->conn, map.data, map.size);
+
gst_buffer_unmap (buf, &map);
if (ret != SHOUTERR_SUCCESS)
goto send_error;
+ now = gst_util_get_timestamp ();
+ if (now - sink->datasent_reset_ts >= 500 * GST_MSECOND) {
+ guint64 send_rate;
+
+ send_rate = gst_util_uint64_scale (sink->data_sent, GST_SECOND,
+ now - sink->datasent_reset_ts);
+
+ if (send_rate == 0 && !sink->stalled) {
+ sink->stalled = TRUE;
+ sink->stalled_ts = now;
+ } else if (send_rate > 0 && sink->stalled) {
+ sink->stalled = FALSE;
+ }
+
+ sink->data_sent = 0;
+ sink->datasent_reset_ts = now;
+
+ GST_DEBUG_OBJECT (sink, "sending rate is %" G_GUINT64_FORMAT " bps, "
+ "stalled %d, stalled_ts %" GST_TIME_FORMAT, send_rate, sink->stalled,
+ GST_TIME_ARGS (sink->stalled_ts));
+
+ if (sink->stalled && now - sink->stalled_ts >= sink->timeout * GST_MSECOND) {
+ GST_WARNING_OBJECT (sink, "network send queue is stalled for too long");
+ goto network_error;
+ }
+ }
+
done:
return fret;
shout_get_errno (sink->conn));
return GST_FLOW_ERROR;
}
+
+network_error:
+ {
+ GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
+ ("network timeout reached"));
+ g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
+ SHOUTERR_BUSY);
+ return GST_FLOW_ERROR;
+ }
}
static void
g_free (shout2send->url);
shout2send->url = g_strdup (g_value_get_string (value));
break;
+ case ARG_TIMEOUT:
+ shout2send->timeout = g_value_get_uint (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case ARG_URL: /* the stream's homepage URL */
g_value_set_string (value, shout2send->url);
break;
+ case ARG_TIMEOUT:
+ g_value_set_uint (value, shout2send->timeout);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
GST_VERSION_MINOR,
- shout2send,
+ shout2,
"Sends data to an icecast server using libshout2",
- plugin_init,
- VERSION, "LGPL", "libshout2", "http://www.icecast.org/download.html")
+ plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)