#define parent_class gst_curl_base_sink_parent_class
G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
+static gboolean
+gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
+{
+ return sink->transfer_buf->len > 0;
+}
+
+static gboolean
+gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
+{
+ GstCurlBaseSinkClass *klass;
+ gboolean res = FALSE;
+
+ klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
+
+ if (klass->has_buffered_data_unlocked)
+ res = klass->has_buffered_data_unlocked (sink);
+
+ return res;
+}
+
static void
gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
{
klass->handle_transfer = handle_transfer;
klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
+ klass->has_buffered_data_unlocked =
+ gst_curl_base_sink_default_has_buffered_data_unlocked;
/* FIXME: check against souphttpsrc and use same names for same properties */
g_object_class_install_property (gobject_class, PROP_LOCATION,
* then zero will be returned to indicate end of current transfer */
GST_OBJECT_LOCK (sink);
if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
+
+ if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
+ sink->transfer_thread_close) {
+ GST_WARNING_OBJECT (sink,
+ "discarding render data due to thread close flag");
+
+ GST_OBJECT_UNLOCK (sink);
+ return CURL_READFUNC_ABORT;
+ }
+
if (klass->flush_data_unlocked) {
bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
- max_bytes_to_send, sink->new_file);
+ max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
GST_OBJECT_UNLOCK (sink);
} else if (errno == EBUSY) {
GST_DEBUG_OBJECT (sink, "poll stopped");
retval = GST_FLOW_EOS;
+
+ GST_OBJECT_LOCK (sink);
+ if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
+ GST_WARNING_OBJECT (sink,
+ "discarding render data due to thread close flag");
+ GST_OBJECT_UNLOCK (sink);
+
goto fail;
} else {
GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno));
static size_t gst_curl_smtp_sink_transfer_data_buffer (GstCurlBaseSink * sink,
void *curl_ptr, size_t block_size, guint * last_chunk);
static size_t gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
- void *curl_ptr, size_t block_size, gboolean new_file);
+ void *curl_ptr, size_t block_size, gboolean new_file, gboolean close_transfer);
/* private functions */
GST_LOG ("final data sent");
}
+static void
+add_final_boundary_unlocked (GstCurlSmtpSink * sink)
+{
+ GByteArray *array;
+ gchar *boundary_end;
+ gsize len;
+ gint save, state;
+ gchar *data_out;
+
+ GST_DEBUG ("adding final boundary");
+
+ array = sink->base64_chunk->chunk_array;
+ g_assert (array);
+
+ /* it will need up to 5 bytes if line-breaking is enabled
+ * additional byte is needed for <CR> as it is not automatically added by glib */
+ data_out = g_malloc (6);
+ save = sink->base64_chunk->save;
+ state = sink->base64_chunk->state;
+ len = g_base64_encode_close (TRUE, data_out, &state, &save);
+
+ /* workaround */
+ data_out[len - 1] = '\r';
+ data_out[len] = '\n';
+
+ /* +1 for CR */
+ g_byte_array_append (array, (guint8 *) data_out, (guint) (len + 1));
+ g_free (data_out);
+
+ boundary_end = g_strdup_printf ("\r\n%s\r\n", BOUNDARY_STRING_END);
+ g_byte_array_append (array, (guint8 *) boundary_end, strlen (boundary_end));
+ g_free (boundary_end);
+
+ sink->final_boundary_added = TRUE;
+}
+
static gboolean
gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event)
{
GstCurlBaseSink *bcsink = GST_CURL_BASE_SINK (bsink);
GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bsink);
- GByteArray *array;
- gchar *boundary_end;
-
switch (event->type) {
case GST_EVENT_EOS:
GST_DEBUG_OBJECT (sink, "received EOS");
sink->eos = TRUE;
GST_OBJECT_UNLOCK (sink);
- if (sink->base64_chunk != NULL) {
- gsize len;
- gint save, state;
- gchar *data_out;
-
- array = sink->base64_chunk->chunk_array;
- g_assert (array);
-
- GST_DEBUG ("adding final boundary");
-
- /* it will need up to 5 bytes if line-breaking is enabled
- * additional byte is needed for <CR> as it is not automatically added by glib */
- data_out = g_malloc (6);
- save = sink->base64_chunk->save;
- state = sink->base64_chunk->state;
- len = g_base64_encode_close (TRUE, data_out, &state, &save);
- /* workaround */
- data_out[len - 1] = '\r';
- data_out[len] = '\n';
- /* +1 for CR */
- g_byte_array_append (array, (guint8 *) data_out, (guint) (len + 1));
- g_free (data_out);
-
- boundary_end = g_strdup_printf ("\r\n%s\r\n", BOUNDARY_STRING_END);
- g_byte_array_append (array, (guint8 *) boundary_end,
- strlen (boundary_end));
- g_free (boundary_end);
- }
+ if (sink->base64_chunk != NULL)
+ add_final_boundary_unlocked (sink);
gst_curl_base_sink_transfer_thread_notify_unlocked (bcsink);
return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
}
+static gboolean
+gst_curl_smtp_sink_has_buffered_data_unlocked (GstCurlBaseSink * bcsink)
+{
+ GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink);
+ Base64Chunk *chunk;
+ GByteArray *array = NULL;
+ gboolean ret = FALSE;
+
+ chunk = sink->base64_chunk;
+
+ if (chunk) {
+ array = chunk->chunk_array;
+ if (array)
+ ret = (array->len == 0 && sink->final_boundary_added) ? FALSE : TRUE;
+ }
+
+ return ret;
+}
+
static void
gst_curl_smtp_sink_class_init (GstCurlSmtpSinkClass * klass)
{
gst_curl_smtp_sink_transfer_data_buffer;
gstcurlbasesink_class->flush_data_unlocked =
gst_curl_smtp_sink_flush_data_unlocked;
+ gstcurlbasesink_class->has_buffered_data_unlocked =
+ gst_curl_smtp_sink_has_buffered_data_unlocked;
gstbasesink_class->event = gst_curl_smtp_sink_event;
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_smtp_sink_finalize);
g_cond_init (&sink->cond_transfer_end);
sink->transfer_end = FALSE;
sink->eos = FALSE;
+ sink->final_boundary_added = FALSE;
sink->reset_transfer_options = FALSE;
sink->use_ssl = DEFAULT_USE_SSL;
static size_t
gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
- void *curl_ptr, size_t block_size, gboolean new_file)
+ void *curl_ptr, size_t block_size, gboolean new_file,
+ gboolean close_transfer)
{
GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink);
Base64Chunk *chunk = sink->base64_chunk;
gint len;
gchar *data_out;
+ GST_DEBUG ("live: %d, num attachments: %d, num attachments_left: %d, eos: %d, "
+ "close_transfer: %d, final boundary: %d, array_len: %d", bcsink->is_live,
+ sink->nbr_attachments, sink->nbr_attachments_left, sink->eos, close_transfer,
+ sink->final_boundary_added, array->len);
+
+
if ((bcsink->is_live && (sink->nbr_attachments_left == sink->nbr_attachments))
- || (sink->nbr_attachments == 1) || sink->eos) {
+ || (sink->nbr_attachments == 1) || sink->eos || sink->final_boundary_added) {
bcsink->is_live = FALSE;
sink->reset_transfer_options = TRUE;
+ sink->final_boundary_added = FALSE;
GST_DEBUG ("returning 0, no more data to send in this transfer");
gst_curl_smtp_sink_set_payload_headers_unlocked (bcsink);
}
+
+ if (close_transfer && !sink->final_boundary_added)
+ add_final_boundary_unlocked (sink);
+
bytes_to_send = MIN (block_size, array->len);
memcpy ((guint8 *) curl_ptr, array->data, bytes_to_send);
g_byte_array_remove_range (array, 0, bytes_to_send);