curl: refactor curlsink, rename to curlhttpsink
authorPatricia Muscalu <patricia@axis.com>
Mon, 23 Jan 2012 08:00:47 +0000 (09:00 +0100)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Sat, 12 May 2012 10:48:03 +0000 (11:48 +0100)
Split into base, tls and http classes.

https://bugzilla.gnome.org/show_bug.cgi?id=653741

13 files changed:
configure.ac
docs/plugins/Makefile.am
ext/curl/Makefile.am
ext/curl/gstcurl.c
ext/curl/gstcurlbasesink.c [moved from ext/curl/gstcurlsink.c with 52% similarity]
ext/curl/gstcurlbasesink.h [new file with mode: 0644]
ext/curl/gstcurlhttpsink.c [new file with mode: 0644]
ext/curl/gstcurlhttpsink.h [new file with mode: 0644]
ext/curl/gstcurlsink.h [deleted file]
ext/curl/gstcurltlssink.c [new file with mode: 0644]
ext/curl/gstcurltlssink.h [new file with mode: 0644]
tests/check/Makefile.am
tests/check/elements/curlhttpsink.c [new file with mode: 0644]

index 3988134..75fe665 100644 (file)
@@ -310,7 +310,7 @@ GST_PLUGINS_NONPORTED=" aiff \
  sdi siren speed subenc stereo tta videofilters \
  videomeasure videosignal vmnc \
  decklink fbdev linsys vcd \
- apexsink cdaudio cog curl dc1394 dirac directfb resindvd \
+ apexsink cdaudio cog dc1394 dirac directfb resindvd \
  gsettings jp2k ladspa mimic \
  musepack musicbrainz nas neon ofa openal opencv rsvg sdl sndfile soundtouch spandsp spc timidity \
  directsound direct3d directdraw direct3d9 acm wininet \
index c34f435..8f10bfc 100644 (file)
@@ -61,11 +61,17 @@ IGNORE_CFILES =
 EXAMPLE_CFILES = \
         $(top_srcdir)/ext/directfb/dfb-example.c
 
+#      $(top_srcdir)/ext/curl/gstcurlfilesink.h
+#      $(top_srcdir)/ext/curl/gstcurlftpsink.h
+#      $(top_srcdir)/ext/curl/gstcurlsmtpsink.h
+
 EXTRA_HFILES = \
        $(top_srcdir)/ext/assrender/gstassrender.h \
        $(top_srcdir)/ext/celt/gstceltdec.h \
        $(top_srcdir)/ext/celt/gstceltenc.h \
-       $(top_srcdir)/ext/curl/gstcurlsink.h \
+       $(top_srcdir)/ext/curl/gstcurlbasesink.h \
+       $(top_srcdir)/ext/curl/gstcurlhttpsink.h \
+       $(top_srcdir)/ext/curl/gstcurltlssink.h \
        $(top_srcdir)/ext/dc1394/gstdc1394.h \
        $(top_srcdir)/ext/directfb/dfbvideosink.h \
        $(top_srcdir)/ext/dts/gstdtsdec.h \
index 42fbe31..6a0a156 100644 (file)
@@ -1,16 +1,22 @@
 plugin_LTLIBRARIES = libgstcurl.la
 
-libgstcurl_la_SOURCES = gstcurl.c gstcurlsink.c
+libgstcurl_la_SOURCES = gstcurl.c \
+                       gstcurlbasesink.c \
+                       gstcurltlssink.c \
+                       gstcurlhttpsink.c
 libgstcurl_la_CFLAGS = \
        $(GST_PLUGINS_BAD_CFLAGS) \
        $(GST_BASE_CFLAGS) \
        $(GST_CFLAGS) \
        $(CURL_CFLAGS)
 libgstcurl_la_LIBADD = \
+       $(GST_PLUGINS_BASE_LIBS) \
        $(GST_BASE_LIBS) \
        $(GST_LIBS) \
        $(CURL_LIBS)
 libgstcurl_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
 libgstcurl_la_LIBTOOLFLAGS = --tag=disable-static
 
-noinst_HEADERS = gstcurlsink.h
+noinst_HEADERS = gstcurlbasesink.h \
+                gstcurltlssink.h \
+                gstcurlhttpsink.h
index 598dd2d..c1dc8f4 100644 (file)
 #include <config.h>
 #endif
 
-#include "gstcurlsink.h"
+#include "gstcurlbasesink.h"
+#include "gstcurltlssink.h"
+#include "gstcurlhttpsink.h"
 
 static gboolean
 plugin_init (GstPlugin * plugin)
 {
 
-  if (!gst_element_register (plugin, "curlsink", GST_RANK_NONE,
-          GST_TYPE_CURL_SINK))
+  if (!gst_element_register (plugin, "curlhttpsink", GST_RANK_NONE,
+          GST_TYPE_CURL_HTTP_SINK))
     return FALSE;
 
   return TRUE;
similarity index 52%
rename from ext/curl/gstcurlsink.c
rename to ext/curl/gstcurlbasesink.c
index 2ed5b20..680be02 100644 (file)
@@ -28,7 +28,7 @@
  * <refsect2>
  * <title>Example launch line (upload a JPEG file to an HTTP server)</title>
  * |[
- * gst-launch filesrc filesrc location=image.jpg ! jpegparse ! curlsink  \
+ * gst-launch filesrc location=image.jpg ! jpegparse ! curlsink  \
  *     file-name=image.jpg  \
  *     location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/  \
  *     user=test passwd=test  \
 #include <sys/stat.h>
 #include <fcntl.h>
 
-#include "gstcurlsink.h"
-
-#include "gst/glib-compat-private.h"
+#include "gstcurlbasesink.h"
 
 /* Default values */
-#define GST_CAT_DEFAULT                gst_curl_sink_debug
+#define GST_CAT_DEFAULT                gst_curl_base_sink_debug
 #define DEFAULT_URL                    "localhost:5555"
 #define DEFAULT_TIMEOUT                30
-#define DEFAULT_PROXY_PORT             3128
 #define DEFAULT_QOS_DSCP               0
-#define DEFAULT_ACCEPT_SELF_SIGNED     FALSE
-#define DEFAULT_USE_CONTENT_LENGTH     FALSE
 
 #define DSCP_MIN                       0
 #define DSCP_MAX                       63
-#define RESPONSE_100_CONTINUE          100
-#define RESPONSE_CONNECT_PROXY         200
+
 
 /* Plugin specific settings */
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
@@ -79,7 +73,7 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_ALWAYS,
     GST_STATIC_CAPS_ANY);
 
-GST_DEBUG_CATEGORY_STATIC (gst_curl_sink_debug);
+GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug);
 
 enum
 {
@@ -87,104 +81,96 @@ enum
   PROP_LOCATION,
   PROP_USER_NAME,
   PROP_USER_PASSWD,
-  PROP_PROXY,
-  PROP_PROXY_PORT,
-  PROP_PROXY_USER_NAME,
-  PROP_PROXY_USER_PASSWD,
   PROP_FILE_NAME,
   PROP_TIMEOUT,
-  PROP_QOS_DSCP,
-  PROP_ACCEPT_SELF_SIGNED,
-  PROP_USE_CONTENT_LENGTH,
-  PROP_CONTENT_TYPE
+  PROP_QOS_DSCP
 };
-static gboolean proxy_auth = FALSE;
-static gboolean proxy_conn_established = FALSE;
 
 /* Object class function declarations */
-static void gst_curl_sink_finalize (GObject * gobject);
-static void gst_curl_sink_set_property (GObject * object, guint prop_id,
+static void gst_curl_base_sink_finalize (GObject * gobject);
+static void gst_curl_base_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
-static void gst_curl_sink_get_property (GObject * object, guint prop_id,
+static void gst_curl_base_sink_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 
 /* BaseSink class function declarations */
-static GstFlowReturn gst_curl_sink_render (GstBaseSink * bsink,
+static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink,
     GstBuffer * buf);
-static gboolean gst_curl_sink_event (GstBaseSink * bsink, GstEvent * event);
-static gboolean gst_curl_sink_start (GstBaseSink * bsink);
-static gboolean gst_curl_sink_stop (GstBaseSink * bsink);
-static gboolean gst_curl_sink_unlock (GstBaseSink * bsink);
-static gboolean gst_curl_sink_unlock_stop (GstBaseSink * bsink);
+static gboolean gst_curl_base_sink_event (GstBaseSink * bsink,
+    GstEvent * event);
+static gboolean gst_curl_base_sink_start (GstBaseSink * bsink);
+static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink);
+static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink);
+static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink);
 
 /* private functions */
-static gboolean gst_curl_sink_transfer_setup_unlocked (GstCurlSink * sink);
-static gboolean gst_curl_sink_transfer_set_options_unlocked (GstCurlSink
-    * sink);
-static gboolean gst_curl_sink_transfer_start_unlocked (GstCurlSink * sink);
-static void gst_curl_sink_transfer_cleanup (GstCurlSink * sink);
-static size_t gst_curl_sink_transfer_read_cb (void *ptr, size_t size,
+
+static gboolean gst_curl_base_sink_transfer_setup_unlocked
+    (GstCurlBaseSink * sink);
+static gboolean gst_curl_base_sink_transfer_start_unlocked
+    (GstCurlBaseSink * sink);
+static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink);
+static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size,
     size_t nmemb, void *stream);
-static size_t gst_curl_sink_transfer_write_cb (void *ptr, size_t size,
+static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size,
     size_t nmemb, void *stream);
-static GstFlowReturn gst_curl_sink_handle_transfer (GstCurlSink * sink);
-static int gst_curl_sink_transfer_socket_cb (void *clientp,
+static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
+    void *curl_ptr, size_t block_size, guint * last_chunk);
+static int gst_curl_base_sink_transfer_socket_cb (void *clientp,
     curl_socket_t curlfd, curlsocktype purpose);
-static gpointer gst_curl_sink_transfer_thread_func (gpointer data);
-static CURLcode gst_curl_sink_transfer_check (GstCurlSink * sink);
-static gint gst_curl_sink_setup_dscp_unlocked (GstCurlSink * sink);
-
-static gboolean gst_curl_sink_wait_for_data_unlocked (GstCurlSink * sink);
-static void gst_curl_sink_new_file_notify_unlocked (GstCurlSink * sink);
-static void gst_curl_sink_transfer_thread_notify_unlocked (GstCurlSink * sink);
-static void gst_curl_sink_transfer_thread_close_unlocked (GstCurlSink * sink);
-static void gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (GstCurlSink
-    * sink);
-static void gst_curl_sink_data_sent_notify_unlocked (GstCurlSink * sink);
+static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data);
+static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink);
+static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink);
+
+static gboolean gst_curl_base_sink_wait_for_data_unlocked
+    (GstCurlBaseSink * sink);
+static void gst_curl_base_sink_new_file_notify_unlocked
+    (GstCurlBaseSink * sink);
+static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
+    (GstCurlBaseSink * sink);
+static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink);
+static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink);
+static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink);
+
+static void handle_transfer (GstCurlBaseSink * sink);
+static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
+    size_t max_bytes_to_send, guint * last_chunk);
+
+#define parent_class gst_curl_base_sink_parent_class
+G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
 
 static void
-_do_init (GType type)
+gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
 {
-  GST_DEBUG_CATEGORY_INIT (gst_curl_sink_debug, "curlsink", 0,
-      "curl sink element");
-}
-
-GST_BOILERPLATE_FULL (GstCurlSink, gst_curl_sink, GstBaseSink,
-    GST_TYPE_BASE_SINK, _do_init);
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
+  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
 
-static void
-gst_curl_sink_base_init (gpointer g_class)
-{
-  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+  GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0,
+      "curl base sink element");
+  GST_DEBUG_OBJECT (klass, "class_init");
 
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&sinktemplate));
   gst_element_class_set_details_simple (element_class,
-      "Curl sink",
+      "Curl base sink",
       "Sink/Network",
       "Upload data over the network to a server using libcurl",
       "Patricia Muscalu <patricia@axis.com>");
-}
 
-static void
-gst_curl_sink_class_init (GstCurlSinkClass * klass)
-{
-  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
-  GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
-
-  GST_DEBUG_OBJECT (klass, "class_init");
-
-  gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_sink_event);
-  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_sink_render);
-  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_sink_start);
-  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_sink_stop);
-  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_sink_unlock);
+  gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event);
+  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render);
+  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start);
+  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop);
+  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock);
   gstbasesink_class->unlock_stop =
-      GST_DEBUG_FUNCPTR (gst_curl_sink_unlock_stop);
-  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_sink_finalize);
+      GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop);
+  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize);
+
+  gobject_class->set_property = gst_curl_base_sink_set_property;
+  gobject_class->get_property = gst_curl_base_sink_get_property;
 
-  gobject_class->set_property = gst_curl_sink_set_property;
-  gobject_class->get_property = gst_curl_sink_get_property;
+  klass->handle_transfer = handle_transfer;
+  klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
+  klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
 
   /* FIXME: check against souphttpsrc and use same names for same properties */
   g_object_class_install_property (gobject_class, PROP_LOCATION,
@@ -199,21 +185,6 @@ gst_curl_sink_class_init (GstCurlSinkClass * klass)
       g_param_spec_string ("passwd", "User password",
           "User password to use for server authentication", NULL,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_PROXY,
-      g_param_spec_string ("proxy", "Proxy", "HTTP proxy server URI", NULL,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_PROXY_PORT,
-      g_param_spec_int ("proxy-port", "Proxy port",
-          "HTTP proxy server port", 0, G_MAXINT, DEFAULT_PROXY_PORT,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_PROXY_USER_NAME,
-      g_param_spec_string ("proxy-user", "Proxy user name",
-          "Proxy user name to use for proxy authentication",
-          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_PROXY_USER_PASSWD,
-      g_param_spec_string ("proxy-passwd", "Proxy user password",
-          "Proxy user password to use for proxy authentication",
-          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_FILE_NAME,
       g_param_spec_string ("file-name", "Base file name",
           "The base file name for the uploaded images", NULL,
@@ -229,74 +200,48 @@ gst_curl_sink_class_init (GstCurlSinkClass * klass)
           "Quality of Service, differentiated services code point (0 default)",
           DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_ACCEPT_SELF_SIGNED,
-      g_param_spec_boolean ("accept-self-signed",
-          "Accept self-signed certificates",
-          "Accept self-signed SSL/TLS certificates",
-          DEFAULT_ACCEPT_SELF_SIGNED,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_USE_CONTENT_LENGTH,
-      g_param_spec_boolean ("use-content-length", "Use content length header",
-          "Use the Content-Length HTTP header instead of "
-          "Transfer-Encoding header", DEFAULT_USE_CONTENT_LENGTH,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_CONTENT_TYPE,
-      g_param_spec_string ("content-type", "Content type",
-          "The mime type of the body of the request", NULL,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&sinktemplate));
 }
 
 static void
-gst_curl_sink_init (GstCurlSink * sink, GstCurlSinkClass * klass)
+gst_curl_base_sink_init (GstCurlBaseSink * sink)
 {
   sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
   sink->transfer_cond = g_malloc (sizeof (TransferCondition));
   sink->transfer_cond->cond = g_cond_new ();
   sink->transfer_cond->data_sent = FALSE;
   sink->transfer_cond->data_available = FALSE;
+  sink->transfer_cond->wait_for_response = FALSE;
   sink->timeout = DEFAULT_TIMEOUT;
-  sink->proxy_port = DEFAULT_PROXY_PORT;
   sink->qos_dscp = DEFAULT_QOS_DSCP;
   sink->url = g_strdup (DEFAULT_URL);
-  sink->header_list = NULL;
-  sink->accept_self_signed = DEFAULT_ACCEPT_SELF_SIGNED;
-  sink->use_content_length = DEFAULT_USE_CONTENT_LENGTH;
   sink->transfer_thread_close = FALSE;
   sink->new_file = TRUE;
-  sink->proxy_headers_set = FALSE;
-  sink->content_type = NULL;
+  sink->flow_ret = GST_FLOW_OK;
+  sink->is_live = FALSE;
 }
 
 static void
-gst_curl_sink_finalize (GObject * gobject)
+gst_curl_base_sink_finalize (GObject * gobject)
 {
-  GstCurlSink *this = GST_CURL_SINK (gobject);
+  GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject);
 
   GST_DEBUG ("finalizing curlsink");
   if (this->transfer_thread != NULL) {
     g_thread_join (this->transfer_thread);
   }
 
-  gst_curl_sink_transfer_cleanup (this);
+  gst_curl_base_sink_transfer_cleanup (this);
   g_cond_free (this->transfer_cond->cond);
   g_free (this->transfer_cond);
-
   g_free (this->transfer_buf);
 
   g_free (this->url);
   g_free (this->user);
   g_free (this->passwd);
-  g_free (this->proxy);
-  g_free (this->proxy_user);
-  g_free (this->proxy_passwd);
   g_free (this->file_name);
-  g_free (this->content_type);
-
-  if (this->header_list) {
-    curl_slist_free_all (this->header_list);
-    this->header_list = NULL;
-  }
-
   if (this->fdset != NULL) {
     gst_poll_free (this->fdset);
     this->fdset = NULL;
@@ -304,30 +249,72 @@ gst_curl_sink_finalize (GObject * gobject)
   G_OBJECT_CLASS (parent_class)->finalize (gobject);
 }
 
+void
+gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink)
+{
+  GST_LOG ("more data to send");
+
+  sink->transfer_cond->data_available = TRUE;
+  sink->transfer_cond->data_sent = FALSE;
+  sink->transfer_cond->wait_for_response = TRUE;
+  g_cond_signal (sink->transfer_cond->cond);
+}
+
+void
+gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink)
+{
+  GST_OBJECT_LOCK (sink);
+  GST_LOG_OBJECT (sink, "setting transfer thread close flag");
+  sink->transfer_thread_close = TRUE;
+  g_cond_signal (sink->transfer_cond->cond);
+  GST_OBJECT_UNLOCK (sink);
+
+  if (sink->transfer_thread != NULL) {
+    GST_LOG_OBJECT (sink, "waiting for transfer thread to finish");
+    g_thread_join (sink->transfer_thread);
+    sink->transfer_thread = NULL;
+  }
+}
+
+void
+gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live)
+{
+  g_return_if_fail (GST_IS_CURL_BASE_SINK (sink));
+
+  GST_OBJECT_LOCK (sink);
+  sink->is_live = live;
+  GST_OBJECT_UNLOCK (sink);
+}
+
+gboolean
+gst_curl_base_sink_is_live (GstCurlBaseSink * sink)
+{
+  gboolean result;
+
+  g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE);
+
+  GST_OBJECT_LOCK (sink);
+  result = sink->is_live;
+  GST_OBJECT_UNLOCK (sink);
+
+  return result;
+}
+
 static GstFlowReturn
-gst_curl_sink_render (GstBaseSink * bsink, GstBuffer * buf)
+gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
-  GstCurlSink *sink = GST_CURL_SINK (bsink);
+  GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
+  GstMapInfo map;
   guint8 *data;
   size_t size;
   GstFlowReturn ret;
 
   GST_LOG ("enter render");
 
-  sink = GST_CURL_SINK (bsink);
-  data = GST_BUFFER_DATA (buf);
-  size = GST_BUFFER_SIZE (buf);
-
-  if (sink->content_type == NULL) {
-    GstCaps *caps;
-    GstStructure *structure;
-    const gchar *mime_type;
-
-    caps = buf->caps;
-    structure = gst_caps_get_structure (caps, 0);
-    mime_type = gst_structure_get_name (structure);
-    sink->content_type = g_strdup (mime_type);
-  }
+  sink = GST_CURL_BASE_SINK (bsink);
+  gst_buffer_map (buf, &map, GST_MAP_READ);
+  data = map.data;
+  size = map.size;
 
   GST_OBJECT_LOCK (sink);
 
@@ -341,7 +328,7 @@ gst_curl_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 
   /* if there is no transfer thread created, lets create one */
   if (sink->transfer_thread == NULL) {
-    if (!gst_curl_sink_transfer_start_unlocked (sink)) {
+    if (!gst_curl_base_sink_transfer_start_unlocked (sink)) {
       sink->flow_ret = GST_FLOW_ERROR;
       goto done;
     }
@@ -351,16 +338,17 @@ gst_curl_sink_render (GstBaseSink * bsink, GstBuffer * buf)
   sink->transfer_buf->ptr = data;
   sink->transfer_buf->len = size;
   sink->transfer_buf->offset = 0;
-  gst_curl_sink_transfer_thread_notify_unlocked (sink);
+  gst_curl_base_sink_transfer_thread_notify_unlocked (sink);
 
   /* wait for the transfer thread to send the data. This will be notified
    * either when transfer is completed by the curl read callback or by
    * the thread function if an error has occured. */
-  gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (sink);
+  gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink);
 
 done:
   ret = sink->flow_ret;
   GST_OBJECT_UNLOCK (sink);
+  gst_buffer_unmap (buf, &map);
 
   GST_LOG ("exit render");
 
@@ -368,19 +356,22 @@ done:
 }
 
 static gboolean
-gst_curl_sink_event (GstBaseSink * bsink, GstEvent * event)
+gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event)
 {
-  GstCurlSink *sink = GST_CURL_SINK (bsink);
+  GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
+  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
 
   switch (event->type) {
     case GST_EVENT_EOS:
       GST_DEBUG_OBJECT (sink, "received EOS");
-      GST_OBJECT_LOCK (sink);
-      gst_curl_sink_transfer_thread_close_unlocked (sink);
-      GST_OBJECT_UNLOCK (sink);
-      if (sink->transfer_thread != NULL) {
-        g_thread_join (sink->transfer_thread);
-        sink->transfer_thread = NULL;
+      gst_curl_base_sink_transfer_thread_close (sink);
+      gst_curl_base_sink_wait_for_response (sink);
+      break;
+    case GST_EVENT_CAPS:
+      if (klass->set_mime_type) {
+        GstCaps *caps;
+        gst_event_parse_caps (event, &caps);
+        klass->set_mime_type (sink, caps);
       }
       break;
     default:
@@ -390,11 +381,19 @@ gst_curl_sink_event (GstBaseSink * bsink, GstEvent * event)
 }
 
 static gboolean
-gst_curl_sink_start (GstBaseSink * bsink)
+gst_curl_base_sink_start (GstBaseSink * bsink)
 {
-  GstCurlSink *sink;
+  GstCurlBaseSink *sink;
+
+  sink = GST_CURL_BASE_SINK (bsink);
 
-  sink = GST_CURL_SINK (bsink);
+  /* reset flags */
+  sink->transfer_cond->data_sent = FALSE;
+  sink->transfer_cond->data_available = FALSE;
+  sink->transfer_cond->wait_for_response = FALSE;
+  sink->transfer_thread_close = FALSE;
+  sink->new_file = TRUE;
+  sink->flow_ret = GST_FLOW_OK;
 
   if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
@@ -406,13 +405,11 @@ gst_curl_sink_start (GstBaseSink * bsink)
 }
 
 static gboolean
-gst_curl_sink_stop (GstBaseSink * bsink)
+gst_curl_base_sink_stop (GstBaseSink * bsink)
 {
-  GstCurlSink *sink = GST_CURL_SINK (bsink);
+  GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
 
-  GST_OBJECT_LOCK (sink);
-  gst_curl_sink_transfer_thread_close_unlocked (sink);
-  GST_OBJECT_UNLOCK (sink);
+  gst_curl_base_sink_transfer_thread_close (sink);
   if (sink->fdset != NULL) {
     gst_poll_free (sink->fdset);
     sink->fdset = NULL;
@@ -422,11 +419,11 @@ gst_curl_sink_stop (GstBaseSink * bsink)
 }
 
 static gboolean
-gst_curl_sink_unlock (GstBaseSink * bsink)
+gst_curl_base_sink_unlock (GstBaseSink * bsink)
 {
-  GstCurlSink *sink;
+  GstCurlBaseSink *sink;
 
-  sink = GST_CURL_SINK (bsink);
+  sink = GST_CURL_BASE_SINK (bsink);
 
   GST_LOG_OBJECT (sink, "Flushing");
   gst_poll_set_flushing (sink->fdset, TRUE);
@@ -435,11 +432,11 @@ gst_curl_sink_unlock (GstBaseSink * bsink)
 }
 
 static gboolean
-gst_curl_sink_unlock_stop (GstBaseSink * bsink)
+gst_curl_base_sink_unlock_stop (GstBaseSink * bsink)
 {
-  GstCurlSink *sink;
+  GstCurlBaseSink *sink;
 
-  sink = GST_CURL_SINK (bsink);
+  sink = GST_CURL_BASE_SINK (bsink);
 
   GST_LOG_OBJECT (sink, "No longer flushing");
   gst_poll_set_flushing (sink->fdset, FALSE);
@@ -448,14 +445,14 @@ gst_curl_sink_unlock_stop (GstBaseSink * bsink)
 }
 
 static void
-gst_curl_sink_set_property (GObject * object, guint prop_id,
+gst_curl_base_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
-  GstCurlSink *sink;
+  GstCurlBaseSink *sink;
   GstState cur_state;
 
-  g_return_if_fail (GST_IS_CURL_SINK (object));
-  sink = GST_CURL_SINK (object);
+  g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
+  sink = GST_CURL_BASE_SINK (object);
 
   gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
   if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
@@ -477,25 +474,6 @@ gst_curl_sink_set_property (GObject * object, guint prop_id,
         sink->passwd = g_value_dup_string (value);
         GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
         break;
-      case PROP_PROXY:
-        g_free (sink->proxy);
-        sink->proxy = g_value_dup_string (value);
-        GST_DEBUG_OBJECT (sink, "proxy set to %s", sink->proxy);
-        break;
-      case PROP_PROXY_PORT:
-        sink->proxy_port = g_value_get_int (value);
-        GST_DEBUG_OBJECT (sink, "proxy port set to %d", sink->proxy_port);
-        break;
-      case PROP_PROXY_USER_NAME:
-        g_free (sink->proxy_user);
-        sink->proxy_user = g_value_dup_string (value);
-        GST_DEBUG_OBJECT (sink, "proxy user set to %s", sink->proxy_user);
-        break;
-      case PROP_PROXY_USER_PASSWD:
-        g_free (sink->proxy_passwd);
-        sink->proxy_passwd = g_value_dup_string (value);
-        GST_DEBUG_OBJECT (sink, "proxy password set to %s", sink->proxy_passwd);
-        break;
       case PROP_FILE_NAME:
         g_free (sink->file_name);
         sink->file_name = g_value_dup_string (value);
@@ -507,24 +485,9 @@ gst_curl_sink_set_property (GObject * object, guint prop_id,
         break;
       case PROP_QOS_DSCP:
         sink->qos_dscp = g_value_get_int (value);
-        gst_curl_sink_setup_dscp_unlocked (sink);
+        gst_curl_base_sink_setup_dscp_unlocked (sink);
         GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
         break;
-      case PROP_ACCEPT_SELF_SIGNED:
-        sink->accept_self_signed = g_value_get_boolean (value);
-        GST_DEBUG_OBJECT (sink, "accept_self_signed set to %d",
-            sink->accept_self_signed);
-        break;
-      case PROP_USE_CONTENT_LENGTH:
-        sink->use_content_length = g_value_get_boolean (value);
-        GST_DEBUG_OBJECT (sink, "use_content_length set to %d",
-            sink->use_content_length);
-        break;
-      case PROP_CONTENT_TYPE:
-        g_free (sink->content_type);
-        sink->content_type = g_value_dup_string (value);
-        GST_DEBUG_OBJECT (sink, "content type set to %s", sink->content_type);
-        break;
       default:
         GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
         break;
@@ -543,7 +506,7 @@ gst_curl_sink_set_property (GObject * object, guint prop_id,
       g_free (sink->file_name);
       sink->file_name = g_value_dup_string (value);
       GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
-      gst_curl_sink_new_file_notify_unlocked (sink);
+      gst_curl_base_sink_new_file_notify_unlocked (sink);
       break;
     case PROP_TIMEOUT:
       sink->timeout = g_value_get_int (value);
@@ -551,14 +514,9 @@ gst_curl_sink_set_property (GObject * object, guint prop_id,
       break;
     case PROP_QOS_DSCP:
       sink->qos_dscp = g_value_get_int (value);
-      gst_curl_sink_setup_dscp_unlocked (sink);
+      gst_curl_base_sink_setup_dscp_unlocked (sink);
       GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
       break;
-    case PROP_CONTENT_TYPE:
-      g_free (sink->content_type);
-      sink->content_type = g_value_dup_string (value);
-      GST_DEBUG_OBJECT (sink, "content type set to %s", sink->content_type);
-      break;
     default:
       GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
       break;
@@ -568,13 +526,13 @@ gst_curl_sink_set_property (GObject * object, guint prop_id,
 }
 
 static void
-gst_curl_sink_get_property (GObject * object, guint prop_id,
+gst_curl_base_sink_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec)
 {
-  GstCurlSink *sink;
+  GstCurlBaseSink *sink;
 
-  g_return_if_fail (GST_IS_CURL_SINK (object));
-  sink = GST_CURL_SINK (object);
+  g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
+  sink = GST_CURL_BASE_SINK (object);
 
   switch (prop_id) {
     case PROP_LOCATION:
@@ -586,18 +544,6 @@ gst_curl_sink_get_property (GObject * object, guint prop_id,
     case PROP_USER_PASSWD:
       g_value_set_string (value, sink->passwd);
       break;
-    case PROP_PROXY:
-      g_value_set_string (value, sink->proxy);
-      break;
-    case PROP_PROXY_PORT:
-      g_value_set_int (value, sink->proxy_port);
-      break;
-    case PROP_PROXY_USER_NAME:
-      g_value_set_string (value, sink->proxy_user);
-      break;
-    case PROP_PROXY_USER_PASSWD:
-      g_value_set_string (value, sink->proxy_passwd);
-      break;
     case PROP_FILE_NAME:
       g_value_set_string (value, sink->file_name);
       break;
@@ -607,197 +553,184 @@ gst_curl_sink_get_property (GObject * object, guint prop_id,
     case PROP_QOS_DSCP:
       g_value_set_int (value, sink->qos_dscp);
       break;
-    case PROP_ACCEPT_SELF_SIGNED:
-      g_value_set_boolean (value, sink->accept_self_signed);
-      break;
-    case PROP_USE_CONTENT_LENGTH:
-      g_value_set_boolean (value, sink->use_content_length);
-      break;
-    case PROP_CONTENT_TYPE:
-      g_value_set_string (value, sink->content_type);
-      break;
     default:
       GST_DEBUG_OBJECT (sink, "invalid property id");
       break;
   }
 }
 
-static void
-gst_curl_sink_set_http_header_unlocked (GstCurlSink * sink)
-{
-  gchar *tmp;
-
-  if (sink->header_list) {
-    curl_slist_free_all (sink->header_list);
-    sink->header_list = NULL;
-  }
-
-  if (proxy_auth && !sink->proxy_headers_set && !proxy_conn_established) {
-    sink->header_list =
-        curl_slist_append (sink->header_list, "Content-Length: 0");
-    sink->proxy_headers_set = TRUE;
-    goto set_headers;
-  }
-  if (sink->use_content_length) {
-    /* if content length is used we assume that every buffer is one
-     * entire file, which is the case when uploading several jpegs */
-    tmp = g_strdup_printf ("Content-Length: %d", (int) sink->transfer_buf->len);
-    sink->header_list = curl_slist_append (sink->header_list, tmp);
-    g_free (tmp);
-  } else {
-    /* when sending a POST request to a HTTP 1.1 server, you can send data
-     * without knowing the size before starting the POST if you use chunked
-     * encoding */
-    sink->header_list = curl_slist_append (sink->header_list,
-        "Transfer-Encoding: chunked");
-  }
-
-  tmp = g_strdup_printf ("Content-Type: %s", sink->content_type);
-  sink->header_list = curl_slist_append (sink->header_list, tmp);
-  g_free (tmp);
-
-set_headers:
-
-  tmp = g_strdup_printf ("Content-Disposition: attachment; filename="
-      "\"%s\"", sink->file_name);
-  sink->header_list = curl_slist_append (sink->header_list, tmp);
-  g_free (tmp);
-  curl_easy_setopt (sink->curl, CURLOPT_HTTPHEADER, sink->header_list);
-}
-
 static gboolean
-gst_curl_sink_transfer_set_options_unlocked (GstCurlSink * sink)
+gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink)
 {
+  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
+
 #ifdef DEBUG
   curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
 #endif
 
   curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
+
   curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
 
+  /* using signals in a multithreaded application is dangeous */
+  curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1);
+
+  /* socket settings */
   curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
   curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
-      gst_curl_sink_transfer_socket_cb);
+      gst_curl_base_sink_transfer_socket_cb);
 
+  curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION, klass->transfer_read_cb);
+  curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
+  curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
+      gst_curl_base_sink_transfer_write_cb);
+  curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink);
+
+  return TRUE;
+}
+
+static gboolean
+gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink)
+{
+  gboolean res = FALSE;
+  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
+
+  gst_curl_base_sink_transfer_set_common_options_unlocked (sink);
+
+  /* authentication settings */
   if (sink->user != NULL && strlen (sink->user)) {
     curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
     curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
-    curl_easy_setopt (sink->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
   }
 
-  if (sink->accept_self_signed && g_str_has_prefix (sink->url, "https")) {
-    /* TODO verify the authenticity of the peer's certificate */
-    curl_easy_setopt (sink->curl, CURLOPT_SSL_VERIFYPEER, 0L);
-    /* TODO check the servers's claimed identity */
-    curl_easy_setopt (sink->curl, CURLOPT_SSL_VERIFYHOST, 0L);
+  if (klass->set_options_unlocked) {
+    res = klass->set_options_unlocked (sink);
   }
 
-  /* proxy settings */
-  if (sink->proxy != NULL && strlen (sink->proxy)) {
-    if (curl_easy_setopt (sink->curl, CURLOPT_PROXY, sink->proxy)
-        != CURLE_OK) {
-      return FALSE;
-    }
-    if (curl_easy_setopt (sink->curl, CURLOPT_PROXYPORT, sink->proxy_port)
-        != CURLE_OK) {
-      return FALSE;
-    }
-    if (sink->proxy_user != NULL &&
-        strlen (sink->proxy_user) &&
-        sink->proxy_passwd != NULL && strlen (sink->proxy_passwd)) {
-      curl_easy_setopt (sink->curl, CURLOPT_PROXYUSERNAME, sink->proxy_user);
-      curl_easy_setopt (sink->curl, CURLOPT_PROXYPASSWORD, sink->proxy_passwd);
-      curl_easy_setopt (sink->curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
-      proxy_auth = TRUE;
-    }
-    /* tunnel all operations through a given HTTP proxy */
-    if (curl_easy_setopt (sink->curl, CURLOPT_HTTPPROXYTUNNEL, 1L)
-        != CURLE_OK) {
-      return FALSE;
-    }
-  }
+  return res;
+}
 
-  /* POST options */
-  curl_easy_setopt (sink->curl, CURLOPT_POST, 1L);
+static size_t
+transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
+    size_t max_bytes_to_send, guint * last_chunk)
+{
+  guint buf_len = buf->len;
+  size_t bytes_to_send = MIN (max_bytes_to_send, buf->len);
+
+  memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send);
+  buf->offset = buf->offset + bytes_to_send;
+  buf->len = buf->len - bytes_to_send;
+
+  /* the last data chunk */
+  if (bytes_to_send == buf_len) {
+    buf->offset = 0;
+    buf->len = 0;
+    *last_chunk = 1;
+  }
 
-  curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
-      gst_curl_sink_transfer_read_cb);
-  curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
-  curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
-      gst_curl_sink_transfer_write_cb);
+  GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
 
-  return TRUE;
+  return bytes_to_send;
 }
 
 static size_t
-gst_curl_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
-    void *stream)
+gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
+    void *curl_ptr, size_t block_size, guint * last_chunk)
 {
-  GstCurlSink *sink;
   TransferBuffer *buffer;
-  size_t max_bytes_to_send;
-  guint buf_len;
+  size_t bytes_to_send;
 
-  sink = (GstCurlSink *) stream;
+  buffer = sink->transfer_buf;
+  GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
+      buffer->len, buffer->offset);
+
+  if (buffer->len <= 0) {
+    GST_WARNING ("got zero- or negative-length buffer");
 
-  /* wait for data to come available, if new file or thread close is set
-   * then zero will be returned to indicate end of current transfer */
-  GST_OBJECT_LOCK (sink);
-  if (gst_curl_sink_wait_for_data_unlocked (sink) == FALSE) {
-    GST_LOG ("returning 0, no more data to send in this file");
-    GST_OBJECT_UNLOCK (sink);
     return 0;
   }
-  GST_OBJECT_UNLOCK (sink);
 
+  /* more data in buffer(s) */
+  bytes_to_send = transfer_data_buffer (curl_ptr, sink->transfer_buf,
+      block_size, last_chunk);
 
-  max_bytes_to_send = size * nmemb;
-  buffer = sink->transfer_buf;
+  return bytes_to_send;
+}
 
-  buf_len = buffer->len;
-  GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
-      buffer->len, buffer->offset);
+static size_t
+gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
+    void *stream)
+{
+  GstCurlBaseSink *sink;
+  GstCurlBaseSinkClass *klass;
+  size_t max_bytes_to_send;
+  size_t bytes_to_send;
+  guint last_chunk = 0;
 
-  /* more data in buffer */
-  if (buffer->len > 0) {
-    size_t bytes_to_send = MIN (max_bytes_to_send, buf_len);
+  sink = (GstCurlBaseSink *) stream;
+  klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
 
-    memcpy ((guint8 *) curl_ptr, buffer->ptr + buffer->offset, bytes_to_send);
+  max_bytes_to_send = size * nmemb;
 
-    buffer->offset = buffer->offset + bytes_to_send;
-    buffer->len = buffer->len - bytes_to_send;
+  /* wait for data to come available, if new file or thread close is set
+   * then zero will be returned to indicate end of current transfer */
+  GST_OBJECT_LOCK (sink);
+  if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
+    if (klass->flush_data_unlocked) {
+      bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
+          max_bytes_to_send, sink->new_file);
 
-    /* the last data chunk */
-    if (bytes_to_send == buf_len) {
-      buffer->ptr = NULL;
-      buffer->offset = 0;
-      buffer->len = 0;
-      GST_OBJECT_LOCK (sink);
-      gst_curl_sink_data_sent_notify_unlocked (sink);
       GST_OBJECT_UNLOCK (sink);
+
+      return bytes_to_send;
     }
 
-    GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
+    GST_OBJECT_UNLOCK (sink);
+    GST_LOG ("returning 0, no more data to send in this file");
 
-    return bytes_to_send;
-  } else {
-    GST_WARNING ("got zero-length buffer");
     return 0;
   }
+
+  GST_OBJECT_UNLOCK (sink);
+
+  bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr,
+      max_bytes_to_send, &last_chunk);
+
+  /* the last data chunk */
+  if (last_chunk) {
+    gst_curl_base_sink_data_sent_notify (sink);
+  }
+
+  return bytes_to_send;
 }
 
 static size_t
-gst_curl_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
+gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
     size_t nmemb, void G_GNUC_UNUSED * stream)
 {
+  GstCurlBaseSink *sink;
+  GstCurlBaseSinkClass *klass;
   size_t realsize = size * nmemb;
 
+  sink = (GstCurlBaseSink *) stream;
+  klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
+
+  if (klass->transfer_verify_response_code) {
+    if (!klass->transfer_verify_response_code (sink)) {
+      GST_DEBUG_OBJECT (sink, "response error");
+      GST_OBJECT_LOCK (sink);
+      sink->flow_ret = GST_FLOW_ERROR;
+      GST_OBJECT_UNLOCK (sink);
+    }
+  }
+
   GST_DEBUG ("response %s", (gchar *) ptr);
+
   return realsize;
 }
 
-static CURLcode
-gst_curl_sink_transfer_check (GstCurlSink * sink)
+CURLcode
+gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink)
 {
   CURLcode code = CURLE_OK;
   CURL *easy;
@@ -824,16 +757,16 @@ gst_curl_sink_transfer_check (GstCurlSink * sink)
   return code;
 }
 
-static GstFlowReturn
-gst_curl_sink_handle_transfer (GstCurlSink * sink)
+static void
+handle_transfer (GstCurlBaseSink * sink)
 {
+  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
   gint retval;
+  gint activated_fds;
   gint running_handles;
   gint timeout;
   CURLMcode m_code;
   CURLcode e_code;
-  glong resp = -1;
-  glong resp_proxy = -1;
 
   GST_OBJECT_LOCK (sink);
   timeout = sink->timeout;
@@ -847,137 +780,78 @@ gst_curl_sink_handle_transfer (GstCurlSink * sink)
   } while (m_code == CURLM_CALL_MULTI_PERFORM);
 
   while (running_handles && (m_code == CURLM_OK)) {
-    if (!proxy_conn_established && (resp_proxy != RESPONSE_CONNECT_PROXY)
-        && proxy_auth) {
-      curl_easy_getinfo (sink->curl, CURLINFO_HTTP_CONNECTCODE, &resp_proxy);
-      if (resp_proxy == RESPONSE_CONNECT_PROXY) {
-        GST_LOG ("received HTTP/1.0 200 Connection Established");
-        /* Workaround: redefine HTTP headers before connecting to HTTP server.
-         * When talking to proxy, the Content-Length: 0 is send with the request.
-         */
-        curl_multi_remove_handle (sink->multi_handle, sink->curl);
-        gst_curl_sink_set_http_header_unlocked (sink);
-        curl_multi_add_handle (sink->multi_handle, sink->curl);
-        proxy_conn_established = TRUE;
-      }
+    if (klass->transfer_prepare_poll_wait) {
+      klass->transfer_prepare_poll_wait (sink);
     }
 
-    retval = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
-    if (G_UNLIKELY (retval == -1)) {
+    activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
+    if (G_UNLIKELY (activated_fds == -1)) {
       if (errno == EAGAIN || errno == EINTR) {
         GST_DEBUG_OBJECT (sink, "interrupted by signal");
       } else if (errno == EBUSY) {
-        goto poll_stopped;
+        GST_DEBUG_OBJECT (sink, "poll stopped");
+        retval = GST_FLOW_EOS;
+        goto fail;
       } else {
-        goto poll_error;
+        GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno));
+        GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll failed"), (NULL));
+        retval = GST_FLOW_ERROR;
+        goto fail;
       }
-    } else if (G_UNLIKELY (retval == 0)) {
-      GST_DEBUG ("timeout");
-      goto poll_timeout;
+    } else if (G_UNLIKELY (activated_fds == 0)) {
+      GST_DEBUG_OBJECT (sink, "poll timed out");
+      GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll timed out"), (NULL));
+      retval = GST_FLOW_ERROR;
+      goto fail;
     }
 
     /* readable/writable sockets */
     do {
       m_code = curl_multi_perform (sink->multi_handle, &running_handles);
     } while (m_code == CURLM_CALL_MULTI_PERFORM);
-
-    if (resp != RESPONSE_100_CONTINUE) {
-      curl_easy_getinfo (sink->curl, CURLINFO_RESPONSE_CODE, &resp);
-    }
-  }
-
-  if (resp != RESPONSE_100_CONTINUE) {
-    /* No 100 Continue response received. Using POST with HTTP 1.1 implies
-     * the use of a "Expect: 100-continue" header. If the server doesn't
-     * send HTTP/1.1 100 Continue, libcurl will not call transfer_read_cb
-     * in order to send POST data.
-     */
-    goto no_100_continue_response;
   }
 
   if (m_code != CURLM_OK) {
-    goto curl_multi_error;
-  }
-
-  /* problems still might have occurred on individual transfers even when
-   * curl_multi_perform returns CURLM_OK */
-  if ((e_code = gst_curl_sink_transfer_check (sink)) != CURLE_OK) {
-    goto curl_easy_error;
-  }
-
-  /* check response code */
-  curl_easy_getinfo (sink->curl, CURLINFO_RESPONSE_CODE, &resp);
-  GST_DEBUG_OBJECT (sink, "response code: %ld", resp);
-  if (resp < 200 || resp >= 300) {
-    goto response_error;
-  }
-
-  return GST_FLOW_OK;
-
-poll_error:
-  {
-    GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno));
-    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll failed"), (NULL));
-    return GST_FLOW_ERROR;
-  }
-
-poll_stopped:
-  {
-    GST_DEBUG_OBJECT (sink, "poll stopped");
-    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll stopped"), (NULL));
-    return GST_FLOW_ERROR;
-  }
-
-poll_timeout:
-  {
-    GST_DEBUG_OBJECT (sink, "poll timed out");
-    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll timed out"), (NULL));
-    return GST_FLOW_ERROR;
-  }
-
-curl_multi_error:
-  {
     GST_DEBUG_OBJECT (sink, "curl multi error");
     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s",
             curl_multi_strerror (m_code)), (NULL));
-    return GST_FLOW_ERROR;
+    retval = GST_FLOW_ERROR;
+    goto fail;
   }
 
-curl_easy_error:
-  {
+  /* problems still might have occurred on individual transfers even when
+   * curl_multi_perform returns CURLM_OK */
+  if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) {
     GST_DEBUG_OBJECT (sink, "curl easy error");
     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s",
             curl_easy_strerror (e_code)), (NULL));
-    return GST_FLOW_ERROR;
+    retval = GST_FLOW_ERROR;
+    goto fail;
   }
 
-no_100_continue_response:
-  {
-    GST_DEBUG_OBJECT (sink, "100 continue response missing");
-    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("100 continue response missing"),
-        (NULL));
-    return GST_FLOW_ERROR;
-  }
+  gst_curl_base_sink_got_response_notify (sink);
 
-response_error:
-  {
-    GST_DEBUG_OBJECT (sink, "response error");
-    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("response error: %ld", resp),
-        (NULL));
-    return GST_FLOW_ERROR;
+  return;
+
+fail:
+  GST_OBJECT_LOCK (sink);
+  if (sink->flow_ret == GST_FLOW_OK) {
+    sink->flow_ret = retval;
   }
+  GST_OBJECT_UNLOCK (sink);
+  return;
 }
 
 /* This function gets called by libcurl after the socket() call but before
  * the connect() call. */
 static int
-gst_curl_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
+gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
     curlsocktype G_GNUC_UNUSED purpose)
 {
-  GstCurlSink *sink;
+  GstCurlBaseSink *sink;
   gboolean ret = TRUE;
 
-  sink = (GstCurlSink *) clientp;
+  sink = (GstCurlBaseSink *) clientp;
 
   g_assert (sink);
 
@@ -996,7 +870,7 @@ gst_curl_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
   ret = ret && gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
   GST_DEBUG ("fd: %d", sink->fd.fd);
   GST_OBJECT_LOCK (sink);
-  gst_curl_sink_setup_dscp_unlocked (sink);
+  gst_curl_base_sink_setup_dscp_unlocked (sink);
   GST_OBJECT_UNLOCK (sink);
 
   /* success */
@@ -1008,7 +882,7 @@ gst_curl_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
 }
 
 static gboolean
-gst_curl_sink_transfer_start_unlocked (GstCurlSink * sink)
+gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink)
 {
   GError *error = NULL;
   gboolean ret = TRUE;
@@ -1017,8 +891,8 @@ gst_curl_sink_transfer_start_unlocked (GstCurlSink * sink)
   sink->transfer_thread_close = FALSE;
   sink->new_file = TRUE;
   sink->transfer_thread =
-      g_thread_create ((GThreadFunc) gst_curl_sink_transfer_thread_func, sink,
-      TRUE, &error);
+      g_thread_create ((GThreadFunc) gst_curl_base_sink_transfer_thread_func,
+      sink, TRUE, &error);
 
   if (sink->transfer_thread == NULL || error != NULL) {
     ret = FALSE;
@@ -1034,15 +908,16 @@ gst_curl_sink_transfer_start_unlocked (GstCurlSink * sink)
 }
 
 static gpointer
-gst_curl_sink_transfer_thread_func (gpointer data)
+gst_curl_base_sink_transfer_thread_func (gpointer data)
 {
-  GstCurlSink *sink = (GstCurlSink *) data;
-  GstFlowReturn ret = GST_FLOW_OK;
+  GstCurlBaseSink *sink = (GstCurlBaseSink *) data;
+  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
+  GstFlowReturn ret;
   gboolean data_available;
 
   GST_LOG ("transfer thread started");
   GST_OBJECT_LOCK (sink);
-  if (!gst_curl_sink_transfer_setup_unlocked (sink)) {
+  if (!gst_curl_base_sink_transfer_setup_unlocked (sink)) {
     GST_DEBUG_OBJECT (sink, "curl setup error");
     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("curl setup error"), (NULL));
     sink->flow_ret = GST_FLOW_ERROR;
@@ -1050,49 +925,76 @@ gst_curl_sink_transfer_thread_func (gpointer data)
   }
 
   while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) {
-    /* we are working on a new file, clearing flag and setting file
-     * name in http header */
+    /* we are working on a new file, clearing flag and setting a new file
+     * name */
     sink->new_file = FALSE;
 
     /* wait for data to arrive for this new file, if we get a new file name
      * again before getting data we will simply skip transfering anything
      * for this file and go directly to the new file */
-    data_available = gst_curl_sink_wait_for_data_unlocked (sink);
+    data_available = gst_curl_base_sink_wait_for_data_unlocked (sink);
     if (data_available) {
-      gst_curl_sink_set_http_header_unlocked (sink);
+      if (G_UNLIKELY (!klass->set_protocol_dynamic_options_unlocked (sink))) {
+        sink->flow_ret = GST_FLOW_ERROR;
+        GST_OBJECT_UNLOCK (sink);
+        GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, ("Unexpected state."),
+            (NULL));
+        GST_OBJECT_LOCK (sink);
+        goto done;
+      }
     }
 
     /* stay unlocked while handling the actual transfer */
     GST_OBJECT_UNLOCK (sink);
 
     if (data_available) {
-      curl_multi_add_handle (sink->multi_handle, sink->curl);
+      if (!gst_curl_base_sink_is_live (sink)) {
+        /* prepare transfer if needed */
+        if (klass->prepare_transfer) {
+          GST_OBJECT_LOCK (sink);
+          if (!klass->prepare_transfer (sink)) {
+            sink->flow_ret = GST_FLOW_ERROR;
+            goto done;
+          }
+          GST_OBJECT_UNLOCK (sink);
+        }
+        curl_multi_add_handle (sink->multi_handle, sink->curl);
+      }
 
       /* Start driving the transfer. */
-      ret = gst_curl_sink_handle_transfer (sink);
+      klass->handle_transfer (sink);
 
       /* easy handle will be possibly re-used for next transfer, thus it needs to
        * be removed from the multi stack and re-added again */
-      curl_multi_remove_handle (sink->multi_handle, sink->curl);
+      if (!gst_curl_base_sink_is_live (sink)) {
+        curl_multi_remove_handle (sink->multi_handle, sink->curl);
+      }
     }
 
     /* lock again before looping to check the thread closed flag */
     GST_OBJECT_LOCK (sink);
+  }
 
-    /* if we have transfered data, then set the return code */
-    if (data_available) {
-      sink->flow_ret = ret;
-    }
+  if (sink->is_live) {
+    curl_multi_remove_handle (sink->multi_handle, sink->curl);
   }
 
 done:
+  /* extract the error code so the lock does not have to be
+   * taken when calling the functions below that take the lock
+   * on their own */
+  ret = sink->flow_ret;
+  GST_OBJECT_UNLOCK (sink);
+
   /* if there is a flow error, always notify the render function so it
-   * can return the flow error up along the pipeline */
-  if (sink->flow_ret != GST_FLOW_OK) {
-    gst_curl_sink_data_sent_notify_unlocked (sink);
+   * can return the flow error up along the pipeline. as an error has
+   * occurred there is no response to receive, so notify the event function
+   * so it doesn't block indefinitely waiting for a response. */
+  if (ret != GST_FLOW_OK) {
+    gst_curl_base_sink_data_sent_notify (sink);
+    gst_curl_base_sink_got_response_notify (sink);
   }
 
-  GST_OBJECT_UNLOCK (sink);
   GST_DEBUG ("exit thread func - transfer thread close flag: %d",
       sink->transfer_thread_close);
 
@@ -1100,7 +1002,7 @@ done:
 }
 
 static gboolean
-gst_curl_sink_transfer_setup_unlocked (GstCurlSink * sink)
+gst_curl_base_sink_transfer_setup_unlocked (GstCurlBaseSink * sink)
 {
   g_assert (sink);
 
@@ -1112,7 +1014,7 @@ gst_curl_sink_transfer_setup_unlocked (GstCurlSink * sink)
     }
   }
 
-  if (!gst_curl_sink_transfer_set_options_unlocked (sink)) {
+  if (!gst_curl_base_sink_transfer_set_options_unlocked (sink)) {
     g_warning ("Failed to setup easy handle");
     GST_OBJECT_UNLOCK (sink);
     return FALSE;
@@ -1129,7 +1031,7 @@ gst_curl_sink_transfer_setup_unlocked (GstCurlSink * sink)
 }
 
 static void
-gst_curl_sink_transfer_cleanup (GstCurlSink * sink)
+gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink)
 {
   if (sink->curl != NULL) {
     if (sink->multi_handle != NULL) {
@@ -1146,7 +1048,7 @@ gst_curl_sink_transfer_cleanup (GstCurlSink * sink)
 }
 
 static gboolean
-gst_curl_sink_wait_for_data_unlocked (GstCurlSink * sink)
+gst_curl_base_sink_wait_for_data_unlocked (GstCurlBaseSink * sink)
 {
   gboolean data_available = FALSE;
 
@@ -1169,16 +1071,7 @@ gst_curl_sink_wait_for_data_unlocked (GstCurlSink * sink)
 }
 
 static void
-gst_curl_sink_transfer_thread_notify_unlocked (GstCurlSink * sink)
-{
-  GST_LOG ("more data to send");
-  sink->transfer_cond->data_available = TRUE;
-  sink->transfer_cond->data_sent = FALSE;
-  g_cond_signal (sink->transfer_cond->cond);
-}
-
-static void
-gst_curl_sink_new_file_notify_unlocked (GstCurlSink * sink)
+gst_curl_base_sink_new_file_notify_unlocked (GstCurlBaseSink * sink)
 {
   GST_LOG ("new file name");
   sink->new_file = TRUE;
@@ -1186,15 +1079,8 @@ gst_curl_sink_new_file_notify_unlocked (GstCurlSink * sink)
 }
 
 static void
-gst_curl_sink_transfer_thread_close_unlocked (GstCurlSink * sink)
-{
-  GST_LOG ("setting transfer thread close flag");
-  sink->transfer_thread_close = TRUE;
-  g_cond_signal (sink->transfer_cond->cond);
-}
-
-static void
-gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (GstCurlSink * sink)
+    gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
+    (GstCurlBaseSink * sink)
 {
   GST_LOG ("waiting for buffer send to complete");
 
@@ -1209,16 +1095,43 @@ gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (GstCurlSink * sink)
 }
 
 static void
-gst_curl_sink_data_sent_notify_unlocked (GstCurlSink * sink)
+gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink)
 {
   GST_LOG ("transfer completed");
+  GST_OBJECT_LOCK (sink);
   sink->transfer_cond->data_available = FALSE;
   sink->transfer_cond->data_sent = TRUE;
   g_cond_signal (sink->transfer_cond->cond);
+  GST_OBJECT_UNLOCK (sink);
+}
+
+static void
+gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink)
+{
+  GST_LOG ("waiting for remote to send response code");
+
+  GST_OBJECT_LOCK (sink);
+  while (sink->transfer_cond->wait_for_response) {
+    g_cond_wait (sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
+  }
+  GST_OBJECT_UNLOCK (sink);
+
+  GST_LOG ("response code received");
+}
+
+static void
+gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink)
+{
+  GST_LOG ("got response code");
+
+  GST_OBJECT_LOCK (sink);
+  sink->transfer_cond->wait_for_response = FALSE;
+  g_cond_signal (sink->transfer_cond->cond);
+  GST_OBJECT_UNLOCK (sink);
 }
 
 static gint
-gst_curl_sink_setup_dscp_unlocked (GstCurlSink * sink)
+gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink)
 {
   gint tos;
   gint af;
diff --git a/ext/curl/gstcurlbasesink.h b/ext/curl/gstcurlbasesink.h
new file mode 100644 (file)
index 0000000..f40d3bc
--- /dev/null
@@ -0,0 +1,116 @@
+/* GStreamer
+ * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_CURL_BASE_SINK__
+#define __GST_CURL_BASE_SINK__
+
+#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+#include <curl/curl.h>
+
+G_BEGIN_DECLS
+#define GST_TYPE_CURL_BASE_SINK \
+  (gst_curl_base_sink_get_type())
+#define GST_CURL_BASE_SINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_CURL_BASE_SINK, GstCurlBaseSink))
+#define GST_CURL_BASE_SINK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_CURL_BASE_SINK, GstCurlBaseSinkClass))
+#define GST_CURL_BASE_SINK_GET_CLASS(obj) \
+  (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_CURL_BASE_SINK, GstCurlBaseSinkClass))
+#define GST_IS_CURL_BASE_SINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_CURL_BASE_SINK))
+#define GST_IS_CURL_BASE_SINK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_CURL_BASE_SINK))
+typedef struct _GstCurlBaseSink GstCurlBaseSink;
+typedef struct _GstCurlBaseSinkClass GstCurlBaseSinkClass;
+
+typedef struct _TransferBuffer TransferBuffer;
+typedef struct _TransferCondition TransferCondition;
+
+struct _TransferBuffer
+{
+  guint8 *ptr;
+  size_t len;
+  size_t offset;
+};
+
+struct _TransferCondition
+{
+  GCond *cond;
+  gboolean data_sent;
+  gboolean data_available;
+  gboolean wait_for_response;
+};
+
+struct _GstCurlBaseSink
+{
+  GstBaseSink parent;
+
+  /*< private > */
+  CURLM *multi_handle;
+  CURL *curl;
+  GstPollFD fd;
+  GstPoll *fdset;
+  GThread *transfer_thread;
+  GstFlowReturn flow_ret;
+  TransferBuffer *transfer_buf;
+  TransferCondition *transfer_cond;
+  gint num_buffers_per_packet;
+  gint timeout;
+  gchar *url;
+  gchar *user;
+  gchar *passwd;
+  gchar *file_name;
+  guint qos_dscp;
+  gboolean transfer_thread_close;
+  gboolean new_file;
+  gboolean is_live;
+};
+
+struct _GstCurlBaseSinkClass
+{
+  GstBaseSinkClass parent_class;
+
+  /* vmethods */
+    gboolean (*set_protocol_dynamic_options_unlocked) (GstCurlBaseSink * sink);
+    gboolean (*set_options_unlocked) (GstCurlBaseSink * sink);
+  void (*set_mime_type) (GstCurlBaseSink * sink, GstCaps * caps);
+  void (*transfer_prepare_poll_wait) (GstCurlBaseSink * sink);
+    glong (*transfer_get_response_code) (GstCurlBaseSink * sink, glong resp);
+    gboolean (*transfer_verify_response_code) (GstCurlBaseSink * sink);
+    GstFlowReturn (*prepare_transfer) (GstCurlBaseSink * sink);
+  void (*handle_transfer) (GstCurlBaseSink * sink);
+    size_t (*transfer_read_cb) (void *curl_ptr, size_t size, size_t nmemb,
+      void *stream);
+    size_t (*transfer_data_buffer) (GstCurlBaseSink * sink, void *curl_ptr,
+      size_t block_size, guint * last_chunk);
+    size_t (*flush_data_unlocked) (GstCurlBaseSink * sink, void *curl_ptr,
+      size_t block_size, gboolean new_file);
+};
+
+GType gst_curl_base_sink_get_type (void);
+
+void gst_curl_base_sink_transfer_thread_notify_unlocked
+    (GstCurlBaseSink * sink);
+void gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink);
+void gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live);
+gboolean gst_curl_base_sink_is_live (GstCurlBaseSink * sink);
+
+G_END_DECLS
+#endif
diff --git a/ext/curl/gstcurlhttpsink.c b/ext/curl/gstcurlhttpsink.c
new file mode 100644 (file)
index 0000000..7101434
--- /dev/null
@@ -0,0 +1,476 @@
+/* GStreamer
+ * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * SECTION:element-curlhttpsink
+ * @short_description: sink that uploads data to a server using libcurl
+ * @see_also:
+ *
+ * This is a network sink that uses libcurl as a client to upload data to
+ * an HTTP server.
+ *
+ * <refsect2>
+ * <title>Example launch line (upload a JPEG file to an HTTP server)</title>
+ * |[
+ * gst-launch filesrc location=image.jpg ! jpegparse ! curlhttpsink  \
+ *     file-name=image.jpg  \
+ *     location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/  \
+ *     user=test passwd=test  \
+ *     content-type=image/jpeg  \
+ *     use-content-length=false
+ * ]|
+ * </refsect2>
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <curl/curl.h>
+#include <string.h>
+#include <stdio.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "gstcurltlssink.h"
+#include "gstcurlhttpsink.h"
+
+/* Default values */
+#define GST_CAT_DEFAULT                gst_curl_http_sink_debug
+#define DEFAULT_TIMEOUT                30
+#define DEFAULT_PROXY_PORT             3128
+#define DEFAULT_USE_CONTENT_LENGTH     FALSE
+
+#define RESPONSE_CONNECT_PROXY         200
+
+/* Plugin specific settings */
+
+GST_DEBUG_CATEGORY_STATIC (gst_curl_http_sink_debug);
+
+enum
+{
+  PROP_0,
+  PROP_PROXY,
+  PROP_PROXY_PORT,
+  PROP_PROXY_USER_NAME,
+  PROP_PROXY_USER_PASSWD,
+  PROP_USE_CONTENT_LENGTH,
+  PROP_CONTENT_TYPE
+};
+
+
+/* Object class function declarations */
+
+static void gst_curl_http_sink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_curl_http_sink_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+static void gst_curl_http_sink_finalize (GObject * gobject);
+static gboolean gst_curl_http_sink_set_header_unlocked
+    (GstCurlBaseSink * bcsink);
+static gboolean gst_curl_http_sink_set_options_unlocked
+    (GstCurlBaseSink * bcsink);
+static void gst_curl_http_sink_set_mime_type
+    (GstCurlBaseSink * bcsink, GstCaps * caps);
+static gboolean gst_curl_http_sink_transfer_verify_response_code
+    (GstCurlBaseSink * bcsink);
+static void gst_curl_http_sink_transfer_prepare_poll_wait
+    (GstCurlBaseSink * bcsink);
+
+#define gst_curl_http_sink_parent_class parent_class
+G_DEFINE_TYPE (GstCurlHttpSink, gst_curl_http_sink, GST_TYPE_CURL_TLS_SINK);
+
+/* private functions */
+
+static gboolean proxy_setup (GstCurlBaseSink * bcsink);
+
+static void
+gst_curl_http_sink_class_init (GstCurlHttpSinkClass * klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GstCurlBaseSinkClass *gstcurlbasesink_class = (GstCurlBaseSinkClass *) klass;
+  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+  GST_DEBUG_CATEGORY_INIT (gst_curl_http_sink_debug, "curlhttpsink", 0,
+      "curl http sink element");
+  GST_DEBUG_OBJECT (klass, "class_init");
+
+  gst_element_class_set_details_simple (element_class,
+      "Curl http sink",
+      "Sink/Network",
+      "Upload data over HTTP/HTTPS protocol using libcurl",
+      "Patricia Muscalu <patricia@axis.com>");
+
+  gstcurlbasesink_class->set_protocol_dynamic_options_unlocked =
+      gst_curl_http_sink_set_header_unlocked;
+  gstcurlbasesink_class->set_options_unlocked =
+      gst_curl_http_sink_set_options_unlocked;
+  gstcurlbasesink_class->set_mime_type = gst_curl_http_sink_set_mime_type;
+  gstcurlbasesink_class->transfer_verify_response_code =
+      gst_curl_http_sink_transfer_verify_response_code;
+  gstcurlbasesink_class->transfer_prepare_poll_wait =
+      gst_curl_http_sink_transfer_prepare_poll_wait;
+
+  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_http_sink_finalize);
+
+  gobject_class->set_property = gst_curl_http_sink_set_property;
+  gobject_class->get_property = gst_curl_http_sink_get_property;
+
+  g_object_class_install_property (gobject_class, PROP_PROXY,
+      g_param_spec_string ("proxy", "Proxy", "HTTP proxy server URI", NULL,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_PROXY_PORT,
+      g_param_spec_int ("proxy-port", "Proxy port",
+          "HTTP proxy server port", 0, G_MAXINT, DEFAULT_PROXY_PORT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_PROXY_USER_NAME,
+      g_param_spec_string ("proxy-user", "Proxy user name",
+          "Proxy user name to use for proxy authentication",
+          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_PROXY_USER_PASSWD,
+      g_param_spec_string ("proxy-passwd", "Proxy user password",
+          "Proxy user password to use for proxy authentication",
+          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_USE_CONTENT_LENGTH,
+      g_param_spec_boolean ("use-content-length", "Use content length header",
+          "Use the Content-Length HTTP header instead of "
+          "Transfer-Encoding header", DEFAULT_USE_CONTENT_LENGTH,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_CONTENT_TYPE,
+      g_param_spec_string ("content-type", "Content type",
+          "The mime type of the body of the request", NULL,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
+
+static void
+gst_curl_http_sink_init (GstCurlHttpSink * sink)
+{
+  sink->header_list = NULL;
+  sink->use_content_length = DEFAULT_USE_CONTENT_LENGTH;
+  sink->content_type = NULL;
+
+  sink->proxy_port = DEFAULT_PROXY_PORT;
+  sink->proxy_headers_set = FALSE;
+  sink->proxy_auth = FALSE;
+  sink->use_proxy = FALSE;
+  sink->proxy_conn_established = FALSE;
+  sink->proxy_resp = -1;
+}
+
+static void
+gst_curl_http_sink_finalize (GObject * gobject)
+{
+  GstCurlHttpSink *this = GST_CURL_HTTP_SINK (gobject);
+
+  GST_DEBUG ("finalizing curlhttpsink");
+  g_free (this->proxy);
+  g_free (this->proxy_user);
+  g_free (this->proxy_passwd);
+  g_free (this->content_type);
+
+  if (this->header_list) {
+    curl_slist_free_all (this->header_list);
+    this->header_list = NULL;
+  }
+
+  G_OBJECT_CLASS (parent_class)->finalize (gobject);
+}
+
+static void
+gst_curl_http_sink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstCurlHttpSink *sink;
+  GstState cur_state;
+
+  g_return_if_fail (GST_IS_CURL_HTTP_SINK (object));
+  sink = GST_CURL_HTTP_SINK (object);
+
+  gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
+  if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
+    GST_OBJECT_LOCK (sink);
+
+    switch (prop_id) {
+      case PROP_PROXY:
+        g_free (sink->proxy);
+        sink->proxy = g_value_dup_string (value);
+        GST_DEBUG_OBJECT (sink, "proxy set to %s", sink->proxy);
+        break;
+      case PROP_PROXY_PORT:
+        sink->proxy_port = g_value_get_int (value);
+        GST_DEBUG_OBJECT (sink, "proxy port set to %d", sink->proxy_port);
+        break;
+      case PROP_PROXY_USER_NAME:
+        g_free (sink->proxy_user);
+        sink->proxy_user = g_value_dup_string (value);
+        GST_DEBUG_OBJECT (sink, "proxy user set to %s", sink->proxy_user);
+        break;
+      case PROP_PROXY_USER_PASSWD:
+        g_free (sink->proxy_passwd);
+        sink->proxy_passwd = g_value_dup_string (value);
+        GST_DEBUG_OBJECT (sink, "proxy password set to %s", sink->proxy_passwd);
+        break;
+      case PROP_USE_CONTENT_LENGTH:
+        sink->use_content_length = g_value_get_boolean (value);
+        GST_DEBUG_OBJECT (sink, "use_content_length set to %d",
+            sink->use_content_length);
+        break;
+      case PROP_CONTENT_TYPE:
+        g_free (sink->content_type);
+        sink->content_type = g_value_dup_string (value);
+        GST_DEBUG_OBJECT (sink, "content type set to %s", sink->content_type);
+        break;
+      default:
+        GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
+        break;
+    }
+
+    GST_OBJECT_UNLOCK (sink);
+
+    return;
+  }
+
+  /* in PLAYING or PAUSED state */
+  GST_OBJECT_LOCK (sink);
+
+  switch (prop_id) {
+    case PROP_CONTENT_TYPE:
+      g_free (sink->content_type);
+      sink->content_type = g_value_dup_string (value);
+      GST_DEBUG_OBJECT (sink, "content type set to %s", sink->content_type);
+      break;
+    default:
+      GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
+      break;
+  }
+
+  GST_OBJECT_UNLOCK (sink);
+}
+
+static void
+gst_curl_http_sink_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstCurlHttpSink *sink;
+
+  g_return_if_fail (GST_IS_CURL_HTTP_SINK (object));
+  sink = GST_CURL_HTTP_SINK (object);
+
+  switch (prop_id) {
+    case PROP_PROXY:
+      g_value_set_string (value, sink->proxy);
+      break;
+    case PROP_PROXY_PORT:
+      g_value_set_int (value, sink->proxy_port);
+      break;
+    case PROP_PROXY_USER_NAME:
+      g_value_set_string (value, sink->proxy_user);
+      break;
+    case PROP_PROXY_USER_PASSWD:
+      g_value_set_string (value, sink->proxy_passwd);
+      break;
+    case PROP_USE_CONTENT_LENGTH:
+      g_value_set_boolean (value, sink->use_content_length);
+      break;
+    case PROP_CONTENT_TYPE:
+      g_value_set_string (value, sink->content_type);
+      break;
+    default:
+      GST_DEBUG_OBJECT (sink, "invalid property id");
+      break;
+  }
+}
+
+static gboolean
+gst_curl_http_sink_set_header_unlocked (GstCurlBaseSink * bcsink)
+{
+  GstCurlHttpSink *sink = GST_CURL_HTTP_SINK (bcsink);
+  gchar *tmp;
+
+  if (sink->header_list) {
+    curl_slist_free_all (sink->header_list);
+    sink->header_list = NULL;
+  }
+
+  if (!sink->proxy_headers_set && sink->use_proxy) {
+    sink->header_list = curl_slist_append (sink->header_list,
+        "Content-Length: 0");
+    sink->proxy_headers_set = TRUE;
+    goto set_headers;
+  }
+
+  if (sink->use_content_length) {
+    /* if content length is used we assume that every buffer is one
+     * entire file, which is the case when uploading several jpegs */
+    tmp =
+        g_strdup_printf ("Content-Length: %d", (int) bcsink->transfer_buf->len);
+    sink->header_list = curl_slist_append (sink->header_list, tmp);
+    g_free (tmp);
+  } else {
+    /* when sending a POST request to a HTTP 1.1 server, you can send data
+     * without knowing the size before starting the POST if you use chunked
+     * encoding */
+    sink->header_list = curl_slist_append (sink->header_list,
+        "Transfer-Encoding: chunked");
+  }
+
+  tmp = g_strdup_printf ("Content-Type: %s", sink->content_type);
+  sink->header_list = curl_slist_append (sink->header_list, tmp);
+  g_free (tmp);
+
+set_headers:
+
+  tmp = g_strdup_printf ("Content-Disposition: attachment; filename="
+      "\"%s\"", bcsink->file_name);
+  sink->header_list = curl_slist_append (sink->header_list, tmp);
+  g_free (tmp);
+  curl_easy_setopt (bcsink->curl, CURLOPT_HTTPHEADER, sink->header_list);
+
+  return TRUE;
+}
+
+static gboolean
+gst_curl_http_sink_set_options_unlocked (GstCurlBaseSink * bcsink)
+{
+  GstCurlHttpSink *sink = GST_CURL_HTTP_SINK (bcsink);
+  GstCurlTlsSinkClass *parent_class;
+
+  /* proxy settings */
+  if (sink->proxy != NULL && strlen (sink->proxy)) {
+    if (!proxy_setup (bcsink)) {
+      return FALSE;
+    }
+  }
+
+  curl_easy_setopt (bcsink->curl, CURLOPT_POST, 1L);
+
+  /* FIXME: check user & passwd */
+  curl_easy_setopt (bcsink->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
+
+  parent_class = GST_CURL_TLS_SINK_GET_CLASS (sink);
+
+  if (g_str_has_prefix (bcsink->url, "https://")) {
+    return parent_class->set_options_unlocked (bcsink);
+  }
+
+  return TRUE;
+}
+
+static gboolean
+gst_curl_http_sink_transfer_verify_response_code (GstCurlBaseSink * bcsink)
+{
+  GstCurlHttpSink *sink = GST_CURL_HTTP_SINK (bcsink);
+  glong resp;
+
+  curl_easy_getinfo (bcsink->curl, CURLINFO_RESPONSE_CODE, &resp);
+  GST_DEBUG_OBJECT (sink, "response code: %ld", resp);
+
+  if (resp < 100 || resp >= 300) {
+    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
+        ("HTTP response error: (received: %ld)", resp), (NULL));
+
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+static void
+gst_curl_http_sink_transfer_prepare_poll_wait (GstCurlBaseSink * bcsink)
+{
+  GstCurlHttpSink *sink = GST_CURL_HTTP_SINK (bcsink);
+
+  if (!sink->proxy_conn_established
+      && (sink->proxy_resp != RESPONSE_CONNECT_PROXY)
+      && sink->proxy_auth) {
+    curl_easy_getinfo (bcsink->curl, CURLINFO_HTTP_CONNECTCODE,
+        &sink->proxy_resp);
+    if ((sink->proxy_resp == RESPONSE_CONNECT_PROXY)) {
+      GST_LOG ("received HTTP/1.0 200 Connection Established");
+      /* Workaround: redefine HTTP headers before connecting to HTTP server.
+       * When talking to proxy, the Content-Length: 0 is send with the request.
+       */
+      curl_multi_remove_handle (bcsink->multi_handle, bcsink->curl);
+      gst_curl_http_sink_set_header_unlocked (bcsink);
+      curl_multi_add_handle (bcsink->multi_handle, bcsink->curl);
+      sink->proxy_conn_established = TRUE;
+    }
+  }
+}
+
+// FIXME check this: why critical when no mime is set???
+static void
+gst_curl_http_sink_set_mime_type (GstCurlBaseSink * bcsink, GstCaps * caps)
+{
+  GstCurlHttpSink *sink = GST_CURL_HTTP_SINK (bcsink);
+  GstStructure *structure;
+  const gchar *mime_type;
+
+  if (sink->content_type != NULL) {
+    return;
+  }
+
+  structure = gst_caps_get_structure (caps, 0);
+  mime_type = gst_structure_get_name (structure);
+  sink->content_type = g_strdup (mime_type);
+}
+
+static gboolean
+proxy_setup (GstCurlBaseSink * bcsink)
+{
+  GstCurlHttpSink *sink = GST_CURL_HTTP_SINK (bcsink);
+
+  if (curl_easy_setopt (bcsink->curl, CURLOPT_PROXY, sink->proxy)
+      != CURLE_OK) {
+    return FALSE;
+  }
+
+  if (curl_easy_setopt (bcsink->curl, CURLOPT_PROXYPORT, sink->proxy_port)
+      != CURLE_OK) {
+    return FALSE;
+  }
+
+  if (sink->proxy_user != NULL &&
+      strlen (sink->proxy_user) &&
+      sink->proxy_passwd != NULL && strlen (sink->proxy_passwd)) {
+    curl_easy_setopt (bcsink->curl, CURLOPT_PROXYUSERNAME, sink->proxy_user);
+    curl_easy_setopt (bcsink->curl, CURLOPT_PROXYPASSWORD, sink->proxy_passwd);
+    curl_easy_setopt (bcsink->curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
+    sink->proxy_auth = TRUE;
+  }
+
+  if (g_str_has_prefix (bcsink->url, "https://")) {
+    /* tunnel all operations through a given HTTP proxy */
+    if (curl_easy_setopt (bcsink->curl, CURLOPT_HTTPPROXYTUNNEL, 1L)
+        != CURLE_OK) {
+      return FALSE;
+    }
+  }
+
+  sink->use_proxy = TRUE;
+
+  return TRUE;
+}
diff --git a/ext/curl/gstcurlhttpsink.h b/ext/curl/gstcurlhttpsink.h
new file mode 100644 (file)
index 0000000..551b891
--- /dev/null
@@ -0,0 +1,69 @@
+/* GStreamer
+ * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_CURL_HTTP_SINK__
+#define __GST_CURL_HTTP_SINK__
+
+#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+#include <curl/curl.h>
+#include "gstcurltlssink.h"
+
+G_BEGIN_DECLS
+#define GST_TYPE_CURL_HTTP_SINK \
+  (gst_curl_http_sink_get_type())
+#define GST_CURL_HTTP_SINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_CURL_HTTP_SINK, GstCurlHttpSink))
+#define GST_CURL_HTTP_SINK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_CURL_HTTP_SINK, GstCurlHttpSinkClass))
+#define GST_IS_CURL_HTTP_SINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_CURL_HTTP_SINK))
+#define GST_IS_CURL_HTTP_SINK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_CURL_HTTP_SINK))
+typedef struct _GstCurlHttpSink GstCurlHttpSink;
+typedef struct _GstCurlHttpSinkClass GstCurlHttpSinkClass;
+
+struct _GstCurlHttpSink
+{
+  GstCurlTlsSink parent;
+
+  /*< private > */
+  struct curl_slist *header_list;
+  gchar *proxy;
+  guint proxy_port;
+  gchar *proxy_user;
+  gchar *proxy_passwd;
+  gboolean use_content_length;
+  gchar *content_type;
+  gboolean use_proxy;
+  gboolean proxy_headers_set;
+  gboolean proxy_auth;
+  gboolean proxy_conn_established;
+  glong proxy_resp;
+};
+
+struct _GstCurlHttpSinkClass
+{
+  GstCurlTlsSinkClass parent_class;
+};
+
+GType gst_curl_http_sink_get_type (void);
+
+G_END_DECLS
+#endif
diff --git a/ext/curl/gstcurlsink.h b/ext/curl/gstcurlsink.h
deleted file mode 100644 (file)
index d158577..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-/* GStreamer
- * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#ifndef __GST_CURL_SINK__
-#define __GST_CURL_SINK__
-
-#include <gst/gst.h>
-#include <gst/base/gstbasesink.h>
-#include <curl/curl.h>
-
-G_BEGIN_DECLS
-
-#define GST_TYPE_CURL_SINK \
-  (gst_curl_sink_get_type())
-#define GST_CURL_SINK(obj) \
-  (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_CURL_SINK, GstCurlSink))
-#define GST_CURL_SINK_CLASS(klass) \
-  (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_CURL_SINK, GstCurlSinkClass))
-#define GST_IS_CURL_SINK(obj) \
-  (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_CURL_SINK))
-#define GST_IS_CURL_SINK_CLASS(klass) \
-  (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_CURL_SINK))
-
-typedef struct _GstCurlSink GstCurlSink;
-typedef struct _GstCurlSinkClass GstCurlSinkClass;
-
-typedef struct _TransferBuffer TransferBuffer;
-typedef struct _TransferCondition TransferCondition;
-
-struct _TransferBuffer {
-  guint8 *ptr;
-  size_t len;
-  size_t offset;
-};
-
-struct _TransferCondition {
-  GCond *cond;
-  gboolean data_sent;
-  gboolean data_available;
-};
-
-struct _GstCurlSink
-{
-  GstBaseSink parent;
-
-  /*< private >*/
-  CURLM *multi_handle;
-  CURL *curl;
-  struct curl_slist *header_list;
-  GstPollFD fd;
-  GstPoll *fdset;
-  GThread *transfer_thread;
-  GstFlowReturn flow_ret;
-  TransferBuffer *transfer_buf;
-  TransferCondition *transfer_cond;
-  gint num_buffers_per_packet;
-  gint timeout;
-  gchar *url;
-  gchar *user;
-  gchar *passwd;
-  gchar *proxy;
-  guint proxy_port;
-  gchar *proxy_user;
-  gchar *proxy_passwd;
-  gchar *file_name;
-  guint qos_dscp;
-  gboolean accept_self_signed;
-  gboolean use_content_length;
-  gboolean transfer_thread_close;
-  gboolean new_file;
-  gchar *content_type;
-  gboolean proxy_headers_set;
-};
-
-struct _GstCurlSinkClass
-{
-  GstBaseSinkClass parent_class;
-};
-
-GType gst_curl_sink_get_type (void);
-
-G_END_DECLS
-
-#endif
diff --git a/ext/curl/gstcurltlssink.c b/ext/curl/gstcurltlssink.c
new file mode 100644 (file)
index 0000000..fcb1570
--- /dev/null
@@ -0,0 +1,279 @@
+/* GStreamer
+ * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * SECTION:element-curltlssink
+ * @short_description: sink that uploads data to a server using libcurl
+ * @see_also:
+ *
+ * This is a network sink that uses libcurl.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <curl/curl.h>
+#include <string.h>
+#include <stdio.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "gstcurlbasesink.h"
+#include "gstcurltlssink.h"
+
+/* Default values */
+#define GST_CAT_DEFAULT                gst_curl_tls_sink_debug
+#define DEFAULT_INSECURE               TRUE
+
+
+/* Plugin specific settings */
+
+GST_DEBUG_CATEGORY_STATIC (gst_curl_tls_sink_debug);
+
+enum
+{
+  PROP_0,
+  PROP_CA_CERT,
+  PROP_CA_PATH,
+  PROP_CRYPTO_ENGINE,
+  PROP_INSECURE
+};
+
+
+/* Object class function declarations */
+
+static void gst_curl_tls_sink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_curl_tls_sink_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+static void gst_curl_tls_sink_finalize (GObject * gobject);
+static gboolean gst_curl_tls_sink_set_options_unlocked
+    (GstCurlBaseSink * bcsink);
+
+#define gst_curl_tls_sink_parent_class parent_class
+G_DEFINE_TYPE (GstCurlTlsSink, gst_curl_tls_sink, GST_TYPE_CURL_BASE_SINK);
+
+/* private functions */
+
+static void
+gst_curl_tls_sink_class_init (GstCurlTlsSinkClass * klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+  GST_DEBUG_CATEGORY_INIT (gst_curl_tls_sink_debug, "curltlssink", 0,
+      "curl tls sink element");
+  GST_DEBUG_OBJECT (klass, "class_init");
+
+  gst_element_class_set_details_simple (element_class,
+      "Curl tls sink",
+      "Sink/Network",
+      "Upload data over TLS protocol using libcurl",
+      "Patricia Muscalu <patricia@axis.com>");
+
+  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_tls_sink_finalize);
+
+  gobject_class->set_property = gst_curl_tls_sink_set_property;
+  gobject_class->get_property = gst_curl_tls_sink_get_property;
+
+  klass->set_options_unlocked = gst_curl_tls_sink_set_options_unlocked;
+
+  g_object_class_install_property (gobject_class, PROP_CA_CERT,
+      g_param_spec_string ("ca-cert",
+          "CA certificate",
+          "CA certificate to use in order to verify the peer",
+          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_CA_PATH,
+      g_param_spec_string ("ca-path",
+          "CA path",
+          "CA directory path to use in order to verify the peer",
+          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_CRYPTO_ENGINE,
+      g_param_spec_string ("crypto-engine",
+          "OpenSSL crypto engine",
+          "OpenSSL crytpo engine to use for cipher operations",
+          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_INSECURE,
+      g_param_spec_boolean ("insecure",
+          "Perform insecure SSL connections",
+          "Allow curl to perform insecure SSL connections",
+          DEFAULT_INSECURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
+
+static void
+gst_curl_tls_sink_init (GstCurlTlsSink * sink)
+{
+  sink->ca_cert = NULL;
+  sink->ca_path = NULL;
+  sink->crypto_engine = NULL;
+  sink->insecure = DEFAULT_INSECURE;
+}
+
+static void
+gst_curl_tls_sink_finalize (GObject * gobject)
+{
+  GstCurlTlsSink *this = GST_CURL_TLS_SINK (gobject);
+
+  GST_DEBUG ("finalizing curltlssink");
+
+  g_free (this->ca_cert);
+  g_free (this->ca_path);
+  g_free (this->crypto_engine);
+
+  G_OBJECT_CLASS (parent_class)->finalize (gobject);
+}
+
+static void
+gst_curl_tls_sink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstCurlTlsSink *sink;
+  GstState cur_state;
+
+  g_return_if_fail (GST_IS_CURL_TLS_SINK (object));
+  sink = GST_CURL_TLS_SINK (object);
+
+  gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
+  if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
+    GST_OBJECT_LOCK (sink);
+
+    switch (prop_id) {
+      case PROP_CA_CERT:
+        g_free (sink->ca_cert);
+        sink->ca_cert = g_value_dup_string (value);
+        sink->insecure = FALSE;
+        GST_DEBUG_OBJECT (sink, "ca_cert set to %s", sink->ca_cert);
+        break;
+      case PROP_CA_PATH:
+        g_free (sink->ca_path);
+        sink->ca_path = g_value_dup_string (value);
+        sink->insecure = FALSE;
+        GST_DEBUG_OBJECT (sink, "ca_path set to %s", sink->ca_path);
+        break;
+      case PROP_CRYPTO_ENGINE:
+        g_free (sink->crypto_engine);
+        sink->crypto_engine = g_value_dup_string (value);
+        GST_DEBUG_OBJECT (sink, "crypto_engine set to %s", sink->crypto_engine);
+        break;
+      case PROP_INSECURE:
+        sink->insecure = g_value_get_boolean (value);
+        GST_DEBUG_OBJECT (sink, "insecure set to %d", sink->insecure);
+        break;
+    }
+
+    GST_OBJECT_UNLOCK (sink);
+
+    return;
+  }
+
+  GST_OBJECT_UNLOCK (sink);
+}
+
+static void
+gst_curl_tls_sink_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstCurlTlsSink *sink;
+
+  g_return_if_fail (GST_IS_CURL_TLS_SINK (object));
+  sink = GST_CURL_TLS_SINK (object);
+
+  switch (prop_id) {
+    case PROP_CA_CERT:
+      g_value_set_string (value, sink->ca_cert);
+      break;
+    case PROP_CA_PATH:
+      g_value_set_string (value, sink->ca_path);
+      break;
+    case PROP_CRYPTO_ENGINE:
+      g_value_set_string (value, sink->crypto_engine);
+      break;
+    case PROP_INSECURE:
+      g_value_set_boolean (value, sink->insecure);
+      break;
+    default:
+      GST_DEBUG_OBJECT (sink, "invalid property id");
+      break;
+  }
+}
+
+static gboolean
+gst_curl_tls_sink_set_options_unlocked (GstCurlBaseSink * bcsink)
+{
+  GstCurlTlsSink *sink = GST_CURL_TLS_SINK (bcsink);
+
+  if (!g_str_has_prefix (bcsink->url, "http")) {
+    curl_easy_setopt (bcsink->curl, CURLOPT_USE_SSL, 1L);
+  }
+
+  /* crypto engine */
+  if ((g_strcmp0 (sink->crypto_engine, "auto") == 0) ||
+      (sink->crypto_engine == NULL)) {
+    if (curl_easy_setopt (bcsink->curl, CURLOPT_SSLENGINE_DEFAULT, 1L)
+        != CURLE_OK) {
+      GST_WARNING ("Error setting up default SSL engine.");
+    }
+  } else {
+    if (curl_easy_setopt (bcsink->curl, CURLOPT_SSLENGINE,
+            sink->crypto_engine) == CURLE_SSL_ENGINE_NOTFOUND) {
+      GST_WARNING ("Error setting up SSL engine: %s.", sink->crypto_engine);
+    }
+  }
+
+  /* note that, using ca-path can allow libcurl to make SSL-connections much more
+   * efficiently than using ca-cert if the ca-cert ile
+   * contains many CA certificates. */
+  if (sink->ca_cert != NULL && strlen (sink->ca_cert)) {
+    GST_DEBUG ("setting ca cert");
+    curl_easy_setopt (bcsink->curl, CURLOPT_CAINFO, sink->ca_cert);
+  }
+
+  if (sink->ca_path != NULL && strlen (sink->ca_path)) {
+    GST_DEBUG ("setting ca path");
+    curl_easy_setopt (bcsink->curl, CURLOPT_CAPATH, sink->ca_path);
+  }
+
+  if (!sink->insecure) {
+    /* identify authenticity of the peer's certificate */
+    curl_easy_setopt (bcsink->curl, CURLOPT_SSL_VERIFYPEER, 1L);
+    /* when CURLOPT_SSL_VERIFYHOST is 2, the commonName or subjectAltName
+     * fields are verified */
+    curl_easy_setopt (bcsink->curl, CURLOPT_SSL_VERIFYHOST, 2L);
+
+    return TRUE;
+  }
+
+  /* allow "insecure" SSL connections and transfers */
+  if (sink->insecure) {
+    curl_easy_setopt (bcsink->curl, CURLOPT_SSL_VERIFYPEER, 0L);
+    curl_easy_setopt (bcsink->curl, CURLOPT_SSL_VERIFYHOST, 0L);
+  }
+
+  return TRUE;
+}
diff --git a/ext/curl/gstcurltlssink.h b/ext/curl/gstcurltlssink.h
new file mode 100644 (file)
index 0000000..ca8f73a
--- /dev/null
@@ -0,0 +1,66 @@
+/* GStreamer
+ * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_CURL_TLS_SINK__
+#define __GST_CURL_TLS_SINK__
+
+#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+#include <curl/curl.h>
+#include "gstcurlbasesink.h"
+
+G_BEGIN_DECLS
+#define GST_TYPE_CURL_TLS_SINK \
+  (gst_curl_tls_sink_get_type())
+#define GST_CURL_TLS_SINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_CURL_TLS_SINK, GstCurlTlsSink))
+#define GST_CURL_TLS_SINK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_CURL_TLS_SINK, GstCurlTlsSinkClass))
+#define GST_CURL_TLS_SINK_GET_CLASS(obj) \
+  (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_CURL_TLS_SINK, GstCurlTlsSinkClass))
+#define GST_IS_CURL_TLS_SINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_CURL_TLS_SINK))
+#define GST_IS_CURL_TLS_SINK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_CURL_TLS_SINK))
+typedef struct _GstCurlTlsSink GstCurlTlsSink;
+typedef struct _GstCurlTlsSinkClass GstCurlTlsSinkClass;
+
+struct _GstCurlTlsSink
+{
+  GstCurlBaseSink parent;
+
+  /*< private > */
+  gchar *ca_cert;
+  gchar *ca_path;
+  gchar *crypto_engine;
+  gboolean insecure;
+};
+
+struct _GstCurlTlsSinkClass
+{
+  GstCurlBaseSinkClass parent_class;
+
+  /* vmethods */
+    gboolean (*set_options_unlocked) (GstCurlBaseSink * sink);
+};
+
+GType gst_curl_tls_sink_get_type (void);
+
+G_END_DECLS
+#endif
index 0ef573d..b7fd73a 100644 (file)
@@ -147,6 +147,12 @@ else
 check_opus =
 endif
 
+if USE_CURL
+check_curl = elements/curlhttpsink
+else
+check_curl =
+endif
+
 VALGRIND_TO_FIX = \
        elements/mpeg2enc \
        elements/mplex    \
@@ -177,6 +183,7 @@ check_PROGRAMS = \
        $(check_timidity)  \
        $(check_kate)  \
        $(check_opus)  \
+       $(check_curl) \
        elements/autoconvert \
        elements/autovideoconvert \
        elements/asfmux \
diff --git a/tests/check/elements/curlhttpsink.c b/tests/check/elements/curlhttpsink.c
new file mode 100644 (file)
index 0000000..c2b0ef0
--- /dev/null
@@ -0,0 +1,136 @@
+/*
+ * Unittest for curlhttpsink
+ */
+
+#include <gst/check/gstcheck.h>
+#include <glib/gstdio.h>
+#include <curl/curl.h>
+
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
+static GstPad *srcpad;
+
+static GstElement *sink;
+
+static GstElement *
+setup_curlhttpsink (void)
+{
+  GST_DEBUG ("setup_curlhttpsink");
+  sink = gst_check_setup_element ("curlhttpsink");
+  srcpad = gst_check_setup_src_pad (sink, &srctemplate);
+  gst_pad_set_active (srcpad, TRUE);
+
+  return sink;
+}
+
+static void
+cleanup_curlhttpsink (GstElement * sink)
+{
+  GST_DEBUG ("cleanup_curlhttpsink");
+
+  gst_check_teardown_src_pad (sink);
+  gst_check_teardown_element (sink);
+}
+
+
+GST_START_TEST (test_properties)
+{
+  GstElement *sink;
+  gchar *res_location = NULL;
+  gchar *res_file_name = NULL;
+  gchar *res_user;
+  gchar *res_passwd;
+  gchar *res_proxy;
+  guint res_proxy_port;
+  gchar *res_proxy_user;
+  gchar *res_proxy_passwd;
+  gchar *res_content_type;
+  gboolean res_use_content_length;
+
+  sink = setup_curlhttpsink ();
+
+  g_object_set (G_OBJECT (sink),
+      "location", "mylocation",
+      "file-name","myfile",
+      "user", "user",
+      "passwd", "passwd",
+      "proxy", "myproxy",
+      "proxy-port", 7777,
+      "proxy-user", "proxy_user",
+      "proxy-passwd", "proxy_passwd",
+      "content-type", "image/jpeg",
+      "use-content-length", TRUE,
+      NULL);
+
+  g_object_get (sink,
+      "location", &res_location,
+      "file-name", &res_file_name,
+      "user", &res_user,
+      "passwd", &res_passwd,
+      "proxy", &res_proxy,
+      "proxy-port", &res_proxy_port,
+      "proxy-user", &res_proxy_user,
+      "proxy-passwd", &res_proxy_passwd,
+      "content-type", &res_content_type,
+      "use-content-length", &res_use_content_length,
+      NULL);
+
+  fail_unless (strncmp (res_location, "mylocation", strlen ("mylocation"))
+      == 0);
+  fail_unless (strncmp (res_file_name, "myfile", strlen ("myfile"))
+      == 0);
+  fail_unless (strncmp (res_user, "user", strlen ("user")) == 0);
+  fail_unless (strncmp (res_passwd, "passwd", strlen ("passwd")) == 0);
+  fail_unless (strncmp (res_proxy, "myproxy", strlen ("myproxy")) == 0);
+  fail_unless (res_proxy_port == 7777);
+  fail_unless (strncmp (res_proxy_user, "proxy_user", strlen ("proxy_user"))
+      == 0);
+  fail_unless (strncmp (res_proxy_passwd, "proxy_passwd",
+        strlen ("proxy_passwd")) == 0);
+  fail_unless (strncmp (res_content_type, "image/jpeg", strlen ("image/jpeg"))
+      == 0);
+  fail_unless (res_use_content_length == TRUE);
+
+  g_free (res_location);
+  g_free (res_file_name);
+  g_free (res_user);
+  g_free (res_passwd);
+  g_free (res_proxy);
+  g_free (res_proxy_user);
+  g_free (res_proxy_passwd);
+  g_free (res_content_type);
+
+  /* new properties */
+  g_object_set (G_OBJECT (sink), "location", "newlocation", NULL);
+  g_object_get (sink, "location", &res_location, NULL);
+  fail_unless (strncmp (res_location, "newlocation", strlen ("newlocation"))
+      == 0);
+  g_free (res_location);
+
+  g_object_set (G_OBJECT (sink), "file-name", "newfile", NULL);
+  g_object_get (sink, "file-name", &res_file_name, NULL);
+  fail_unless (strncmp (res_file_name, "newfile", strlen ("newfile"))
+      == 0);
+  g_free (res_file_name);
+
+  cleanup_curlhttpsink (sink);
+}
+GST_END_TEST;
+
+static Suite *
+curlsink_suite (void)
+{
+  Suite *s = suite_create ("curlhttpsink");
+  TCase *tc_chain = tcase_create ("general");
+
+  suite_add_tcase (s, tc_chain);
+  tcase_set_timeout (tc_chain, 20);
+  tcase_add_test (tc_chain, test_properties);
+
+  return s;
+}
+
+GST_CHECK_MAIN (curlsink);