rtpvrawpay: don't use buffer lists if everything fits into one buffer
[platform/upstream/gst-plugins-good.git] / ext / shout2 / gstshout2.c
index b703ecc..3d3f652 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-shout2send
+ *
+ * shout2send pushes a media stream to an Icecast server
+ *
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * gst-launch-1.0 uridecodebin uri=file:///path/to/audiofile ! audioconvert ! vorbisenc ! oggmux ! shout2send mount=/stream.ogg port=8000 username=source password=somepassword ip=server_IP_address_or_hostname
+ * ]| This pipeline demuxes, decodes, re-encodes and re-muxes an audio
+ * media file into oggvorbis and sends the resulting stream to an Icecast
+ * server. Properties mount, port, username and password are all server-config
+ * dependent.
+ * </refsect2>
  */
 
 #ifdef HAVE_CONFIG_H
@@ -35,14 +51,14 @@ GST_DEBUG_CATEGORY_STATIC (shout2_debug);
 
 enum
 {
-  SIGNAL_CONNECTION_PROBLEM,    /* 0.11 FIXME: remove this */
+  SIGNAL_CONNECTION_PROBLEM,    /* FIXME 2.0: remove this */
   LAST_SIGNAL
 };
 
 enum
 {
   ARG_0,
-  ARG_IP,                       /* the ip of the server */
+  ARG_IP,                       /* the IP address or hostname of the server */
   ARG_PORT,                     /* the encoder port number on the server */
   ARG_PASSWORD,                 /* the encoder password on the server */
   ARG_USERNAME,                 /* the encoder username on the server */
@@ -54,7 +70,10 @@ enum
   ARG_PROTOCOL,                 /* Protocol to connect with */
 
   ARG_MOUNT,                    /* mountpoint of stream (icecast only) */
-  ARG_URL                       /* Url of stream (I'm guessing) */
+  ARG_URL,                      /* the stream's homepage URL */
+
+  ARG_TIMEOUT                   /* The max amount of time to wait for
+                                   network activity */
 };
 
 #define DEFAULT_IP           "127.0.0.1"
@@ -68,16 +87,17 @@ enum
 #define DEFAULT_MOUNT        ""
 #define DEFAULT_URL          ""
 #define DEFAULT_PROTOCOL     SHOUT2SEND_PROTOCOL_HTTP
+#define DEFAULT_TIMEOUT      10000
 
 #ifdef SHOUT_FORMAT_WEBM
-#define WEBM_CAPS "; video/webm"
+#define WEBM_CAPS "; video/webm; audio/webm"
 #else
 #define WEBM_CAPS ""
 #endif
 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
     GST_PAD_ALWAYS,
-    GST_STATIC_CAPS ("application/ogg; "
+    GST_STATIC_CAPS ("application/ogg; audio/ogg; video/ogg; "
         "audio/mpeg, mpegversion = (int) 1, layer = (int) [ 1, 3 ]" WEBM_CAPS));
 
 static void gst_shout2send_finalize (GstShout2send * shout2send);
@@ -142,8 +162,9 @@ gst_shout2send_class_init (GstShout2sendClass * klass)
   gobject_class->get_property = gst_shout2send_get_property;
   gobject_class->finalize = (GObjectFinalizeFunc) gst_shout2send_finalize;
 
+  /* FIXME: 2.0 Should probably change this prop name to "server" */
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_IP,
-      g_param_spec_string ("ip", "ip", "ip", DEFAULT_IP,
+      g_param_spec_string ("ip", "ip", "IP address or hostname", DEFAULT_IP,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
       g_param_spec_int ("port", "port", "port", 1, G_MAXUSHORT, DEFAULT_PORT,
@@ -187,7 +208,13 @@ gst_shout2send_class_init (GstShout2sendClass * klass)
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_URL,
-      g_param_spec_string ("url", "url", "url", DEFAULT_URL,
+      g_param_spec_string ("url", "url", "the stream's homepage URL",
+          DEFAULT_URL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
+      g_param_spec_uint ("timeout", "timeout",
+          "Max amount of time to wait for network activity, in milliseconds",
+          1, G_MAXUINT, DEFAULT_TIMEOUT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /* signals */
@@ -206,8 +233,7 @@ gst_shout2send_class_init (GstShout2sendClass * klass)
   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shout2send_event);
   gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_shout2send_setcaps);
 
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sink_template));
+  gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
 
   gst_element_class_set_static_metadata (gstelement_class,
       "Icecast network sink",
@@ -224,7 +250,7 @@ gst_shout2send_init (GstShout2send * shout2send)
 {
   gst_base_sink_set_sync (GST_BASE_SINK (shout2send), FALSE);
 
-  shout2send->timer = gst_poll_new_timer ();
+  shout2send->timer = gst_poll_new (TRUE);
 
   shout2send->ip = g_strdup (DEFAULT_IP);
   shout2send->port = DEFAULT_PORT;
@@ -237,10 +263,11 @@ gst_shout2send_init (GstShout2send * shout2send)
   shout2send->url = g_strdup (DEFAULT_URL);
   shout2send->protocol = DEFAULT_PROTOCOL;
   shout2send->ispublic = DEFAULT_PUBLIC;
+  shout2send->timeout = DEFAULT_TIMEOUT;
 
+  shout2send->format = -1;
   shout2send->tags = gst_tag_list_new_empty ();
   shout2send->conn = NULL;
-  shout2send->audio_format = SHOUT_FORMAT_VORBIS;
   shout2send->connected = FALSE;
   shout2send->songmetadata = NULL;
   shout2send->songartist = NULL;
@@ -259,7 +286,7 @@ gst_shout2send_finalize (GstShout2send * shout2send)
   g_free (shout2send->mount);
   g_free (shout2send->url);
 
-  gst_tag_list_free (shout2send->tags);
+  gst_tag_list_unref (shout2send->tags);
 
   gst_poll_free (shout2send->timer);
 
@@ -348,7 +375,7 @@ gst_shout2send_set_metadata (GstShout2send * shout2send)
     shout_metadata_free (pmetadata);
   }
 
-  gst_tag_list_free (copy);
+  gst_tag_list_unref (copy);
 }
 #endif
 
@@ -365,8 +392,8 @@ gst_shout2send_event (GstBaseSink * sink, GstEvent * event)
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_TAG:{
-      /* vorbis audio doesnt need metadata setting on the icecast level, only mp3 */
-      if (shout2send->tags && shout2send->audio_format == SHOUT_FORMAT_MP3) {
+      /* vorbis audio doesn't need metadata setting on the icecast level, only mp3 */
+      if (shout2send->tags && shout2send->format == SHOUT_FORMAT_MP3) {
         GstTagList *list;
 
         gst_event_parse_tag (event, &list);
@@ -433,10 +460,8 @@ gst_shout2send_start (GstBaseSink * basesink)
   if (shout_set_protocol (sink->conn, proto) != SHOUTERR_SUCCESS)
     goto set_failed;
 
-  /* --- FIXME: shout requires an ip, and fails if it is given a host. */
-  /* may want to put convert_to_ip(shout2send->ip) here */
   cur_prop = "ip";
-  GST_DEBUG_OBJECT (sink, "setting ip: %s", sink->ip);
+  GST_DEBUG_OBJECT (sink, "setting IP/hostname: %s", sink->ip);
   if (shout_set_host (sink->conn, sink->ip) != SHOUTERR_SUCCESS)
     goto set_failed;
 
@@ -477,7 +502,7 @@ gst_shout2send_start (GstBaseSink * basesink)
     goto set_failed;
 
   cur_prop = "username";
-  GST_DEBUG_OBJECT (sink, "setting %s: %s", cur_prop, "source");
+  GST_DEBUG_OBJECT (sink, "setting %s: %s", cur_prop, sink->username);
   if (shout_set_user (sink->conn, sink->username) != SHOUTERR_SUCCESS)
     goto set_failed;
 
@@ -501,27 +526,56 @@ set_failed:
   }
 }
 
-static gboolean
+static GstFlowReturn
 gst_shout2send_connect (GstShout2send * sink)
 {
-  const char *format =
-      (sink->audio_format == SHOUT_FORMAT_VORBIS) ? "vorbis" :
-      ((sink->audio_format == SHOUT_FORMAT_MP3) ? "mp3" : "unknown");
-#ifdef SHOUT_FORMAT_WEBM
-  if (sink->audio_format == SHOUT_FORMAT_WEBM)
-    format = "webm";
-#endif
-  GST_DEBUG_OBJECT (sink, "Connection format is: %s", format);
+  GstFlowReturn fret = GST_FLOW_OK;
+  gint ret;
+  GstClockTime start_ts;
+
+  GST_DEBUG_OBJECT (sink, "Connection format is: %d", sink->format);
+
+  if (sink->format == -1)
+    goto no_caps;
+
+  if (shout_set_nonblocking (sink->conn, 1) != SHOUTERR_SUCCESS)
+    goto could_not_set_nonblocking;
 
-  if (shout_set_format (sink->conn, sink->audio_format) != SHOUTERR_SUCCESS)
+  if (shout_set_format (sink->conn, sink->format) != SHOUTERR_SUCCESS)
     goto could_not_set_format;
 
-  if (shout_open (sink->conn) != SHOUTERR_SUCCESS)
+  GST_DEBUG_OBJECT (sink, "connecting");
+
+  start_ts = gst_util_get_timestamp ();
+  ret = shout_open (sink->conn);
+
+  /* wait for connection or timeout */
+  while (ret == SHOUTERR_BUSY) {
+    if (gst_util_get_timestamp () - start_ts > sink->timeout * GST_MSECOND) {
+      goto connection_timeout;
+    }
+    if (gst_poll_wait (sink->timer, 10 * GST_MSECOND) == -1) {
+      GST_LOG_OBJECT (sink, "unlocked");
+
+      fret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
+      if (fret != GST_FLOW_OK)
+        goto done;
+    }
+    ret = shout_get_connected (sink->conn);
+  }
+
+  if (ret != SHOUTERR_CONNECTED && ret != SHOUTERR_SUCCESS)
     goto could_not_connect;
 
   GST_DEBUG_OBJECT (sink, "connected to server");
   sink->connected = TRUE;
 
+  /* initialize sending rate monitoring */
+  sink->prev_queuelen = 0;
+  sink->data_sent = 0;
+  sink->stalled = TRUE;
+  sink->datasent_reset_ts = sink->stalled_ts = gst_util_get_timestamp ();
+
   /* let's set metadata */
   if (sink->songmetadata) {
     shout_metadata_t *pmetadata;
@@ -533,14 +587,30 @@ gst_shout2send_connect (GstShout2send * sink)
     shout_metadata_free (pmetadata);
   }
 
-  return TRUE;
+done:
+  return fret;
 
 /* ERRORS */
+no_caps:
+  {
+    GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
+        ("No input caps received."));
+    return GST_FLOW_NOT_NEGOTIATED;
+  }
+
+could_not_set_nonblocking:
+  {
+    GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
+        ("Error configuring libshout to use non-blocking i/o: %s",
+            shout_get_error (sink->conn)));
+    return GST_FLOW_ERROR;
+  }
+
 could_not_set_format:
   {
     GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
         ("Error setting connection format: %s", shout_get_error (sink->conn)));
-    return FALSE;
+    return GST_FLOW_ERROR;
   }
 
 could_not_connect:
@@ -550,7 +620,16 @@ could_not_connect:
         ("shout_open() failed: err=%s", shout_get_error (sink->conn)));
     g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
         shout_get_errno (sink->conn));
-    return FALSE;
+    return GST_FLOW_ERROR;
+  }
+
+connection_timeout:
+  {
+    GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
+        (_("Could not connect to server")), ("connection timed out"));
+    g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
+        shout_get_errno (sink->conn));
+    return GST_FLOW_ERROR;
   }
 }
 
@@ -572,6 +651,7 @@ gst_shout2send_stop (GstBaseSink * basesink)
   }
 
   sink->connected = FALSE;
+  sink->format = -1;
 
   return TRUE;
 }
@@ -608,16 +688,20 @@ gst_shout2send_render (GstBaseSink * basesink, GstBuffer * buf)
   GstShout2send *sink;
   glong ret;
   gint delay;
-  GstFlowReturn fret;
+  GstFlowReturn fret = GST_FLOW_OK;
   GstMapInfo map;
+  GstClockTime now;
+  ssize_t queuelen;
 
   sink = GST_SHOUT2SEND (basesink);
 
-  /* presumably we connect here because we need to know the format before
-   * we can set up the connection, which we don't know yet in _start() */
+  /* we connect here because we need to know the format before we can set up
+   * the connection, which we don't know yet in _start(), and also because we
+   * don't want to block the application thread */
   if (!sink->connected) {
-    if (!gst_shout2send_connect (sink))
-      return GST_FLOW_ERROR;
+    fret = gst_shout2send_connect (sink);
+    if (fret != GST_FLOW_OK)
+      goto done;
   }
 
   delay = shout_delay (sink->conn);
@@ -629,20 +713,63 @@ gst_shout2send_render (GstBaseSink * basesink, GstBuffer * buf)
 
       fret = gst_base_sink_wait_preroll (basesink);
       if (fret != GST_FLOW_OK)
-        return fret;
+        goto done;
     }
   } else {
     GST_LOG_OBJECT (sink, "we're %d msec late", -delay);
   }
 
+  /* accumulate how much data have actually been sent
+   * to the network since the last call to shout_send() */
+  queuelen = shout_queuelen (sink->conn);
+  if (sink->prev_queuelen > 0)
+    sink->data_sent += sink->prev_queuelen - queuelen;
+
   gst_buffer_map (buf, &map, GST_MAP_READ);
-  GST_LOG_OBJECT (sink, "sending %u bytes of data", (guint) map.size);
+
+  /* add map.size instead of re-reading the queue length because
+   * the data may actually be sent immediately */
+  sink->prev_queuelen = queuelen + map.size;
+
+  GST_LOG_OBJECT (sink, "sending %u bytes of data, queue length now is %"
+      G_GUINT64_FORMAT, (guint) map.size, sink->prev_queuelen);
+
   ret = shout_send (sink->conn, map.data, map.size);
+
   gst_buffer_unmap (buf, &map);
   if (ret != SHOUTERR_SUCCESS)
     goto send_error;
 
-  return GST_FLOW_OK;
+  now = gst_util_get_timestamp ();
+  if (now - sink->datasent_reset_ts >= 500 * GST_MSECOND) {
+    guint64 send_rate;
+
+    send_rate = gst_util_uint64_scale (sink->data_sent, GST_SECOND,
+        now - sink->datasent_reset_ts);
+
+    if (send_rate == 0 && !sink->stalled) {
+      sink->stalled = TRUE;
+      sink->stalled_ts = now;
+    } else if (send_rate > 0 && sink->stalled) {
+      sink->stalled = FALSE;
+    }
+
+    sink->data_sent = 0;
+    sink->datasent_reset_ts = now;
+
+    GST_DEBUG_OBJECT (sink, "sending rate is %" G_GUINT64_FORMAT " bps, "
+        "stalled %d, stalled_ts %" GST_TIME_FORMAT, send_rate, sink->stalled,
+        GST_TIME_ARGS (sink->stalled_ts));
+
+    if (sink->stalled && now - sink->stalled_ts >= sink->timeout * GST_MSECOND) {
+      GST_WARNING_OBJECT (sink, "network send queue is stalled for too long");
+      goto network_error;
+    }
+  }
+
+done:
+
+  return fret;
 
 /* ERRORS */
 send_error:
@@ -653,6 +780,15 @@ send_error:
         shout_get_errno (sink->conn));
     return GST_FLOW_ERROR;
   }
+
+network_error:
+  {
+    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
+        ("network timeout reached"));
+    g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
+        SHOUTERR_BUSY);
+    return GST_FLOW_ERROR;
+  }
 }
 
 static void
@@ -665,54 +801,49 @@ gst_shout2send_set_property (GObject * object, guint prop_id,
   switch (prop_id) {
 
     case ARG_IP:
-      if (shout2send->ip)
-        g_free (shout2send->ip);
+      g_free (shout2send->ip);
       shout2send->ip = g_strdup (g_value_get_string (value));
       break;
     case ARG_PORT:
       shout2send->port = g_value_get_int (value);
       break;
     case ARG_PASSWORD:
-      if (shout2send->password)
-        g_free (shout2send->password);
+      g_free (shout2send->password);
       shout2send->password = g_strdup (g_value_get_string (value));
       break;
     case ARG_USERNAME:
-      if (shout2send->username)
-        g_free (shout2send->username);
+      g_free (shout2send->username);
       shout2send->username = g_strdup (g_value_get_string (value));
       break;
     case ARG_PUBLIC:
       shout2send->ispublic = g_value_get_boolean (value);
       break;
     case ARG_STREAMNAME:       /* Name of the stream */
-      if (shout2send->streamname)
-        g_free (shout2send->streamname);
+      g_free (shout2send->streamname);
       shout2send->streamname = g_strdup (g_value_get_string (value));
       break;
     case ARG_DESCRIPTION:      /* Description of the stream */
-      if (shout2send->description)
-        g_free (shout2send->description);
+      g_free (shout2send->description);
       shout2send->description = g_strdup (g_value_get_string (value));
       break;
     case ARG_GENRE:            /* Genre of the stream */
-      if (shout2send->genre)
-        g_free (shout2send->genre);
+      g_free (shout2send->genre);
       shout2send->genre = g_strdup (g_value_get_string (value));
       break;
     case ARG_PROTOCOL:         /* protocol to connect with */
       shout2send->protocol = g_value_get_enum (value);
       break;
     case ARG_MOUNT:            /* mountpoint of stream (icecast only) */
-      if (shout2send->mount)
-        g_free (shout2send->mount);
+      g_free (shout2send->mount);
       shout2send->mount = g_strdup (g_value_get_string (value));
       break;
-    case ARG_URL:              /* Url of the stream (I'm guessing) */
-      if (shout2send->url)
-        g_free (shout2send->url);
+    case ARG_URL:              /* the stream's homepage URL */
+      g_free (shout2send->url);
       shout2send->url = g_strdup (g_value_get_string (value));
       break;
+    case ARG_TIMEOUT:
+      shout2send->timeout = g_value_get_uint (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -758,9 +889,12 @@ gst_shout2send_get_property (GObject * object, guint prop_id,
     case ARG_MOUNT:            /* mountpoint of stream (icecast only) */
       g_value_set_string (value, shout2send->mount);
       break;
-    case ARG_URL:              /* Url of stream (I'm guessing) */
+    case ARG_URL:              /* the stream's homepage URL */
       g_value_set_string (value, shout2send->url);
       break;
+    case ARG_TIMEOUT:
+      g_value_set_uint (value, shout2send->timeout);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -781,12 +915,12 @@ gst_shout2send_setcaps (GstBaseSink * basesink, GstCaps * caps)
   GST_DEBUG_OBJECT (shout2send, "mimetype of caps given is: %s", mimetype);
 
   if (!strcmp (mimetype, "audio/mpeg")) {
-    shout2send->audio_format = SHOUT_FORMAT_MP3;
-  } else if (!strcmp (mimetype, "application/ogg")) {
-    shout2send->audio_format = SHOUT_FORMAT_VORBIS;
+    shout2send->format = SHOUT_FORMAT_MP3;
+  } else if (g_str_has_suffix (mimetype, "/ogg")) {
+    shout2send->format = SHOUT_FORMAT_OGG;
 #ifdef SHOUT_FORMAT_WEBM
-  } else if (!strcmp (mimetype, "video/webm")) {
-    shout2send->audio_format = SHOUT_FORMAT_WEBM;
+  } else if (g_str_has_suffix (mimetype, "/webm")) {
+    shout2send->format = SHOUT_FORMAT_WEBM;
 #endif
   } else {
     ret = FALSE;
@@ -799,7 +933,6 @@ static gboolean
 plugin_init (GstPlugin * plugin)
 {
 #ifdef ENABLE_NLS
-  setlocale (LC_ALL, "");
   bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR);
   bind_textdomain_codeset (GETTEXT_PACKAGE, "UTF-8");
 #endif /* ENABLE_NLS */
@@ -810,7 +943,6 @@ plugin_init (GstPlugin * plugin)
 
 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
     GST_VERSION_MINOR,
-    shout2send,
+    shout2,
     "Sends data to an icecast server using libshout2",
-    plugin_init,
-    VERSION, "LGPL", "libshout2", "http://www.icecast.org/download.html")
+    plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)