dtls: Handle errors/close_notify at all steps and propagate through the layers properly
authorSebastian Dröge <sebastian@centricular.com>
Sun, 12 Jan 2020 11:56:00 +0000 (13:56 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Sun, 19 Jan 2020 11:16:34 +0000 (11:16 +0000)
Previously we simply logged errors but never reported them to elements
or even to the user. Fatal errors are now properly reported.

Additionally proper connection closing is implemented based on EOS:
- dtlsenc: EOS will cause close_notify to be sent to the peer and only
           if the peer also sent back close_notify we will forward the
           EOS event.
- dtlsdec: EOS will be forwarded normally, this only means that the
           unterlying transport was closed. On receiving a DTLS packet
           containing close_notify, return EOS and send EOS downstream.

ext/dtls/gstdtlsconnection.c
ext/dtls/gstdtlsconnection.h
ext/dtls/gstdtlsdec.c
ext/dtls/gstdtlsenc.c

index 86a0e4e..a3dfeb7 100644 (file)
@@ -87,6 +87,10 @@ struct _GstDtlsConnectionPrivate
   gboolean is_alive;
   gboolean keys_exported;
 
+  gboolean sent_close_notify;
+  gboolean received_close_notify;
+  gboolean fatal_error;
+
   GMutex mutex;
   GCond condition;
   gpointer bio_buffer;
@@ -112,7 +116,9 @@ static void gst_dtls_connection_set_property (GObject *, guint prop_id,
 
 static void log_state (GstDtlsConnection *, const gchar * str);
 static void export_srtp_keys (GstDtlsConnection *);
-static void openssl_poll (GstDtlsConnection *);
+static GstFlowReturn openssl_poll (GstDtlsConnection *, GError ** err);
+static GstFlowReturn handle_error (GstDtlsConnection * self, int ret,
+    GstResourceError error_type, GError ** err);
 static int openssl_verify_callback (int preverify_ok,
     X509_STORE_CTX * x509_ctx);
 
@@ -284,16 +290,18 @@ gst_dtls_connection_set_property (GObject * object, guint prop_id,
   }
 }
 
-void
-gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
+gboolean
+gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client,
+    GError ** err)
 {
   GstDtlsConnectionPrivate *priv;
+  gboolean ret;
 
   priv = self->priv;
 
-  g_return_if_fail (priv->send_callback);
-  g_return_if_fail (priv->ssl);
-  g_return_if_fail (priv->bio);
+  g_return_val_if_fail (priv->send_callback, FALSE);
+  g_return_val_if_fail (priv->ssl, FALSE);
+  g_return_val_if_fail (priv->bio, FALSE);
 
   GST_TRACE_OBJECT (self, "locking @ start");
   g_mutex_lock (&priv->mutex);
@@ -305,6 +313,10 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
   priv->bio_buffer_offset = 0;
   priv->keys_exported = FALSE;
 
+  priv->fatal_error = FALSE;
+  priv->sent_close_notify = FALSE;
+  priv->received_close_notify = FALSE;
+
   priv->is_client = is_client;
   if (priv->is_client) {
     SSL_set_connect_state (priv->ssl);
@@ -313,12 +325,19 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
   }
   log_state (self, "initial state set");
 
-  openssl_poll (self);
+  ret = openssl_poll (self, err);
+  if (ret == GST_FLOW_EOS && err) {
+    *err =
+        g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_WRITE,
+        "Connection closed");
+  }
 
   log_state (self, "first poll done");
 
   GST_TRACE_OBJECT (self, "unlocking @ start");
   g_mutex_unlock (&priv->mutex);
+
+  return ret == GST_FLOW_OK;
 }
 
 static void
@@ -342,7 +361,7 @@ handle_timeout (gpointer data, gpointer user_data)
       GST_WARNING_OBJECT (self, "handling timeout failed");
     } else if (ret > 0) {
       log_state (self, "handling timeout before poll");
-      openssl_poll (self);
+      openssl_poll (self, NULL);
       log_state (self, "handling timeout after poll");
     }
   }
@@ -507,11 +526,13 @@ gst_dtls_connection_set_send_callback (GstDtlsConnection * self,
   g_mutex_unlock (&priv->mutex);
 }
 
-gint
-gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len)
+GstFlowReturn
+gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gsize len,
+    gsize * written, GError ** err)
 {
+  GstFlowReturn flow_ret = GST_FLOW_OK;
   GstDtlsConnectionPrivate *priv;
-  gint result;
+  int ret;
 
   g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0);
   g_return_val_if_fail (self->priv->ssl, 0);
@@ -525,6 +546,22 @@ gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len)
 
   g_warn_if_fail (!priv->bio_buffer);
 
+  if (self->priv->received_close_notify) {
+    GST_DEBUG_OBJECT (self, "Already received close_notify");
+    g_mutex_unlock (&priv->mutex);
+    return GST_FLOW_EOS;
+  }
+
+  if (self->priv->fatal_error) {
+    GST_ERROR_OBJECT (self, "Had a fatal error before");
+    g_mutex_unlock (&priv->mutex);
+    if (err)
+      *err =
+          g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+          "Had fatal error before");
+    return GST_FLOW_ERROR;
+  }
+
   priv->bio_buffer = data;
   priv->bio_buffer_len = len;
   priv->bio_buffer_offset = 0;
@@ -532,29 +569,50 @@ gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len)
   log_state (self, "process start");
 
   if (SSL_want_write (priv->ssl)) {
-    openssl_poll (self);
+    flow_ret = openssl_poll (self, err);
     log_state (self, "process want write, after poll");
+    if (flow_ret != GST_FLOW_OK) {
+      g_mutex_unlock (&priv->mutex);
+      return flow_ret;
+    }
   }
 
-  result = SSL_read (priv->ssl, data, len);
+  ret = SSL_read (priv->ssl, data, len);
+  *written = ret >= 0 ? ret : 0;
+  GST_DEBUG_OBJECT (self, "read result: %d", ret);
+
+  flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_READ, err);
+  if (flow_ret == GST_FLOW_EOS) {
+    self->priv->received_close_notify = TRUE;
+    /* Notify about the connection being properly closed now if both
+     * sides did so */
+    if (self->priv->sent_close_notify && self->priv->send_callback)
+      self->priv->send_callback (self, NULL, 0, NULL);
+
+    g_mutex_unlock (&priv->mutex);
+    return flow_ret;
+  } else if (flow_ret != GST_FLOW_OK) {
+    g_mutex_unlock (&priv->mutex);
+    return flow_ret;
+  }
 
   log_state (self, "process after read");
 
-  openssl_poll (self);
+  flow_ret = openssl_poll (self, err);
 
   log_state (self, "process after poll");
 
-  GST_DEBUG_OBJECT (self, "read result: %d", result);
-
   GST_TRACE_OBJECT (self, "unlocking @ process");
   g_mutex_unlock (&priv->mutex);
 
-  return result;
+  return flow_ret;
 }
 
-gint
-gst_dtls_connection_send (GstDtlsConnection * self, gpointer data, gint len)
+GstFlowReturn
+gst_dtls_connection_send (GstDtlsConnection * self, gconstpointer data,
+    gsize len, gsize * written, GError ** err)
 {
+  GstFlowReturn flow_ret;
   int ret = 0;
 
   g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0);
@@ -566,20 +624,65 @@ gst_dtls_connection_send (GstDtlsConnection * self, gpointer data, gint len)
   g_mutex_lock (&self->priv->mutex);
   GST_TRACE_OBJECT (self, "locked @ send");
 
-  if (SSL_is_init_finished (self->priv->ssl)) {
+  if (self->priv->fatal_error) {
+    GST_ERROR_OBJECT (self, "Had a fatal error before");
+    g_mutex_unlock (&self->priv->mutex);
+    if (err)
+      *err =
+          g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+          "Had fatal error before");
+    return GST_FLOW_ERROR;
+  }
+
+  if (self->priv->sent_close_notify) {
+    len = 0;
+    GST_DEBUG_OBJECT (self, "Not sending new data after close_notify");
+  }
+
+  if (len == 0) {
+    if (written)
+      *written = 0;
+    GST_DEBUG_OBJECT (self, "Sending close_notify");
+    ret = SSL_shutdown (self->priv->ssl);
+    self->priv->sent_close_notify = TRUE;
+    if (ret == 1) {
+      GST_LOG_OBJECT (self, "received peer close_notify already");
+      self->priv->received_close_notify = TRUE;
+      flow_ret = GST_FLOW_EOS;
+    } else if (ret == 0) {
+      GST_LOG_OBJECT (self, "did not receive peer close_notify yet");
+      flow_ret = GST_FLOW_OK;
+    } else {
+      flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, err);
+    }
+  } else if (SSL_is_init_finished (self->priv->ssl)) {
+    GST_DEBUG_OBJECT (self, "sending data of %" G_GSIZE_FORMAT " B", len);
     ret = SSL_write (self->priv->ssl, data, len);
-    GST_DEBUG_OBJECT (self, "data sent: input was %d B, output is %d B", len,
-        ret);
+    if (ret <= 0) {
+      if (written)
+        *written = 0;
+      flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, err);
+    } else {
+      if (written)
+        *written = ret;
+      flow_ret = GST_FLOW_OK;
+    }
   } else {
+    if (written)
+      *written = ret;
     GST_WARNING_OBJECT (self,
         "tried to send data before handshake was complete");
-    ret = 0;
+    if (err)
+      *err =
+          g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+          "Tried to send data before handshake was complete");
+    flow_ret = GST_FLOW_ERROR;
   }
 
   GST_TRACE_OBJECT (self, "unlocking @ send");
   g_mutex_unlock (&self->priv->mutex);
 
-  return ret;
+  return flow_ret;
 }
 
 /*
@@ -724,12 +827,79 @@ ssl_err_cb (const char *str, size_t len, void *u)
   return 0;
 }
 
-static void
-openssl_poll (GstDtlsConnection * self)
+static GstFlowReturn
+handle_error (GstDtlsConnection * self, int ret, GstResourceError error_type,
+    GError ** err)
 {
-  int ret;
   int error;
 
+  error = SSL_get_error (self->priv->ssl, ret);
+
+  switch (error) {
+    case SSL_ERROR_NONE:
+      GST_TRACE_OBJECT (self, "No error");
+      return GST_FLOW_OK;
+    case SSL_ERROR_SSL:
+      GST_ERROR_OBJECT (self, "Fatal SSL error");
+      self->priv->fatal_error = TRUE;
+      ERR_print_errors_cb (ssl_err_cb, self);
+      if (err)
+        *err =
+            g_error_new_literal (GST_RESOURCE_ERROR, error_type,
+            "Fatal SSL error");
+      return GST_FLOW_ERROR;
+    case SSL_ERROR_ZERO_RETURN:
+      GST_LOG_OBJECT (self, "Connection was closed");
+      return GST_FLOW_EOS;
+    case SSL_ERROR_WANT_READ:
+      GST_LOG_OBJECT (self, "SSL wants read");
+      return GST_FLOW_OK;
+    case SSL_ERROR_WANT_WRITE:
+      GST_LOG_OBJECT (self, "SSL wants write");
+      return GST_FLOW_OK;
+    case SSL_ERROR_SYSCALL:{
+      gchar message[1024] = "<unknown>";
+      gint syserror;
+#ifdef G_OS_WIN32
+      syserror = WSAGetLastError ();
+      FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM, NULL, syserror, 0, message,
+          sizeof message, NULL);
+#else
+      syserror = errno;
+      strerror_r (syserror, message, sizeof message);
+#endif
+
+      if (syserror == 0) {
+        GST_TRACE_OBJECT (self, "No error");
+        return GST_FLOW_OK;
+      } else {
+        GST_ERROR_OBJECT (self, "Fatal SSL syscall error: errno %d: %s",
+            syserror, message);
+        if (err)
+          *err =
+              g_error_new (GST_RESOURCE_ERROR, error_type,
+              "Fatal SSL syscall error: errno %d: %s", syserror, message);
+        self->priv->fatal_error = TRUE;
+        return GST_FLOW_ERROR;
+      }
+    }
+    default:
+      self->priv->fatal_error = TRUE;
+      GST_ERROR_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret);
+      if (err)
+        *err =
+            g_error_new (GST_RESOURCE_ERROR, error_type,
+            "Unknown SSL error: %d, ret: %d", error, ret);
+      return GST_FLOW_ERROR;
+  }
+}
+
+static GstFlowReturn
+openssl_poll (GstDtlsConnection * self, GError ** err)
+{
+  int ret;
+  GstFlowReturn flow_ret;
+
   log_state (self, "poll: before handshake");
 
   ERR_clear_error ();
@@ -746,54 +916,23 @@ openssl_poll (GstDtlsConnection * self)
       } else {
         GST_INFO_OBJECT (self, "handshake is completed");
       }
-      return;
+      return GST_FLOW_OK;
     case 0:
       GST_DEBUG_OBJECT (self, "do_handshake encountered EOF");
       break;
     case -1:
-      GST_DEBUG_OBJECT (self, "do_handshake encountered BIO error");
+      GST_DEBUG_OBJECT (self, "do_handshake encountered potential BIO error");
       break;
     default:
       GST_DEBUG_OBJECT (self, "do_handshake returned %d", ret);
-  }
-
-  error = SSL_get_error (self->priv->ssl, ret);
-
-  switch (error) {
-    case SSL_ERROR_NONE:
-      GST_WARNING_OBJECT (self, "no error, handshake should be done");
-      break;
-    case SSL_ERROR_SSL:
-      GST_ERROR_OBJECT (self, "SSL error");
-      ERR_print_errors_cb (ssl_err_cb, self);
-      return;
-    case SSL_ERROR_WANT_READ:
-      GST_LOG_OBJECT (self, "SSL wants read");
-      break;
-    case SSL_ERROR_WANT_WRITE:
-      GST_LOG_OBJECT (self, "SSL wants write");
       break;
-    case SSL_ERROR_SYSCALL:{
-      gchar message[1024] = "<unknown>";
-      gint syserror;
-#ifdef G_OS_WIN32
-      syserror = WSAGetLastError ();
-      FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM, NULL, syserror, 0, message,
-          sizeof message, NULL);
-#else
-      syserror = errno;
-      strerror_r (syserror, message, sizeof message);
-#endif
-      GST_CAT_LEVEL_LOG (GST_CAT_DEFAULT,
-          syserror != 0 ? GST_LEVEL_WARNING : GST_LEVEL_LOG,
-          self, "SSL syscall error: errno %d: %s", syserror, message);
-      break;
-    }
-    default:
-      GST_WARNING_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret);
   }
 
+  flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_OPEN_WRITE, err);
+
   ERR_print_errors_cb (ssl_warn_cb, self);
+
+  return flow_ret;
 }
 
 static int
index bc2c569..6315ef1 100644 (file)
@@ -26,7 +26,7 @@
 #ifndef gstdtlsconnection_h
 #define gstdtlsconnection_h
 
-#include <glib-object.h>
+#include <gst/gst.h>
 
 G_BEGIN_DECLS
 
@@ -84,7 +84,7 @@ struct _GstDtlsConnectionClass {
 
 GType gst_dtls_connection_get_type(void) G_GNUC_CONST;
 
-void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client);
+gboolean gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client, GError **err);
 void gst_dtls_connection_check_timeout(GstDtlsConnection *);
 
 /*
@@ -108,15 +108,30 @@ void gst_dtls_connection_set_send_callback(GstDtlsConnection *, GstDtlsConnectio
 
 /*
  * Processes data that has been received, the transformation is done in-place.
- * Returns the length of the plaintext data that was decoded, if no data is available, 0<= will be returned.
+ *
+ * Returns:
+ *   - GST_FLOW_EOS if the receive side of the DTLS connection was closed by
+ *     the peer, i.e. close_notify was sent by the peer
+ *   - GST_FLOW_ERROR + err if an error happened
+ *   - GST_FLOW_OK + written >= 0 if processing was successful. ptr then
+ *     contains the decoded bytes
  */
-gint gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gint len);
+GstFlowReturn gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gsize len, gsize *written, GError **err);
 
 /*
- * If the DTLS handshake is completed this function will encode the given data.
- * Returns the length of the data sent, or 0 if the DTLS handshake is not completed.
+ * Will encode and send the given data.
+ *
+ * Sending with len == 0 will close the send side of the DTLS connection and
+ * no further data can be sent anymore in the future. This will also send the
+ * close_notify to the peer.
+ *
+ * Returns:
+ *   - GST_FLOW_EOS if the send side of the DTLS connection was closed, i.e.
+ *     we received an EOS before.
+ *   - GST_FLOW_ERROR + err if an error happened
+ *   - GST_FLOW_OK + written >= 0 if processing was successful
  */
-gint gst_dtls_connection_send(GstDtlsConnection *, gpointer ptr, gint len);
+GstFlowReturn gst_dtls_connection_send(GstDtlsConnection *, gconstpointer ptr, gsize len, gsize *written, GError **err);
 
 G_END_DECLS
 
index 0d4ffe7..bba0035 100644 (file)
@@ -452,62 +452,105 @@ on_peer_certificate_received (GstDtlsConnection * connection, gchar * pem,
   return TRUE;
 }
 
-static gint
+static GstFlowReturn
 process_buffer (GstDtlsDec * self, GstBuffer * buffer)
 {
+  GstFlowReturn flow_ret;
   GstMapInfo map_info;
-  gint size;
+  GError *err = NULL;
+  gsize written = 0;
 
   if (!gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE))
-    return 0;
+    return GST_FLOW_ERROR;
 
   if (!map_info.size) {
     gst_buffer_unmap (buffer, &map_info);
-    return 0;
+    return GST_FLOW_ERROR;
   }
 
-  size =
+  flow_ret =
       gst_dtls_connection_process (self->connection, map_info.data,
-      map_info.size);
+      map_info.size, &written, &err);
   gst_buffer_unmap (buffer, &map_info);
 
-  if (size <= 0)
-    return size;
-
-  gst_buffer_set_size (buffer, size);
+  switch (flow_ret) {
+    case GST_FLOW_OK:
+      GST_LOG_OBJECT (self,
+          "Decoded buffer of size %" G_GSIZE_FORMAT " B to %" G_GSIZE_FORMAT,
+          map_info.size, written);
+      gst_buffer_set_size (buffer, written);
+      break;
+    case GST_FLOW_EOS:
+      gst_buffer_set_size (buffer, written);
+      GST_DEBUG_OBJECT (self, "Peer closed the connection");
+      break;
+    case GST_FLOW_ERROR:
+      GST_ERROR_OBJECT (self, "Error processing buffer: %s", err->message);
+      GST_ELEMENT_ERROR (self, RESOURCE, READ, (NULL), ("%s", err->message));
+      g_clear_error (&err);
+      break;
+    default:
+      g_assert_not_reached ();
+  }
+  g_assert (err == NULL);
 
-  return size;
+  return flow_ret;
 }
 
+typedef struct
+{
+  GstDtlsDec *self;
+  GstFlowReturn flow_ret;
+  guint processed;
+} ProcessListData;
+
 static gboolean
 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
 {
-  GstDtlsDec *self = GST_DTLS_DEC (user_data);
-  gint size;
+  ProcessListData *process_list_data = user_data;
+  GstDtlsDec *self = GST_DTLS_DEC (process_list_data->self);
+  GstFlowReturn flow_ret;
 
   *buffer = gst_buffer_make_writable (*buffer);
-  size = process_buffer (self, *buffer);
-  if (size <= 0)
+  flow_ret = process_buffer (self, *buffer);
+
+  process_list_data->flow_ret = flow_ret;
+  if (gst_buffer_get_size (*buffer) == 0)
     gst_buffer_replace (buffer, NULL);
+  else if (flow_ret != GST_FLOW_ERROR)
+    process_list_data->processed++;
 
-  return TRUE;
+  return flow_ret == GST_FLOW_OK;
 }
 
 static GstFlowReturn
 sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
 {
   GstDtlsDec *self = GST_DTLS_DEC (parent);
-  GstFlowReturn ret = GST_FLOW_OK;
   GstPad *other_pad;
+  ProcessListData process_list_data = { self, GST_FLOW_OK, 0 };
 
   list = gst_buffer_list_make_writable (list);
-  gst_buffer_list_foreach (list, process_buffer_from_list, self);
+  gst_buffer_list_foreach (list, process_buffer_from_list, &process_list_data);
+
+  /* If we successfully processed at least some buffers then forward those */
+  if (process_list_data.flow_ret != GST_FLOW_OK
+      && process_list_data.processed == 0) {
+    GST_ERROR_OBJECT (self, "Failed to process buffer list: %s",
+        gst_flow_get_name (process_list_data.flow_ret));
+    gst_buffer_list_unref (list);
+    return process_list_data.flow_ret;
+  }
+
+  /* Remove all buffers after the first one that failed to be processed */
+  gst_buffer_list_remove (list, process_list_data.processed,
+      gst_buffer_list_length (list) - process_list_data.processed);
 
   if (gst_buffer_list_length (list) == 0) {
     GST_DEBUG_OBJECT (self, "Not produced any buffers");
     gst_buffer_list_unref (list);
 
-    return GST_FLOW_OK;
+    return process_list_data.flow_ret;
   }
 
   g_mutex_lock (&self->src_mutex);
@@ -517,17 +560,25 @@ sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
   g_mutex_unlock (&self->src_mutex);
 
   if (other_pad) {
-    GST_LOG_OBJECT (self, "decoded buffer list with length %u, pushing",
+    gboolean was_eos = process_list_data.flow_ret == GST_FLOW_EOS;
+
+    GST_LOG_OBJECT (self, "pushing buffer list with length %u",
         gst_buffer_list_length (list));
-    ret = gst_pad_push_list (other_pad, list);
+    process_list_data.flow_ret = gst_pad_push_list (other_pad, list);
+
+    /* If the peer closed the connection, signal that we're done here now */
+    if (was_eos)
+      gst_pad_push_event (other_pad, gst_event_new_eos ());
+
     gst_object_unref (other_pad);
   } else {
-    GST_LOG_OBJECT (self, "dropped buffer list with length %d, not linked",
+    GST_LOG_OBJECT (self,
+        "dropping buffer list with length %d, have no source pad",
         gst_buffer_list_length (list));
     gst_buffer_list_unref (list);
   }
 
-  return ret;
+  return process_list_data.flow_ret;
 }
 
 static GstFlowReturn
@@ -535,7 +586,6 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstDtlsDec *self = GST_DTLS_DEC (parent);
   GstFlowReturn ret = GST_FLOW_OK;
-  gint size;
   GstPad *other_pad;
 
   if (!self->agent) {
@@ -548,12 +598,12 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
       self->connection_id, gst_buffer_get_size (buffer));
 
   buffer = gst_buffer_make_writable (buffer);
-  size = process_buffer (self, buffer);
-
-  if (size <= 0) {
+  ret = process_buffer (self, buffer);
+  if (ret == GST_FLOW_ERROR) {
+    GST_ERROR_OBJECT (self, "Failed to process buffer: %s",
+        gst_flow_get_name (ret));
     gst_buffer_unref (buffer);
-
-    return GST_FLOW_OK;
+    return ret;
   }
 
   g_mutex_lock (&self->src_mutex);
@@ -563,11 +613,25 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   g_mutex_unlock (&self->src_mutex);
 
   if (other_pad) {
-    GST_LOG_OBJECT (self, "decoded buffer with length %d, pushing", size);
-    ret = gst_pad_push (other_pad, buffer);
+    gboolean was_eos = (ret == GST_FLOW_EOS);
+
+    if (gst_buffer_get_size (buffer) > 0) {
+      GST_LOG_OBJECT (self, "pushing buffer");
+      ret = gst_pad_push (other_pad, buffer);
+    } else {
+      gst_buffer_unref (buffer);
+    }
+
+    /* If the peer closed the connection, signal that we're done here now */
+    if (was_eos) {
+      gst_pad_push_event (other_pad, gst_event_new_eos ());
+      if (ret == GST_FLOW_OK)
+        ret = GST_FLOW_EOS;
+    }
+
     gst_object_unref (other_pad);
   } else {
-    GST_LOG_OBJECT (self, "dropped buffer with length %d, not linked", size);
+    GST_LOG_OBJECT (self, "dropping buffer, have no source pad");
     gst_buffer_unref (buffer);
   }
 
index 1f94c4b..fd59722 100644 (file)
@@ -101,7 +101,7 @@ static gboolean sink_event (GstPad * pad, GstObject * parent, GstEvent * event);
 static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
     guint auth, GstDtlsEnc *);
 static gboolean on_send_data (GstDtlsConnection *, gconstpointer data,
-    gint length, GstDtlsEnc *);
+    gsize length, GstDtlsEnc *);
 
 static void
 gst_dtls_enc_class_init (GstDtlsEncClass * klass)
@@ -326,10 +326,17 @@ gst_dtls_enc_change_state (GstElement * element, GstStateChange transition)
   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
 
   switch (transition) {
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
+    case GST_STATE_CHANGE_READY_TO_PAUSED:{
+      GError *err = NULL;
+
       GST_DEBUG_OBJECT (self, "starting connection %s", self->connection_id);
-      gst_dtls_connection_start (self->connection, self->is_client);
+      if (!gst_dtls_connection_start (self->connection, self->is_client, &err)) {
+        GST_ELEMENT_ERROR (self, RESOURCE, OPEN_WRITE, (NULL), ("%s",
+                err->message));
+        g_clear_error (&err);
+      }
       break;
+    }
     default:
       break;
   }
@@ -460,17 +467,25 @@ src_task_loop (GstPad * pad)
 
   GST_TRACE_OBJECT (self, "src loop: releasing lock");
 
-  ret = gst_pad_push (self->src, buffer);
-  if (check_connection_timeout)
-    gst_dtls_connection_check_timeout (self->connection);
+  if (buffer) {
+    ret = gst_pad_push (self->src, buffer);
+    if (check_connection_timeout)
+      gst_dtls_connection_check_timeout (self->connection);
 
-  if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
-    GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
-        gst_flow_get_name (ret));
+    if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
+      GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
+          gst_flow_get_name (ret));
+    }
+    g_mutex_lock (&self->queue_lock);
+    self->src_ret = ret;
+    g_mutex_unlock (&self->queue_lock);
+  } else {
+    GST_DEBUG_OBJECT (self, "Peer and us closed the connection, sending EOS");
+    gst_pad_push_event (self->src, gst_event_new_eos ());
+    g_mutex_lock (&self->queue_lock);
+    self->src_ret = GST_FLOW_EOS;
+    g_mutex_unlock (&self->queue_lock);
   }
-  g_mutex_lock (&self->queue_lock);
-  self->src_ret = ret;
-  g_mutex_unlock (&self->queue_lock);
 }
 
 static GstFlowReturn
@@ -478,7 +493,9 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstDtlsEnc *self = GST_DTLS_ENC (parent);
   GstMapInfo map_info;
-  gint ret;
+  GError *err = NULL;
+  gsize to_write, written = 0;
+  GstFlowReturn ret = GST_FLOW_OK;
 
   g_mutex_lock (&self->queue_lock);
   if (self->src_ret != GST_FLOW_OK) {
@@ -495,28 +512,48 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 
   gst_buffer_map (buffer, &map_info, GST_MAP_READ);
 
-  if (map_info.size) {
+  to_write = map_info.size;
+
+  while (to_write > 0 && ret == GST_FLOW_OK) {
     ret =
         gst_dtls_connection_send (self->connection, map_info.data,
-        map_info.size);
-    if (ret != map_info.size) {
-      GST_WARNING_OBJECT (self,
-          "error sending data: %d B were written, expected value was %"
-          G_GSIZE_FORMAT " B", ret, map_info.size);
+        map_info.size, &written, &err);
+
+    switch (ret) {
+      case GST_FLOW_OK:
+        GST_DEBUG_OBJECT (self,
+            "Wrote %" G_GSIZE_FORMAT " B of %" G_GSIZE_FORMAT " B", written,
+            map_info.size);
+        g_assert (written <= to_write);
+        to_write -= written;
+        break;
+      case GST_FLOW_EOS:
+        GST_INFO_OBJECT (self, "Received data after the connection was closed");
+        break;
+      case GST_FLOW_ERROR:
+        GST_WARNING_OBJECT (self, "error sending data: %s", err->message);
+        GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL), ("%s", err->message));
+        g_clear_error (&err);
+        break;
+      default:
+        g_assert_not_reached ();
+        break;
     }
+
+    g_assert (err == NULL);
   }
 
   gst_buffer_unmap (buffer, &map_info);
-
   gst_buffer_unref (buffer);
 
-  return GST_FLOW_OK;
+  return ret;
 }
 
 
 static gboolean
 sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
+  GstDtlsEnc *self = GST_DTLS_ENC (parent);
   gboolean ret = FALSE;
 
   switch (GST_EVENT_TYPE (event)) {
@@ -528,6 +565,28 @@ sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       gst_event_unref (event);
       ret = TRUE;
       break;
+    case GST_EVENT_EOS:{
+      GstFlowReturn flow_ret;
+
+      /* Close the write side of the connection now */
+      flow_ret =
+          gst_dtls_connection_send (self->connection, NULL, 0, NULL, NULL);
+
+      if (flow_ret != GST_FLOW_OK)
+        GST_ERROR_OBJECT (self, "Failed to send close_notify");
+
+      /* Do not forward the EOS event unless the peer already closed to the
+       * connection itself. If it didn't yet then we'll later get the send
+       * callback called with no data and send EOS from there */
+      if (flow_ret == GST_FLOW_EOS) {
+        ret = gst_pad_event_default (pad, parent, event);
+      } else {
+        gst_event_unref (event);
+        ret = TRUE;
+      }
+
+      break;
+    }
     default:
       ret = gst_pad_event_default (pad, parent, event);
       break;
@@ -567,16 +626,17 @@ on_key_received (GstDtlsConnection * connection, gpointer key, guint cipher,
 }
 
 static gboolean
-on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
+on_send_data (GstDtlsConnection * connection, gconstpointer data, gsize length,
     GstDtlsEnc * self)
 {
   GstBuffer *buffer;
   gboolean ret;
 
-  GST_DEBUG_OBJECT (self, "sending data from %s with length %d",
+  GST_DEBUG_OBJECT (self, "sending data from %s with length %" G_GSIZE_FORMAT,
       self->connection_id, length);
 
-  buffer = gst_buffer_new_wrapped (g_memdup (data, length), length);
+  buffer =
+      data ? gst_buffer_new_wrapped (g_memdup (data, length), length) : NULL;
 
   GST_TRACE_OBJECT (self, "send data: acquiring lock");
   g_mutex_lock (&self->queue_lock);