rtpvrawpay: don't use buffer lists if everything fits into one buffer
[platform/upstream/gst-plugins-good.git] / ext / shout2 / gstshout2.c
index e154ba8..3d3f652 100644 (file)
@@ -70,7 +70,10 @@ enum
   ARG_PROTOCOL,                 /* Protocol to connect with */
 
   ARG_MOUNT,                    /* mountpoint of stream (icecast only) */
-  ARG_URL                       /* the stream's homepage URL */
+  ARG_URL,                      /* the stream's homepage URL */
+
+  ARG_TIMEOUT                   /* The max amount of time to wait for
+                                   network activity */
 };
 
 #define DEFAULT_IP           "127.0.0.1"
@@ -84,6 +87,7 @@ enum
 #define DEFAULT_MOUNT        ""
 #define DEFAULT_URL          ""
 #define DEFAULT_PROTOCOL     SHOUT2SEND_PROTOCOL_HTTP
+#define DEFAULT_TIMEOUT      10000
 
 #ifdef SHOUT_FORMAT_WEBM
 #define WEBM_CAPS "; video/webm; audio/webm"
@@ -207,6 +211,12 @@ gst_shout2send_class_init (GstShout2sendClass * klass)
       g_param_spec_string ("url", "url", "the stream's homepage URL",
           DEFAULT_URL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
+      g_param_spec_uint ("timeout", "timeout",
+          "Max amount of time to wait for network activity, in milliseconds",
+          1, G_MAXUINT, DEFAULT_TIMEOUT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   /* signals */
   gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM] =
       g_signal_new ("connection-problem", G_TYPE_FROM_CLASS (klass),
@@ -223,8 +233,7 @@ gst_shout2send_class_init (GstShout2sendClass * klass)
   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shout2send_event);
   gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_shout2send_setcaps);
 
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sink_template));
+  gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
 
   gst_element_class_set_static_metadata (gstelement_class,
       "Icecast network sink",
@@ -241,7 +250,7 @@ gst_shout2send_init (GstShout2send * shout2send)
 {
   gst_base_sink_set_sync (GST_BASE_SINK (shout2send), FALSE);
 
-  shout2send->timer = gst_poll_new_timer ();
+  shout2send->timer = gst_poll_new (TRUE);
 
   shout2send->ip = g_strdup (DEFAULT_IP);
   shout2send->port = DEFAULT_PORT;
@@ -254,6 +263,7 @@ gst_shout2send_init (GstShout2send * shout2send)
   shout2send->url = g_strdup (DEFAULT_URL);
   shout2send->protocol = DEFAULT_PROTOCOL;
   shout2send->ispublic = DEFAULT_PUBLIC;
+  shout2send->timeout = DEFAULT_TIMEOUT;
 
   shout2send->format = -1;
   shout2send->tags = gst_tag_list_new_empty ();
@@ -492,7 +502,7 @@ gst_shout2send_start (GstBaseSink * basesink)
     goto set_failed;
 
   cur_prop = "username";
-  GST_DEBUG_OBJECT (sink, "setting %s: %s", cur_prop, "source");
+  GST_DEBUG_OBJECT (sink, "setting %s: %s", cur_prop, sink->username);
   if (shout_set_user (sink->conn, sink->username) != SHOUTERR_SUCCESS)
     goto set_failed;
 
@@ -519,20 +529,53 @@ set_failed:
 static GstFlowReturn
 gst_shout2send_connect (GstShout2send * sink)
 {
+  GstFlowReturn fret = GST_FLOW_OK;
+  gint ret;
+  GstClockTime start_ts;
+
   GST_DEBUG_OBJECT (sink, "Connection format is: %d", sink->format);
 
   if (sink->format == -1)
     goto no_caps;
 
+  if (shout_set_nonblocking (sink->conn, 1) != SHOUTERR_SUCCESS)
+    goto could_not_set_nonblocking;
+
   if (shout_set_format (sink->conn, sink->format) != SHOUTERR_SUCCESS)
     goto could_not_set_format;
 
-  if (shout_open (sink->conn) != SHOUTERR_SUCCESS)
+  GST_DEBUG_OBJECT (sink, "connecting");
+
+  start_ts = gst_util_get_timestamp ();
+  ret = shout_open (sink->conn);
+
+  /* wait for connection or timeout */
+  while (ret == SHOUTERR_BUSY) {
+    if (gst_util_get_timestamp () - start_ts > sink->timeout * GST_MSECOND) {
+      goto connection_timeout;
+    }
+    if (gst_poll_wait (sink->timer, 10 * GST_MSECOND) == -1) {
+      GST_LOG_OBJECT (sink, "unlocked");
+
+      fret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
+      if (fret != GST_FLOW_OK)
+        goto done;
+    }
+    ret = shout_get_connected (sink->conn);
+  }
+
+  if (ret != SHOUTERR_CONNECTED && ret != SHOUTERR_SUCCESS)
     goto could_not_connect;
 
   GST_DEBUG_OBJECT (sink, "connected to server");
   sink->connected = TRUE;
 
+  /* initialize sending rate monitoring */
+  sink->prev_queuelen = 0;
+  sink->data_sent = 0;
+  sink->stalled = TRUE;
+  sink->datasent_reset_ts = sink->stalled_ts = gst_util_get_timestamp ();
+
   /* let's set metadata */
   if (sink->songmetadata) {
     shout_metadata_t *pmetadata;
@@ -544,7 +587,8 @@ gst_shout2send_connect (GstShout2send * sink)
     shout_metadata_free (pmetadata);
   }
 
-  return GST_FLOW_OK;
+done:
+  return fret;
 
 /* ERRORS */
 no_caps:
@@ -554,6 +598,14 @@ no_caps:
     return GST_FLOW_NOT_NEGOTIATED;
   }
 
+could_not_set_nonblocking:
+  {
+    GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
+        ("Error configuring libshout to use non-blocking i/o: %s",
+            shout_get_error (sink->conn)));
+    return GST_FLOW_ERROR;
+  }
+
 could_not_set_format:
   {
     GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
@@ -570,6 +622,15 @@ could_not_connect:
         shout_get_errno (sink->conn));
     return GST_FLOW_ERROR;
   }
+
+connection_timeout:
+  {
+    GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
+        (_("Could not connect to server")), ("connection timed out"));
+    g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
+        shout_get_errno (sink->conn));
+    return GST_FLOW_ERROR;
+  }
 }
 
 static gboolean
@@ -629,6 +690,8 @@ gst_shout2send_render (GstBaseSink * basesink, GstBuffer * buf)
   gint delay;
   GstFlowReturn fret = GST_FLOW_OK;
   GstMapInfo map;
+  GstClockTime now;
+  ssize_t queuelen;
 
   sink = GST_SHOUT2SEND (basesink);
 
@@ -656,13 +719,54 @@ gst_shout2send_render (GstBaseSink * basesink, GstBuffer * buf)
     GST_LOG_OBJECT (sink, "we're %d msec late", -delay);
   }
 
+  /* accumulate how much data have actually been sent
+   * to the network since the last call to shout_send() */
+  queuelen = shout_queuelen (sink->conn);
+  if (sink->prev_queuelen > 0)
+    sink->data_sent += sink->prev_queuelen - queuelen;
+
   gst_buffer_map (buf, &map, GST_MAP_READ);
-  GST_LOG_OBJECT (sink, "sending %u bytes of data", (guint) map.size);
+
+  /* add map.size instead of re-reading the queue length because
+   * the data may actually be sent immediately */
+  sink->prev_queuelen = queuelen + map.size;
+
+  GST_LOG_OBJECT (sink, "sending %u bytes of data, queue length now is %"
+      G_GUINT64_FORMAT, (guint) map.size, sink->prev_queuelen);
+
   ret = shout_send (sink->conn, map.data, map.size);
+
   gst_buffer_unmap (buf, &map);
   if (ret != SHOUTERR_SUCCESS)
     goto send_error;
 
+  now = gst_util_get_timestamp ();
+  if (now - sink->datasent_reset_ts >= 500 * GST_MSECOND) {
+    guint64 send_rate;
+
+    send_rate = gst_util_uint64_scale (sink->data_sent, GST_SECOND,
+        now - sink->datasent_reset_ts);
+
+    if (send_rate == 0 && !sink->stalled) {
+      sink->stalled = TRUE;
+      sink->stalled_ts = now;
+    } else if (send_rate > 0 && sink->stalled) {
+      sink->stalled = FALSE;
+    }
+
+    sink->data_sent = 0;
+    sink->datasent_reset_ts = now;
+
+    GST_DEBUG_OBJECT (sink, "sending rate is %" G_GUINT64_FORMAT " bps, "
+        "stalled %d, stalled_ts %" GST_TIME_FORMAT, send_rate, sink->stalled,
+        GST_TIME_ARGS (sink->stalled_ts));
+
+    if (sink->stalled && now - sink->stalled_ts >= sink->timeout * GST_MSECOND) {
+      GST_WARNING_OBJECT (sink, "network send queue is stalled for too long");
+      goto network_error;
+    }
+  }
+
 done:
 
   return fret;
@@ -676,6 +780,15 @@ send_error:
         shout_get_errno (sink->conn));
     return GST_FLOW_ERROR;
   }
+
+network_error:
+  {
+    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
+        ("network timeout reached"));
+    g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
+        SHOUTERR_BUSY);
+    return GST_FLOW_ERROR;
+  }
 }
 
 static void
@@ -728,6 +841,9 @@ gst_shout2send_set_property (GObject * object, guint prop_id,
       g_free (shout2send->url);
       shout2send->url = g_strdup (g_value_get_string (value));
       break;
+    case ARG_TIMEOUT:
+      shout2send->timeout = g_value_get_uint (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -776,6 +892,9 @@ gst_shout2send_get_property (GObject * object, guint prop_id,
     case ARG_URL:              /* the stream's homepage URL */
       g_value_set_string (value, shout2send->url);
       break;
+    case ARG_TIMEOUT:
+      g_value_set_uint (value, shout2send->timeout);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -824,7 +943,6 @@ plugin_init (GstPlugin * plugin)
 
 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
     GST_VERSION_MINOR,
-    shout2send,
+    shout2,
     "Sends data to an icecast server using libshout2",
-    plugin_init,
-    VERSION, "LGPL", "libshout2", "http://www.icecast.org/download.html")
+    plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)