From a132138f1c094097e69bbe0dc25fd0d8dd0549c2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Sun, 12 Jan 2020 12:48:55 +0200 Subject: [PATCH] dtls: Propagate write errors backwards through dtlsenc/dtlsconnection --- ext/dtls/gstdtlsconnection.c | 5 +++-- ext/dtls/gstdtlsconnection.h | 2 +- ext/dtls/gstdtlsenc.c | 29 ++++++++++++++++++++++++++--- ext/dtls/gstdtlsenc.h | 1 + 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/ext/dtls/gstdtlsconnection.c b/ext/dtls/gstdtlsconnection.c index f25dfc3..86a0e4e 100644 --- a/ext/dtls/gstdtlsconnection.c +++ b/ext/dtls/gstdtlsconnection.c @@ -899,14 +899,15 @@ static int bio_method_write (BIO * bio, const char *data, int size) { GstDtlsConnection *self = GST_DTLS_CONNECTION (BIO_get_data (bio)); + gboolean ret = TRUE; GST_LOG_OBJECT (self, "BIO: writing %d", size); if (self->priv->send_callback) - self->priv->send_callback (self, data, size, + ret = self->priv->send_callback (self, data, size, self->priv->send_callback_user_data); - return size; + return ret ? size : -1; } static int diff --git a/ext/dtls/gstdtlsconnection.h b/ext/dtls/gstdtlsconnection.h index 329e13e..bc2c569 100644 --- a/ext/dtls/gstdtlsconnection.h +++ b/ext/dtls/gstdtlsconnection.h @@ -99,7 +99,7 @@ void gst_dtls_connection_stop(GstDtlsConnection *); void gst_dtls_connection_close(GstDtlsConnection *); -typedef void (*GstDtlsConnectionSendCallback) (GstDtlsConnection * connection, gconstpointer data, gsize length, gpointer user_data); +typedef gboolean (*GstDtlsConnectionSendCallback) (GstDtlsConnection * connection, gconstpointer data, gsize length, gpointer user_data); /* * Sets the callback that will be called whenever data needs to be sent. diff --git a/ext/dtls/gstdtlsenc.c b/ext/dtls/gstdtlsenc.c index d6ba1eb..1f94c4b 100644 --- a/ext/dtls/gstdtlsenc.c +++ b/ext/dtls/gstdtlsenc.c @@ -100,8 +100,8 @@ static gboolean sink_event (GstPad * pad, GstObject * parent, GstEvent * event); static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher, guint auth, GstDtlsEnc *); -static void on_send_data (GstDtlsConnection *, gconstpointer data, gint length, - GstDtlsEnc *); +static gboolean on_send_data (GstDtlsConnection *, gconstpointer data, + gint length, GstDtlsEnc *); static void gst_dtls_enc_class_init (GstDtlsEncClass * klass) @@ -378,6 +378,7 @@ src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, GST_DEBUG_OBJECT (self, "src pad activating in push mode"); self->flushing = FALSE; + self->src_ret = GST_FLOW_OK; self->send_initial_events = TRUE; success = gst_pad_start_task (pad, (GstTaskFunction) src_task_loop, self->src, @@ -392,6 +393,7 @@ src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, g_queue_foreach (&self->queue, (GFunc) gst_buffer_unref, NULL); g_queue_clear (&self->queue); self->flushing = TRUE; + self->src_ret = GST_FLOW_FLUSHING; g_cond_signal (&self->queue_cond_add); g_mutex_unlock (&self->queue_lock); success = gst_pad_stop_task (pad); @@ -466,6 +468,9 @@ src_task_loop (GstPad * pad) GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s", gst_flow_get_name (ret)); } + g_mutex_lock (&self->queue_lock); + self->src_ret = ret; + g_mutex_unlock (&self->queue_lock); } static GstFlowReturn @@ -475,6 +480,19 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GstMapInfo map_info; gint ret; + g_mutex_lock (&self->queue_lock); + if (self->src_ret != GST_FLOW_OK) { + if (G_UNLIKELY (self->src_ret == GST_FLOW_NOT_LINKED + || self->src_ret < GST_FLOW_EOS)) + GST_ERROR_OBJECT (self, "Pushing previous data returned an error: %s", + gst_flow_get_name (self->src_ret)); + + gst_buffer_unref (buffer); + g_mutex_unlock (&self->queue_lock); + return self->src_ret; + } + g_mutex_unlock (&self->queue_lock); + gst_buffer_map (buffer, &map_info, GST_MAP_READ); if (map_info.size) { @@ -548,11 +566,12 @@ on_key_received (GstDtlsConnection * connection, gpointer key, guint cipher, g_signal_emit (self, signals[SIGNAL_ON_KEY_RECEIVED], 0); } -static void +static gboolean on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length, GstDtlsEnc * self) { GstBuffer *buffer; + gboolean ret; GST_DEBUG_OBJECT (self, "sending data from %s with length %d", self->connection_id, length); @@ -569,5 +588,9 @@ on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length, g_cond_signal (&self->queue_cond_add); GST_TRACE_OBJECT (self, "send data: releasing lock"); + + ret = self->src_ret == GST_FLOW_OK; g_mutex_unlock (&self->queue_lock); + + return ret; } diff --git a/ext/dtls/gstdtlsenc.h b/ext/dtls/gstdtlsenc.h index 622936f..070a75e 100644 --- a/ext/dtls/gstdtlsenc.h +++ b/ext/dtls/gstdtlsenc.h @@ -46,6 +46,7 @@ struct _GstDtlsEnc { GstElement element; GstPad *src; + GstFlowReturn src_ret; GQueue queue; GMutex queue_lock; -- 2.7.4