dtls: Propagate write errors backwards through dtlsenc/dtlsconnection
authorSebastian Dröge <sebastian@centricular.com>
Sun, 12 Jan 2020 10:48:55 +0000 (12:48 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Sun, 19 Jan 2020 11:16:34 +0000 (11:16 +0000)
ext/dtls/gstdtlsconnection.c
ext/dtls/gstdtlsconnection.h
ext/dtls/gstdtlsenc.c
ext/dtls/gstdtlsenc.h

index f25dfc3..86a0e4e 100644 (file)
@@ -899,14 +899,15 @@ static int
 bio_method_write (BIO * bio, const char *data, int size)
 {
   GstDtlsConnection *self = GST_DTLS_CONNECTION (BIO_get_data (bio));
+  gboolean ret = TRUE;
 
   GST_LOG_OBJECT (self, "BIO: writing %d", size);
 
   if (self->priv->send_callback)
-    self->priv->send_callback (self, data, size,
+    ret = self->priv->send_callback (self, data, size,
         self->priv->send_callback_user_data);
 
-  return size;
+  return ret ? size : -1;
 }
 
 static int
index 329e13e..bc2c569 100644 (file)
@@ -99,7 +99,7 @@ void gst_dtls_connection_stop(GstDtlsConnection *);
 void gst_dtls_connection_close(GstDtlsConnection *);
 
 
-typedef void (*GstDtlsConnectionSendCallback) (GstDtlsConnection * connection, gconstpointer data, gsize length, gpointer user_data);
+typedef gboolean (*GstDtlsConnectionSendCallback) (GstDtlsConnection * connection, gconstpointer data, gsize length, gpointer user_data);
 
 /*
  * Sets the callback that will be called whenever data needs to be sent.
index d6ba1eb..1f94c4b 100644 (file)
@@ -100,8 +100,8 @@ static gboolean sink_event (GstPad * pad, GstObject * parent, GstEvent * event);
 
 static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
     guint auth, GstDtlsEnc *);
-static void on_send_data (GstDtlsConnection *, gconstpointer data, gint length,
-    GstDtlsEnc *);
+static gboolean on_send_data (GstDtlsConnection *, gconstpointer data,
+    gint length, GstDtlsEnc *);
 
 static void
 gst_dtls_enc_class_init (GstDtlsEncClass * klass)
@@ -378,6 +378,7 @@ src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
     GST_DEBUG_OBJECT (self, "src pad activating in push mode");
 
     self->flushing = FALSE;
+    self->src_ret = GST_FLOW_OK;
     self->send_initial_events = TRUE;
     success =
         gst_pad_start_task (pad, (GstTaskFunction) src_task_loop, self->src,
@@ -392,6 +393,7 @@ src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
     g_queue_foreach (&self->queue, (GFunc) gst_buffer_unref, NULL);
     g_queue_clear (&self->queue);
     self->flushing = TRUE;
+    self->src_ret = GST_FLOW_FLUSHING;
     g_cond_signal (&self->queue_cond_add);
     g_mutex_unlock (&self->queue_lock);
     success = gst_pad_stop_task (pad);
@@ -466,6 +468,9 @@ src_task_loop (GstPad * pad)
     GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
         gst_flow_get_name (ret));
   }
+  g_mutex_lock (&self->queue_lock);
+  self->src_ret = ret;
+  g_mutex_unlock (&self->queue_lock);
 }
 
 static GstFlowReturn
@@ -475,6 +480,19 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   GstMapInfo map_info;
   gint ret;
 
+  g_mutex_lock (&self->queue_lock);
+  if (self->src_ret != GST_FLOW_OK) {
+    if (G_UNLIKELY (self->src_ret == GST_FLOW_NOT_LINKED
+            || self->src_ret < GST_FLOW_EOS))
+      GST_ERROR_OBJECT (self, "Pushing previous data returned an error: %s",
+          gst_flow_get_name (self->src_ret));
+
+    gst_buffer_unref (buffer);
+    g_mutex_unlock (&self->queue_lock);
+    return self->src_ret;
+  }
+  g_mutex_unlock (&self->queue_lock);
+
   gst_buffer_map (buffer, &map_info, GST_MAP_READ);
 
   if (map_info.size) {
@@ -548,11 +566,12 @@ on_key_received (GstDtlsConnection * connection, gpointer key, guint cipher,
   g_signal_emit (self, signals[SIGNAL_ON_KEY_RECEIVED], 0);
 }
 
-static void
+static gboolean
 on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
     GstDtlsEnc * self)
 {
   GstBuffer *buffer;
+  gboolean ret;
 
   GST_DEBUG_OBJECT (self, "sending data from %s with length %d",
       self->connection_id, length);
@@ -569,5 +588,9 @@ on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
   g_cond_signal (&self->queue_cond_add);
 
   GST_TRACE_OBJECT (self, "send data: releasing lock");
+
+  ret = self->src_ret == GST_FLOW_OK;
   g_mutex_unlock (&self->queue_lock);
+
+  return ret;
 }
index 622936f..070a75e 100644 (file)
@@ -46,6 +46,7 @@ struct _GstDtlsEnc {
     GstElement element;
 
     GstPad *src;
+    GstFlowReturn src_ret;
 
     GQueue queue;
     GMutex queue_lock;