curlsmtpsink: terminate transfer thread properly
authorPatricia Muscalu <patricia@axis.com>
Thu, 23 May 2013 12:32:07 +0000 (14:32 +0200)
committerTim-Philipp Müller <tim@centricular.net>
Fri, 24 May 2013 22:47:50 +0000 (23:47 +0100)
If no EOS has been sent, the curl readfunc callback will
return ABORT. The media file in that case will not be properly
finalized.

https://bugzilla.gnome.org/show_bug.cgi?id=700886

ext/curl/gstcurlbasesink.c
ext/curl/gstcurlbasesink.h
ext/curl/gstcurlsmtpsink.c
ext/curl/gstcurlsmtpsink.h

index deadad2..905a7b5 100644 (file)
@@ -147,6 +147,26 @@ static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
 #define parent_class gst_curl_base_sink_parent_class
 G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
 
+static gboolean
+gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
+{
+  return sink->transfer_buf->len > 0;
+}
+
+static gboolean
+gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
+{
+  GstCurlBaseSinkClass *klass;
+  gboolean res = FALSE;
+
+  klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
+
+  if (klass->has_buffered_data_unlocked)
+    res = klass->has_buffered_data_unlocked (sink);
+
+  return res;
+}
+
 static void
 gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
 {
@@ -179,6 +199,8 @@ gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
   klass->handle_transfer = handle_transfer;
   klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
   klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
+  klass->has_buffered_data_unlocked =
+      gst_curl_base_sink_default_has_buffered_data_unlocked;
 
   /* FIXME: check against souphttpsrc and use same names for same properties */
   g_object_class_install_property (gobject_class, PROP_LOCATION,
@@ -685,9 +707,19 @@ gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
    * then zero will be returned to indicate end of current transfer */
   GST_OBJECT_LOCK (sink);
   if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
+
+    if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
+        sink->transfer_thread_close) {
+      GST_WARNING_OBJECT (sink,
+          "discarding render data due to thread close flag");
+
+      GST_OBJECT_UNLOCK (sink);
+      return CURL_READFUNC_ABORT;
+    }
+
     if (klass->flush_data_unlocked) {
       bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
-          max_bytes_to_send, sink->new_file);
+          max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
 
       GST_OBJECT_UNLOCK (sink);
 
@@ -800,6 +832,13 @@ handle_transfer (GstCurlBaseSink * sink)
       } else if (errno == EBUSY) {
         GST_DEBUG_OBJECT (sink, "poll stopped");
         retval = GST_FLOW_EOS;
+
+        GST_OBJECT_LOCK (sink);
+        if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
+          GST_WARNING_OBJECT (sink,
+              "discarding render data due to thread close flag");
+        GST_OBJECT_UNLOCK (sink);
+
         goto fail;
       } else {
         GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno));
index 4096c5c..3def1ba 100644 (file)
@@ -101,7 +101,8 @@ struct _GstCurlBaseSinkClass
     size_t (*transfer_data_buffer) (GstCurlBaseSink * sink, void *curl_ptr,
       size_t block_size, guint * last_chunk);
     size_t (*flush_data_unlocked) (GstCurlBaseSink * sink, void *curl_ptr,
-      size_t block_size, gboolean new_file);
+      size_t block_size, gboolean new_file, gboolean close_transfer);
+  gboolean (*has_buffered_data_unlocked) (GstCurlBaseSink * sink);
 };
 
 GType gst_curl_base_sink_get_type (void);
index f796c14..2b83faa 100644 (file)
@@ -122,7 +122,7 @@ static gboolean gst_curl_smtp_sink_prepare_transfer (GstCurlBaseSink * bcsink);
 static size_t gst_curl_smtp_sink_transfer_data_buffer (GstCurlBaseSink * sink,
     void *curl_ptr, size_t block_size, guint * last_chunk);
 static size_t gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
-    void *curl_ptr, size_t block_size, gboolean new_file);
+    void *curl_ptr, size_t block_size, gboolean new_file, gboolean close_transfer);
 
 /* private functions */
 
@@ -151,15 +151,48 @@ gst_curl_smtp_sink_wait_for_transfer_end_unlocked (GstCurlSmtpSink * sink)
   GST_LOG ("final data sent");
 }
 
+static void
+add_final_boundary_unlocked (GstCurlSmtpSink * sink)
+{
+  GByteArray *array;
+  gchar *boundary_end;
+  gsize len;
+  gint save, state;
+  gchar *data_out;
+
+  GST_DEBUG ("adding final boundary");
+
+  array = sink->base64_chunk->chunk_array;
+  g_assert (array);
+
+  /* it will need up to 5 bytes if line-breaking is enabled
+   * additional byte is needed for <CR> as it is not automatically added by glib */
+  data_out = g_malloc (6);
+  save = sink->base64_chunk->save;
+  state = sink->base64_chunk->state;
+  len = g_base64_encode_close (TRUE, data_out, &state, &save);
+
+  /* workaround */
+  data_out[len - 1] = '\r';
+  data_out[len] = '\n';
+
+  /* +1 for CR */
+  g_byte_array_append (array, (guint8 *) data_out, (guint) (len + 1));
+  g_free (data_out);
+
+  boundary_end = g_strdup_printf ("\r\n%s\r\n", BOUNDARY_STRING_END);
+  g_byte_array_append (array, (guint8 *) boundary_end, strlen (boundary_end));
+  g_free (boundary_end);
+
+  sink->final_boundary_added = TRUE;
+}
+
 static gboolean
 gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event)
 {
   GstCurlBaseSink *bcsink = GST_CURL_BASE_SINK (bsink);
   GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bsink);
 
-  GByteArray *array;
-  gchar *boundary_end;
-
   switch (event->type) {
     case GST_EVENT_EOS:
       GST_DEBUG_OBJECT (sink, "received EOS");
@@ -169,34 +202,8 @@ gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event)
       sink->eos = TRUE;
       GST_OBJECT_UNLOCK (sink);
 
-      if (sink->base64_chunk != NULL) {
-        gsize len;
-        gint save, state;
-        gchar *data_out;
-
-        array = sink->base64_chunk->chunk_array;
-        g_assert (array);
-
-        GST_DEBUG ("adding final boundary");
-
-        /* it will need up to 5 bytes if line-breaking is enabled
-         * additional byte is needed for <CR> as it is not automatically added by glib */
-        data_out = g_malloc (6);
-        save = sink->base64_chunk->save;
-        state = sink->base64_chunk->state;
-        len = g_base64_encode_close (TRUE, data_out, &state, &save);
-        /* workaround */
-        data_out[len - 1] = '\r';
-        data_out[len] = '\n';
-        /* +1 for CR */
-        g_byte_array_append (array, (guint8 *) data_out, (guint) (len + 1));
-        g_free (data_out);
-
-        boundary_end = g_strdup_printf ("\r\n%s\r\n", BOUNDARY_STRING_END);
-        g_byte_array_append (array, (guint8 *) boundary_end,
-            strlen (boundary_end));
-        g_free (boundary_end);
-      }
+      if (sink->base64_chunk != NULL)
+        add_final_boundary_unlocked (sink);
 
       gst_curl_base_sink_transfer_thread_notify_unlocked (bcsink);
 
@@ -217,6 +224,25 @@ gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event)
   return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
 }
 
+static gboolean
+gst_curl_smtp_sink_has_buffered_data_unlocked (GstCurlBaseSink * bcsink)
+{
+  GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink);
+  Base64Chunk *chunk;
+  GByteArray *array = NULL;
+  gboolean ret = FALSE;
+
+  chunk = sink->base64_chunk;
+
+  if (chunk) {
+    array = chunk->chunk_array;
+    if (array)
+      ret = (array->len == 0 && sink->final_boundary_added) ? FALSE : TRUE;
+  }
+
+  return ret;
+}
+
 static void
 gst_curl_smtp_sink_class_init (GstCurlSmtpSinkClass * klass)
 {
@@ -245,6 +271,8 @@ gst_curl_smtp_sink_class_init (GstCurlSmtpSinkClass * klass)
       gst_curl_smtp_sink_transfer_data_buffer;
   gstcurlbasesink_class->flush_data_unlocked =
       gst_curl_smtp_sink_flush_data_unlocked;
+  gstcurlbasesink_class->has_buffered_data_unlocked =
+      gst_curl_smtp_sink_has_buffered_data_unlocked;
 
   gstbasesink_class->event = gst_curl_smtp_sink_event;
   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_smtp_sink_finalize);
@@ -306,6 +334,7 @@ gst_curl_smtp_sink_init (GstCurlSmtpSink * sink)
   g_cond_init (&sink->cond_transfer_end);
   sink->transfer_end = FALSE;
   sink->eos = FALSE;
+  sink->final_boundary_added = FALSE;
 
   sink->reset_transfer_options = FALSE;
   sink->use_ssl = DEFAULT_USE_SSL;
@@ -710,7 +739,8 @@ gst_curl_smtp_sink_set_mime_type (GstCurlBaseSink * bcsink, GstCaps * caps)
 
 static size_t
 gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
-    void *curl_ptr, size_t block_size, gboolean new_file)
+    void *curl_ptr, size_t block_size, gboolean new_file,
+    gboolean close_transfer)
 {
   GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink);
   Base64Chunk *chunk = sink->base64_chunk;
@@ -721,10 +751,17 @@ gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
   gint len;
   gchar *data_out;
 
+  GST_DEBUG ("live: %d, num attachments: %d, num attachments_left: %d, eos: %d, "
+      "close_transfer: %d, final boundary: %d, array_len: %d", bcsink->is_live,
+      sink->nbr_attachments, sink->nbr_attachments_left, sink->eos, close_transfer,
+      sink->final_boundary_added, array->len);
+
+
   if ((bcsink->is_live && (sink->nbr_attachments_left == sink->nbr_attachments))
-      || (sink->nbr_attachments == 1) || sink->eos) {
+      || (sink->nbr_attachments == 1) || sink->eos || sink->final_boundary_added) {
     bcsink->is_live = FALSE;
     sink->reset_transfer_options = TRUE;
+    sink->final_boundary_added = FALSE;
 
     GST_DEBUG ("returning 0, no more data to send in this transfer");
 
@@ -759,6 +796,10 @@ gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
     gst_curl_smtp_sink_set_payload_headers_unlocked (bcsink);
   }
 
+
+  if (close_transfer && !sink->final_boundary_added)
+    add_final_boundary_unlocked (sink);
+
   bytes_to_send = MIN (block_size, array->len);
   memcpy ((guint8 *) curl_ptr, array->data, bytes_to_send);
   g_byte_array_remove_range (array, 0, bytes_to_send);
index de57de7..94c3349 100644 (file)
@@ -73,6 +73,7 @@ struct _GstCurlSmtpSink
 
   gint nbr_attachments_left;
   gboolean reset_transfer_options;
+  gboolean final_boundary_added;
   gboolean eos;
 };