dtls: Update codes based on 1.18.4 52/253452/5
authorSangchul Lee <sc11.lee@samsung.com>
Wed, 10 Feb 2021 07:17:14 +0000 (16:17 +0900)
committerSangchul Lee <sc11.lee@samsung.com>
Thu, 3 Jun 2021 07:33:26 +0000 (16:33 +0900)
Files in ext/dtls/ are updated.

Change-Id: I14a943cf0210610c4e8ee3da31d03bd4f47944dd
Signed-off-by: Sangchul Lee <sc11.lee@samsung.com>
12 files changed:
ext/dtls/gstdtlscertificate.c
ext/dtls/gstdtlsconnection.c
ext/dtls/gstdtlsconnection.h
ext/dtls/gstdtlsdec.c
ext/dtls/gstdtlsenc.c
ext/dtls/gstdtlsenc.h
ext/dtls/gstdtlssrtpbin.c
ext/dtls/gstdtlssrtpdec.c
ext/dtls/gstdtlssrtpenc.c
ext/dtls/gstdtlssrtpenc.h
ext/dtls/meson.build
ext/dtls/plugin.c

index 4e44849..d7411c8 100644 (file)
@@ -179,12 +179,24 @@ gst_dtls_certificate_get_property (GObject * object, guint prop_id,
   }
 }
 
+static const gchar base64_alphabet[64] = {
+  'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
+  'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
+  'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
+  'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
+  '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'
+};
+
 static void
 init_generated (GstDtlsCertificate * self)
 {
   GstDtlsCertificatePrivate *priv = self->priv;
   RSA *rsa;
+  BIGNUM *serial_number;
+  ASN1_INTEGER *asn1_serial_number;
   X509_NAME *name = NULL;
+  gchar common_name[9] = { 0, };
+  gint i;
 
   g_return_if_fail (!priv->x509);
   g_return_if_fail (!priv->private_key);
@@ -244,19 +256,30 @@ init_generated (GstDtlsCertificate * self)
   rsa = NULL;
 
   X509_set_version (priv->x509, 2);
-  ASN1_INTEGER_set (X509_get_serialNumber (priv->x509), 0);
+
+  /* Set a random 64 bit integer as serial number */
+  serial_number = BN_new ();
+  BN_pseudo_rand (serial_number, 64, 0, 0);
+  asn1_serial_number = X509_get_serialNumber (priv->x509);
+  BN_to_ASN1_INTEGER (serial_number, asn1_serial_number);
+  BN_free (serial_number);
+
+  /* Set a random 8 byte base64 string as issuer/subject */
+  name = X509_NAME_new ();
+  for (i = 0; i < 8; i++)
+    common_name[i] =
+        base64_alphabet[g_random_int_range (0, G_N_ELEMENTS (base64_alphabet))];
+  X509_NAME_add_entry_by_NID (name, NID_commonName, MBSTRING_ASC,
+      (const guchar *) common_name, -1, -1, 0);
+  X509_set_subject_name (priv->x509, name);
+  X509_set_issuer_name (priv->x509, name);
+  X509_NAME_free (name);
+
+  /* Set expiry in a year */
   X509_gmtime_adj (X509_getm_notBefore (priv->x509), 0);
   X509_gmtime_adj (X509_getm_notAfter (priv->x509), 31536000L); /* A year */
   X509_set_pubkey (priv->x509, priv->private_key);
 
-  name = X509_get_subject_name (priv->x509);
-  X509_NAME_add_entry_by_txt (name, "C", MBSTRING_ASC, (unsigned char *) "SE",
-      -1, -1, 0);
-  X509_NAME_add_entry_by_txt (name, "CN", MBSTRING_ASC,
-      (unsigned char *) "OpenWebRTC", -1, -1, 0);
-  X509_set_issuer_name (priv->x509, name);
-  name = NULL;
-
   if (!X509_sign (priv->x509, priv->private_key, EVP_sha256 ())) {
     GST_WARNING_OBJECT (self, "failed to sign certificate");
     EVP_PKEY_free (priv->private_key);
index 244ec99..1c8364a 100644 (file)
@@ -69,6 +69,7 @@ enum
 {
   PROP_0,
   PROP_AGENT,
+  PROP_CONNECTION_STATE,
   NUM_PROPERTIES
 };
 
@@ -87,13 +88,19 @@ struct _GstDtlsConnectionPrivate
   gboolean is_alive;
   gboolean keys_exported;
 
+  GstDtlsConnectionState connection_state;
+  gboolean sent_close_notify;
+  gboolean received_close_notify;
+
   GMutex mutex;
   GCond condition;
   gpointer bio_buffer;
   gint bio_buffer_len;
   gint bio_buffer_offset;
 
-  GClosure *send_closure;
+  GstDtlsConnectionSendCallback send_callback;
+  gpointer send_callback_user_data;
+  GDestroyNotify send_callback_destroy_notify;
 
   gboolean timeout_pending;
   GThreadPool *thread_pool;
@@ -107,10 +114,15 @@ G_DEFINE_TYPE_WITH_CODE (GstDtlsConnection, gst_dtls_connection, G_TYPE_OBJECT,
 static void gst_dtls_connection_finalize (GObject * gobject);
 static void gst_dtls_connection_set_property (GObject *, guint prop_id,
     const GValue *, GParamSpec *);
+static void gst_dtls_connection_get_property (GObject *, guint prop_id,
+    GValue *, GParamSpec *);
 
 static void log_state (GstDtlsConnection *, const gchar * str);
-static void export_srtp_keys (GstDtlsConnection *);
-static void openssl_poll (GstDtlsConnection *);
+static gboolean export_srtp_keys (GstDtlsConnection *, GError ** err);
+static GstFlowReturn openssl_poll (GstDtlsConnection *, gboolean * notify_state,
+    GError ** err);
+static GstFlowReturn handle_error (GstDtlsConnection * self, int ret,
+    GstResourceError error_type, gboolean * notify_state, GError ** err);
 static int openssl_verify_callback (int preverify_ok,
     X509_STORE_CTX * x509_ctx);
 
@@ -127,6 +139,7 @@ gst_dtls_connection_class_init (GstDtlsConnectionClass * klass)
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
 
   gobject_class->set_property = gst_dtls_connection_set_property;
+  gobject_class->get_property = gst_dtls_connection_get_property;
 
   connection_ex_index =
       SSL_get_ex_new_index (0, (gpointer) "gstdtlsagent connection index", NULL,
@@ -134,20 +147,17 @@ gst_dtls_connection_class_init (GstDtlsConnectionClass * klass)
 
   signals[SIGNAL_ON_DECODER_KEY] =
       g_signal_new ("on-decoder-key", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_RUN_LAST, 0, NULL, NULL,
-      g_cclosure_marshal_generic, G_TYPE_NONE, 3,
-      G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_UINT);
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
+      G_TYPE_NONE, 3, G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_UINT);
 
   signals[SIGNAL_ON_ENCODER_KEY] =
       g_signal_new ("on-encoder-key", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_RUN_LAST, 0, NULL, NULL,
-      g_cclosure_marshal_generic, G_TYPE_NONE, 3,
-      G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_UINT);
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
+      G_TYPE_NONE, 3, G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_UINT);
 
   signals[SIGNAL_ON_PEER_CERTIFICATE] =
       g_signal_new ("on-peer-certificate", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_RUN_LAST, 0, NULL, NULL,
-      g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_STRING);
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_STRING);
 
   properties[PROP_AGENT] =
       g_param_spec_object ("agent",
@@ -156,6 +166,13 @@ gst_dtls_connection_class_init (GstDtlsConnectionClass * klass)
       GST_TYPE_DTLS_AGENT,
       G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS);
 
+  properties[PROP_CONNECTION_STATE] =
+      g_param_spec_enum ("connection-state",
+      "Connection State",
+      "Current connection state",
+      GST_DTLS_TYPE_CONNECTION_STATE,
+      GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
 
   _gst_dtls_init_openssl ();
@@ -173,8 +190,6 @@ gst_dtls_connection_init (GstDtlsConnection * self)
   priv->ssl = NULL;
   priv->bio = NULL;
 
-  priv->send_closure = NULL;
-
   priv->is_client = FALSE;
   priv->is_alive = TRUE;
   priv->keys_exported = FALSE;
@@ -206,10 +221,8 @@ gst_dtls_connection_finalize (GObject * gobject)
   SSL_free (priv->ssl);
   priv->ssl = NULL;
 
-  if (priv->send_closure) {
-    g_closure_unref (priv->send_closure);
-    priv->send_closure = NULL;
-  }
+  if (priv->send_callback_destroy_notify)
+    priv->send_callback_destroy_notify (priv->send_callback_user_data);
 
   g_mutex_clear (&priv->mutex);
   g_cond_clear (&priv->condition);
@@ -289,16 +302,37 @@ gst_dtls_connection_set_property (GObject * object, guint prop_id,
   }
 }
 
-void
-gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
+static void
+gst_dtls_connection_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstDtlsConnection *self = GST_DTLS_CONNECTION (object);
+  GstDtlsConnectionPrivate *priv = self->priv;
+
+  switch (prop_id) {
+    case PROP_CONNECTION_STATE:
+      g_mutex_lock (&priv->mutex);
+      g_value_set_enum (value, priv->connection_state);
+      g_mutex_unlock (&priv->mutex);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+  }
+}
+
+gboolean
+gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client,
+    GError ** err)
 {
   GstDtlsConnectionPrivate *priv;
+  gboolean ret;
+  gboolean notify_state = FALSE;
 
   priv = self->priv;
 
-  g_return_if_fail (priv->send_closure);
-  g_return_if_fail (priv->ssl);
-  g_return_if_fail (priv->bio);
+  g_return_val_if_fail (priv->send_callback, FALSE);
+  g_return_val_if_fail (priv->ssl, FALSE);
+  g_return_val_if_fail (priv->bio, FALSE);
 
   GST_TRACE_OBJECT (self, "locking @ start");
   g_mutex_lock (&priv->mutex);
@@ -310,20 +344,43 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
   priv->bio_buffer_offset = 0;
   priv->keys_exported = FALSE;
 
+  priv->sent_close_notify = FALSE;
+  priv->received_close_notify = FALSE;
+
+  /* Client immediately starts connecting, the server waits for a client to
+   * start the handshake process */
   priv->is_client = is_client;
   if (priv->is_client) {
+    priv->connection_state = GST_DTLS_CONNECTION_STATE_CONNECTING;
+    notify_state = TRUE;
     SSL_set_connect_state (priv->ssl);
   } else {
+    if (priv->connection_state != GST_DTLS_CONNECTION_STATE_NEW) {
+      priv->connection_state = GST_DTLS_CONNECTION_STATE_NEW;
+      notify_state = TRUE;
+    }
     SSL_set_accept_state (priv->ssl);
   }
   log_state (self, "initial state set");
 
-  openssl_poll (self);
+  ret = openssl_poll (self, &notify_state, err);
+  if (ret == GST_FLOW_EOS && err) {
+    *err =
+        g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_WRITE,
+        "Connection closed");
+  }
 
   log_state (self, "first poll done");
 
   GST_TRACE_OBJECT (self, "unlocking @ start");
   g_mutex_unlock (&priv->mutex);
+
+  if (notify_state) {
+    g_object_notify_by_pspec (G_OBJECT (self),
+        properties[PROP_CONNECTION_STATE]);
+  }
+
+  return ret == GST_FLOW_OK;
 }
 
 static void
@@ -332,6 +389,7 @@ handle_timeout (gpointer data, gpointer user_data)
   GstDtlsConnection *self = user_data;
   GstDtlsConnectionPrivate *priv;
   gint ret;
+  gboolean notify_state = FALSE;
 
   priv = self->priv;
 
@@ -347,11 +405,16 @@ handle_timeout (gpointer data, gpointer user_data)
       GST_WARNING_OBJECT (self, "handling timeout failed");
     } else if (ret > 0) {
       log_state (self, "handling timeout before poll");
-      openssl_poll (self);
+      openssl_poll (self, &notify_state, NULL);
       log_state (self, "handling timeout after poll");
     }
   }
   g_mutex_unlock (&priv->mutex);
+
+  if (notify_state) {
+    g_object_notify_by_pspec (G_OBJECT (self),
+        properties[PROP_CONNECTION_STATE]);
+  }
 }
 
 static gboolean
@@ -442,6 +505,8 @@ gst_dtls_connection_check_timeout (GstDtlsConnection * self)
 void
 gst_dtls_connection_stop (GstDtlsConnection * self)
 {
+  gboolean notify_state = FALSE;
+
   g_return_if_fail (GST_IS_DTLS_CONNECTION (self));
   g_return_if_fail (self->priv->ssl);
   g_return_if_fail (self->priv->bio);
@@ -453,6 +518,11 @@ gst_dtls_connection_stop (GstDtlsConnection * self)
   GST_TRACE_OBJECT (self, "locked @ stop");
 
   self->priv->is_alive = FALSE;
+  if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED
+      && self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED) {
+    self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CLOSED;
+    notify_state = TRUE;
+  }
   GST_TRACE_OBJECT (self, "signaling @ stop");
   g_cond_signal (&self->priv->condition);
   GST_TRACE_OBJECT (self, "signaled @ stop");
@@ -461,11 +531,18 @@ gst_dtls_connection_stop (GstDtlsConnection * self)
   g_mutex_unlock (&self->priv->mutex);
 
   GST_DEBUG_OBJECT (self, "stopped connection");
+
+  if (notify_state) {
+    g_object_notify_by_pspec (G_OBJECT (self),
+        properties[PROP_CONNECTION_STATE]);
+  }
 }
 
 void
 gst_dtls_connection_close (GstDtlsConnection * self)
 {
+  gboolean notify_state = FALSE;
+
   g_return_if_fail (GST_IS_DTLS_CONNECTION (self));
   g_return_if_fail (self->priv->ssl);
   g_return_if_fail (self->priv->bio);
@@ -481,41 +558,56 @@ gst_dtls_connection_close (GstDtlsConnection * self)
     g_cond_signal (&self->priv->condition);
   }
 
+  if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED
+      && self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED) {
+    self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CLOSED;
+    notify_state = TRUE;
+  }
+
   GST_TRACE_OBJECT (self, "unlocking @ close");
   g_mutex_unlock (&self->priv->mutex);
 
   GST_DEBUG_OBJECT (self, "closed connection");
+
+  if (notify_state) {
+    g_object_notify_by_pspec (G_OBJECT (self),
+        properties[PROP_CONNECTION_STATE]);
+  }
 }
 
 void
 gst_dtls_connection_set_send_callback (GstDtlsConnection * self,
-    GClosure * closure)
+    GstDtlsConnectionSendCallback callback, gpointer user_data,
+    GDestroyNotify destroy_notify)
 {
+  GstDtlsConnectionPrivate *priv;
+
   g_return_if_fail (GST_IS_DTLS_CONNECTION (self));
 
+  priv = self->priv;
+
   GST_TRACE_OBJECT (self, "locking @ set_send_callback");
-  g_mutex_lock (&self->priv->mutex);
+  g_mutex_lock (&priv->mutex);
   GST_TRACE_OBJECT (self, "locked @ set_send_callback");
 
-  if (self->priv->send_closure) {
-    g_closure_unref (self->priv->send_closure);
-    self->priv->send_closure = NULL;
-  }
-  self->priv->send_closure = closure;
-
-  if (closure && G_CLOSURE_NEEDS_MARSHAL (closure)) {
-    g_closure_set_marshal (closure, g_cclosure_marshal_generic);
-  }
+  if (priv->send_callback_destroy_notify)
+    priv->send_callback_destroy_notify (priv->send_callback_user_data);
+  priv->send_callback = callback;
+  priv->send_callback_user_data = user_data;
+  priv->send_callback_destroy_notify = destroy_notify;
 
   GST_TRACE_OBJECT (self, "unlocking @ set_send_callback");
-  g_mutex_unlock (&self->priv->mutex);
+  g_mutex_unlock (&priv->mutex);
 }
 
-gint
-gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len)
+GstFlowReturn
+gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gsize len,
+    gsize * written, GError ** err)
 {
+  GstFlowReturn flow_ret = GST_FLOW_OK;
   GstDtlsConnectionPrivate *priv;
-  gint result;
+  int ret;
+  gboolean notify_state = FALSE;
 
   g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0);
   g_return_val_if_fail (self->priv->ssl, 0);
@@ -527,6 +619,23 @@ gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len)
   g_mutex_lock (&priv->mutex);
   GST_TRACE_OBJECT (self, "locked @ process");
 
+  if (self->priv->received_close_notify
+      || self->priv->connection_state == GST_DTLS_CONNECTION_STATE_CLOSED) {
+    GST_DEBUG_OBJECT (self, "Already received close_notify");
+    g_mutex_unlock (&priv->mutex);
+    return GST_FLOW_EOS;
+  }
+
+  if (self->priv->connection_state == GST_DTLS_CONNECTION_STATE_FAILED) {
+    GST_ERROR_OBJECT (self, "Had a fatal error before");
+    g_mutex_unlock (&priv->mutex);
+    if (err)
+      *err =
+          g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+          "Had fatal error before");
+    return GST_FLOW_ERROR;
+  }
+
   g_warn_if_fail (!priv->bio_buffer);
 
   priv->bio_buffer = data;
@@ -536,30 +645,84 @@ gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len)
   log_state (self, "process start");
 
   if (SSL_want_write (priv->ssl)) {
-    openssl_poll (self);
+    flow_ret = openssl_poll (self, &notify_state, err);
     log_state (self, "process want write, after poll");
+    if (flow_ret != GST_FLOW_OK) {
+      g_mutex_unlock (&priv->mutex);
+      return flow_ret;
+    }
   }
 
-  result = SSL_read (priv->ssl, data, len);
+  /* If we're a server and were in new state then by receiving the first data
+   * we would start the connection process */
+  if (!priv->is_client) {
+    if (self->priv->connection_state == GST_DTLS_CONNECTION_STATE_NEW) {
+      priv->connection_state = GST_DTLS_CONNECTION_STATE_CONNECTING;
+      notify_state = TRUE;
+    }
+  }
+
+  ret = SSL_read (priv->ssl, data, len);
+  *written = ret >= 0 ? ret : 0;
+  GST_DEBUG_OBJECT (self, "read result: %d", ret);
+
+  flow_ret =
+      handle_error (self, ret, GST_RESOURCE_ERROR_READ, &notify_state, err);
+  if (flow_ret == GST_FLOW_EOS) {
+    self->priv->received_close_notify = TRUE;
+    if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED
+        && self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED) {
+      self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CLOSED;
+      notify_state = TRUE;
+    }
+    /* Notify about the connection being properly closed now if both
+     * sides did so */
+    if (self->priv->sent_close_notify && self->priv->send_callback)
+      self->priv->send_callback (self, NULL, 0, NULL);
+
+    g_mutex_unlock (&priv->mutex);
+
+    if (notify_state) {
+      g_object_notify_by_pspec (G_OBJECT (self),
+          properties[PROP_CONNECTION_STATE]);
+    }
+
+    return flow_ret;
+  } else if (flow_ret != GST_FLOW_OK) {
+    g_mutex_unlock (&priv->mutex);
+
+    if (notify_state) {
+      g_object_notify_by_pspec (G_OBJECT (self),
+          properties[PROP_CONNECTION_STATE]);
+    }
+
+    return flow_ret;
+  }
 
   log_state (self, "process after read");
 
-  openssl_poll (self);
+  flow_ret = openssl_poll (self, &notify_state, err);
 
   log_state (self, "process after poll");
 
-  GST_DEBUG_OBJECT (self, "read result: %d", result);
-
   GST_TRACE_OBJECT (self, "unlocking @ process");
   g_mutex_unlock (&priv->mutex);
 
-  return result;
+  if (notify_state) {
+    g_object_notify_by_pspec (G_OBJECT (self),
+        properties[PROP_CONNECTION_STATE]);
+  }
+
+  return flow_ret;
 }
 
-gint
-gst_dtls_connection_send (GstDtlsConnection * self, gpointer data, gint len)
+GstFlowReturn
+gst_dtls_connection_send (GstDtlsConnection * self, gconstpointer data,
+    gsize len, gsize * written, GError ** err)
 {
+  GstFlowReturn flow_ret;
   int ret = 0;
+  gboolean notify_state = FALSE;
 
   g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0);
 
@@ -570,20 +733,78 @@ gst_dtls_connection_send (GstDtlsConnection * self, gpointer data, gint len)
   g_mutex_lock (&self->priv->mutex);
   GST_TRACE_OBJECT (self, "locked @ send");
 
-  if (SSL_is_init_finished (self->priv->ssl)) {
+  if (self->priv->connection_state == GST_DTLS_CONNECTION_STATE_FAILED) {
+    GST_ERROR_OBJECT (self, "Had a fatal error before");
+    g_mutex_unlock (&self->priv->mutex);
+    if (err)
+      *err =
+          g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+          "Had fatal error before");
+    return GST_FLOW_ERROR;
+  }
+
+  if (self->priv->sent_close_notify) {
+    len = 0;
+    GST_DEBUG_OBJECT (self, "Not sending new data after close_notify");
+  }
+
+  if (len == 0) {
+    if (written)
+      *written = 0;
+    GST_DEBUG_OBJECT (self, "Sending close_notify");
+    ret = SSL_shutdown (self->priv->ssl);
+    if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED &&
+        self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED) {
+      self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CLOSED;
+      notify_state = TRUE;
+    }
+    if (ret == 1) {
+      GST_LOG_OBJECT (self, "received peer close_notify already");
+      self->priv->received_close_notify = TRUE;
+      flow_ret = GST_FLOW_EOS;
+    } else if (ret == 0) {
+      GST_LOG_OBJECT (self, "did not receive peer close_notify yet");
+      flow_ret = GST_FLOW_OK;
+    } else {
+      flow_ret =
+          handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, &notify_state,
+          err);
+    }
+  } else if (SSL_is_init_finished (self->priv->ssl)) {
+    GST_DEBUG_OBJECT (self, "sending data of %" G_GSIZE_FORMAT " B", len);
     ret = SSL_write (self->priv->ssl, data, len);
-    GST_DEBUG_OBJECT (self, "data sent: input was %d B, output is %d B", len,
-        ret);
+    if (ret <= 0) {
+      if (written)
+        *written = 0;
+      flow_ret =
+          handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, &notify_state,
+          err);
+    } else {
+      if (written)
+        *written = ret;
+      flow_ret = GST_FLOW_OK;
+    }
   } else {
+    if (written)
+      *written = ret;
     GST_WARNING_OBJECT (self,
         "tried to send data before handshake was complete");
-    ret = 0;
+    if (err)
+      *err =
+          g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+          "Tried to send data before handshake was complete");
+    flow_ret = GST_FLOW_ERROR;
   }
 
   GST_TRACE_OBJECT (self, "unlocking @ send");
   g_mutex_unlock (&self->priv->mutex);
 
-  return ret;
+  if (notify_state) {
+    g_object_notify_by_pspec (G_OBJECT (self),
+        properties[PROP_CONNECTION_STATE]);
+  }
+
+  return flow_ret;
 }
 
 /*
@@ -630,8 +851,8 @@ log_state (GstDtlsConnection * self, const gchar * str)
 #endif
 }
 
-static void
-export_srtp_keys (GstDtlsConnection * self)
+static gboolean
+export_srtp_keys (GstDtlsConnection * self, GError ** err)
 {
   typedef struct
   {
@@ -669,12 +890,26 @@ export_srtp_keys (GstDtlsConnection * self)
       NULL, 0, 0);
 
   if (!success) {
-    GST_WARNING_OBJECT (self, "failed to export srtp keys");
-    return;
+    GST_WARNING_OBJECT (self, "Failed to export SRTP keys");
+    if (err)
+      *err =
+          g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+          "Failed to export SRTP keys");
+    return FALSE;
   }
 
   profile = SSL_get_selected_srtp_profile (self->priv->ssl);
 
+  if (!profile) {
+    GST_WARNING_OBJECT (self,
+        "No SRTP capabilities negotiated during handshake");
+    if (err)
+      *err =
+          g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+          "No SRTP capabilities negotiated during handshake");
+    return FALSE;
+  }
+
   GST_INFO_OBJECT (self, "keys received, profile is %s", profile->name);
 
   switch (profile->id) {
@@ -687,8 +922,13 @@ export_srtp_keys (GstDtlsConnection * self)
       auth = GST_DTLS_SRTP_AUTH_HMAC_SHA1_32;
       break;
     default:
-      GST_WARNING_OBJECT (self, "invalid crypto suite set by handshake");
-      goto beach;
+      GST_WARNING_OBJECT (self,
+          "Invalid/unsupported crypto suite set by handshake");
+      if (err)
+        *err =
+            g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+            "Invalid/unsupported crypto suite set by handshake");
+      return FALSE;
   }
 
   client_key.key = exported_keys.client_key;
@@ -708,8 +948,9 @@ export_srtp_keys (GstDtlsConnection * self)
         auth);
   }
 
-beach:
   self->priv->keys_exported = TRUE;
+
+  return TRUE;
 }
 
 static int
@@ -728,12 +969,66 @@ ssl_err_cb (const char *str, size_t len, void *u)
   return 0;
 }
 
-static void
-openssl_poll (GstDtlsConnection * self)
+static GstFlowReturn
+handle_error (GstDtlsConnection * self, int ret, GstResourceError error_type,
+    gboolean * notify_state, GError ** err)
 {
-  int ret;
   int error;
 
+  error = SSL_get_error (self->priv->ssl, ret);
+
+  switch (error) {
+    case SSL_ERROR_NONE:
+      GST_TRACE_OBJECT (self, "No error");
+      return GST_FLOW_OK;
+    case SSL_ERROR_SSL:
+      GST_ERROR_OBJECT (self, "Fatal SSL error");
+      if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED) {
+        self->priv->connection_state = GST_DTLS_CONNECTION_STATE_FAILED;
+        *notify_state = TRUE;
+      }
+      ERR_print_errors_cb (ssl_err_cb, self);
+      if (err)
+        *err =
+            g_error_new_literal (GST_RESOURCE_ERROR, error_type,
+            "Fatal SSL error");
+      return GST_FLOW_ERROR;
+    case SSL_ERROR_ZERO_RETURN:
+      GST_LOG_OBJECT (self, "Connection was closed");
+      return GST_FLOW_EOS;
+    case SSL_ERROR_WANT_READ:
+      GST_LOG_OBJECT (self, "SSL wants read");
+      return GST_FLOW_OK;
+    case SSL_ERROR_WANT_WRITE:
+      GST_LOG_OBJECT (self, "SSL wants write");
+      return GST_FLOW_OK;
+    case SSL_ERROR_SYSCALL:
+      /* OpenSSL shouldn't be making real system calls, so we can safely
+       * ignore syscall errors. System interactions should happen through
+       * our BIO.
+       */
+      GST_DEBUG_OBJECT (self, "OpenSSL reported a syscall error, ignoring.");
+      return GST_FLOW_OK;
+    default:
+      if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED) {
+        self->priv->connection_state = GST_DTLS_CONNECTION_STATE_FAILED;
+        *notify_state = TRUE;
+      }
+      GST_ERROR_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret);
+      if (err)
+        *err =
+            g_error_new (GST_RESOURCE_ERROR, error_type,
+            "Unknown SSL error: %d, ret: %d", error, ret);
+      return GST_FLOW_ERROR;
+  }
+}
+
+static GstFlowReturn
+openssl_poll (GstDtlsConnection * self, gboolean * notify_state, GError ** err)
+{
+  int ret;
+  GstFlowReturn flow_ret;
+
   log_state (self, "poll: before handshake");
 
   ERR_clear_error ();
@@ -746,58 +1041,39 @@ openssl_poll (GstDtlsConnection * self)
       if (!self->priv->keys_exported) {
         GST_INFO_OBJECT (self,
             "handshake just completed successfully, exporting keys");
-        export_srtp_keys (self);
+
+        if (!export_srtp_keys (self, err))
+          return GST_FLOW_ERROR;
+
+        if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED
+            && self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED
+            && self->priv->connection_state !=
+            GST_DTLS_CONNECTION_STATE_CONNECTED) {
+          self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CONNECTED;
+          *notify_state = TRUE;
+        }
       } else {
         GST_INFO_OBJECT (self, "handshake is completed");
       }
-      return;
+      return GST_FLOW_OK;
     case 0:
       GST_DEBUG_OBJECT (self, "do_handshake encountered EOF");
       break;
     case -1:
-      GST_DEBUG_OBJECT (self, "do_handshake encountered BIO error");
+      GST_DEBUG_OBJECT (self, "do_handshake encountered potential BIO error");
       break;
     default:
       GST_DEBUG_OBJECT (self, "do_handshake returned %d", ret);
-  }
-
-  error = SSL_get_error (self->priv->ssl, ret);
-
-  switch (error) {
-    case SSL_ERROR_NONE:
-      GST_WARNING_OBJECT (self, "no error, handshake should be done");
-      break;
-    case SSL_ERROR_SSL:
-      GST_ERROR_OBJECT (self, "SSL error");
-      ERR_print_errors_cb (ssl_err_cb, self);
-      return;
-    case SSL_ERROR_WANT_READ:
-      GST_LOG_OBJECT (self, "SSL wants read");
-      break;
-    case SSL_ERROR_WANT_WRITE:
-      GST_LOG_OBJECT (self, "SSL wants write");
-      break;
-    case SSL_ERROR_SYSCALL:{
-      gchar message[1024] = "<unknown>";
-      gint syserror;
-#ifdef G_OS_WIN32
-      syserror = WSAGetLastError ();
-      FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM, NULL, syserror, 0, message,
-          sizeof message, NULL);
-#else
-      syserror = errno;
-      strerror_r (syserror, message, sizeof message);
-#endif
-      GST_CAT_LEVEL_LOG (GST_CAT_DEFAULT,
-          syserror != 0 ? GST_LEVEL_WARNING : GST_LEVEL_LOG,
-          self, "SSL syscall error: errno %d: %s", syserror, message);
       break;
-    }
-    default:
-      GST_WARNING_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret);
   }
 
+  flow_ret =
+      handle_error (self, ret, GST_RESOURCE_ERROR_OPEN_WRITE, notify_state,
+      err);
+
   ERR_print_errors_cb (ssl_warn_cb, self);
+
+  return flow_ret;
 }
 
 static int
@@ -903,27 +1179,15 @@ static int
 bio_method_write (BIO * bio, const char *data, int size)
 {
   GstDtlsConnection *self = GST_DTLS_CONNECTION (BIO_get_data (bio));
+  gboolean ret = TRUE;
 
   GST_LOG_OBJECT (self, "BIO: writing %d", size);
 
-  if (self->priv->send_closure) {
-    GValue values[3] = { G_VALUE_INIT };
-
-    g_value_init (&values[0], GST_TYPE_DTLS_CONNECTION);
-    g_value_set_object (&values[0], self);
-
-    g_value_init (&values[1], G_TYPE_POINTER);
-    g_value_set_pointer (&values[1], (gpointer) data);
+  if (self->priv->send_callback)
+    ret = self->priv->send_callback (self, data, size,
+        self->priv->send_callback_user_data);
 
-    g_value_init (&values[2], G_TYPE_INT);
-    g_value_set_int (&values[2], size);
-
-    g_closure_invoke (self->priv->send_closure, NULL, 3, values, NULL);
-
-    g_value_unset (&values[0]);
-  }
-
-  return size;
+  return ret ? size : -1;
 }
 
 static int
@@ -989,7 +1253,7 @@ bio_method_ctrl (BIO * bio, int cmd, long arg1, void *arg2)
       GST_LOG_OBJECT (self, "BIO: EOF reset");
       return 1;
     case BIO_CTRL_EOF:{
-      gint eof = !(priv->bio_buffer_len - priv->bio_buffer_offset);
+      gint eof = priv->is_alive == FALSE;
       GST_LOG_OBJECT (self, "BIO: EOF query returned %d", eof);
       return eof;
     }
@@ -1038,3 +1302,24 @@ bio_method_free (BIO * bio)
   GST_LOG_OBJECT (GST_DTLS_CONNECTION (BIO_get_data (bio)), "BIO free");
   return 0;
 }
+
+GType
+gst_dtls_connection_state_get_type (void)
+{
+  static GType type = 0;
+  static const GEnumValue values[] = {
+    {GST_DTLS_CONNECTION_STATE_NEW, "New connection", "new"},
+    {GST_DTLS_CONNECTION_STATE_CLOSED, "Closed connection on either side",
+        "closed"},
+    {GST_DTLS_CONNECTION_STATE_FAILED, "Failed connection", "failed"},
+    {GST_DTLS_CONNECTION_STATE_CONNECTING, "Connecting", "connecting"},
+    {GST_DTLS_CONNECTION_STATE_CONNECTED, "Successfully connected",
+        "connected"},
+    {0, NULL, NULL},
+  };
+
+  if (!type) {
+    type = g_enum_register_static ("GstDtlsConnectionState", values);
+  }
+  return type;
+}
index 6260b93..b590486 100644 (file)
@@ -26,7 +26,7 @@
 #ifndef gstdtlsconnection_h
 #define gstdtlsconnection_h
 
-#include <glib-object.h>
+#include <gst/gst.h>
 
 G_BEGIN_DECLS
 
@@ -65,6 +65,18 @@ typedef enum {
 
 #define GST_DTLS_SRTP_MASTER_KEY_LENGTH 30
 
+typedef enum
+{
+  GST_DTLS_CONNECTION_STATE_NEW,
+  GST_DTLS_CONNECTION_STATE_CLOSED,
+  GST_DTLS_CONNECTION_STATE_FAILED,
+  GST_DTLS_CONNECTION_STATE_CONNECTING,
+  GST_DTLS_CONNECTION_STATE_CONNECTED,
+} GstDtlsConnectionState;
+
+GType gst_dtls_connection_state_get_type (void);
+#define GST_DTLS_TYPE_CONNECTION_STATE (gst_dtls_connection_state_get_type ())
+
 /*
  * GstDtlsConnection:
  *
@@ -84,7 +96,7 @@ struct _GstDtlsConnectionClass {
 
 GType gst_dtls_connection_get_type(void) G_GNUC_CONST;
 
-void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client);
+gboolean gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client, GError **err);
 void gst_dtls_connection_check_timeout(GstDtlsConnection *);
 
 /*
@@ -98,25 +110,40 @@ void gst_dtls_connection_stop(GstDtlsConnection *);
  */
 void gst_dtls_connection_close(GstDtlsConnection *);
 
+
+typedef gboolean (*GstDtlsConnectionSendCallback) (GstDtlsConnection * connection, gconstpointer data, gsize length, gpointer user_data);
+
 /*
- * Sets the closure that will be called whenever data needs to be sent.
- *
- * The closure will get called with the following arguments:
- * void cb(GstDtlsConnection *, gpointer data, gint length, gpointer user_data)
+ * Sets the callback that will be called whenever data needs to be sent.
  */
-void gst_dtls_connection_set_send_callback(GstDtlsConnection *, GClosure *);
+void gst_dtls_connection_set_send_callback(GstDtlsConnection *, GstDtlsConnectionSendCallback, gpointer, GDestroyNotify);
 
 /*
- * Processes data that has been recevied, the transformation is done in-place.
- * Returns the length of the plaintext data that was decoded, if no data is available, 0<= will be returned.
+ * Processes data that has been received, the transformation is done in-place.
+ *
+ * Returns:
+ *   - GST_FLOW_EOS if the receive side of the DTLS connection was closed by
+ *     the peer, i.e. close_notify was sent by the peer
+ *   - GST_FLOW_ERROR + err if an error happened
+ *   - GST_FLOW_OK + written >= 0 if processing was successful. ptr then
+ *     contains the decoded bytes
  */
-gint gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gint len);
+GstFlowReturn gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gsize len, gsize *written, GError **err);
 
 /*
- * If the DTLS handshake is completed this function will encode the given data.
- * Returns the length of the data sent, or 0 if the DTLS handshake is not completed.
+ * Will encode and send the given data.
+ *
+ * Sending with len == 0 will close the send side of the DTLS connection and
+ * no further data can be sent anymore in the future. This will also send the
+ * close_notify to the peer.
+ *
+ * Returns:
+ *   - GST_FLOW_EOS if the send side of the DTLS connection was closed, i.e.
+ *     we received an EOS before.
+ *   - GST_FLOW_ERROR + err if an error happened
+ *   - GST_FLOW_OK + written >= 0 if processing was successful
  */
-gint gst_dtls_connection_send(GstDtlsConnection *, gpointer ptr, gint len);
+GstFlowReturn gst_dtls_connection_send(GstDtlsConnection *, gconstpointer ptr, gsize len, gsize *written, GError **err);
 
 G_END_DECLS
 
index e41914e..b1d5615 100644 (file)
@@ -63,10 +63,10 @@ enum
   PROP_CONNECTION_ID,
   PROP_PEM,
   PROP_PEER_PEM,
-
   PROP_DECODER_KEY,
   PROP_SRTP_CIPHER,
   PROP_SRTP_AUTH,
+  PROP_CONNECTION_STATE,
   NUM_PROPERTIES
 };
 
@@ -128,8 +128,7 @@ gst_dtls_dec_class_init (GstDtlsDecClass * klass)
 
   signals[SIGNAL_ON_KEY_RECEIVED] =
       g_signal_new ("on-key-received", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_RUN_LAST, 0, NULL, NULL,
-      g_cclosure_marshal_generic, G_TYPE_NONE, 0);
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
 
   properties[PROP_CONNECTION_ID] =
       g_param_spec_string ("connection-id",
@@ -141,7 +140,12 @@ gst_dtls_dec_class_init (GstDtlsDecClass * klass)
       g_param_spec_string ("pem",
       "PEM string",
       "A string containing a X509 certificate and RSA private key in PEM format",
-      DEFAULT_PEM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+      DEFAULT_PEM,
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_DOC_SHOW_DEFAULT);
+#else
+      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+#endif
 
   properties[PROP_PEER_PEM] =
       g_param_spec_string ("peer-pem",
@@ -171,6 +175,13 @@ gst_dtls_dec_class_init (GstDtlsDecClass * klass)
       0, GST_DTLS_SRTP_AUTH_HMAC_SHA1_80, DEFAULT_SRTP_AUTH,
       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
 
+  properties[PROP_CONNECTION_STATE] =
+      g_param_spec_enum ("connection-state",
+      "Connection State",
+      "Current connection state",
+      GST_DTLS_TYPE_CONNECTION_STATE,
+      GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
 
   gst_element_class_add_static_pad_template (element_class, &src_template);
@@ -301,6 +312,13 @@ gst_dtls_dec_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_SRTP_AUTH:
       g_value_set_uint (value, self->srtp_auth);
       break;
+    case PROP_CONNECTION_STATE:
+      if (self->connection)
+        g_object_get_property (G_OBJECT (self->connection), "connection-state",
+            value);
+      else
+        g_value_set_enum (value, GST_DTLS_CONNECTION_STATE_CLOSED);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
   }
@@ -452,62 +470,105 @@ on_peer_certificate_received (GstDtlsConnection * connection, gchar * pem,
   return TRUE;
 }
 
-static gint
+static GstFlowReturn
 process_buffer (GstDtlsDec * self, GstBuffer * buffer)
 {
+  GstFlowReturn flow_ret;
   GstMapInfo map_info;
-  gint size;
+  GError *err = NULL;
+  gsize written = 0;
 
   if (!gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE))
-    return 0;
+    return GST_FLOW_ERROR;
 
   if (!map_info.size) {
     gst_buffer_unmap (buffer, &map_info);
-    return 0;
+    return GST_FLOW_ERROR;
   }
 
-  size =
+  flow_ret =
       gst_dtls_connection_process (self->connection, map_info.data,
-      map_info.size);
+      map_info.size, &written, &err);
   gst_buffer_unmap (buffer, &map_info);
 
-  if (size <= 0)
-    return size;
-
-  gst_buffer_set_size (buffer, size);
+  switch (flow_ret) {
+    case GST_FLOW_OK:
+      GST_LOG_OBJECT (self,
+          "Decoded buffer of size %" G_GSIZE_FORMAT " B to %" G_GSIZE_FORMAT,
+          map_info.size, written);
+      gst_buffer_set_size (buffer, written);
+      break;
+    case GST_FLOW_EOS:
+      gst_buffer_set_size (buffer, written);
+      GST_DEBUG_OBJECT (self, "Peer closed the connection");
+      break;
+    case GST_FLOW_ERROR:
+      GST_ERROR_OBJECT (self, "Error processing buffer: %s", err->message);
+      GST_ELEMENT_ERROR (self, RESOURCE, READ, (NULL), ("%s", err->message));
+      g_clear_error (&err);
+      break;
+    default:
+      g_assert_not_reached ();
+  }
+  g_assert (err == NULL);
 
-  return size;
+  return flow_ret;
 }
 
+typedef struct
+{
+  GstDtlsDec *self;
+  GstFlowReturn flow_ret;
+  guint processed;
+} ProcessListData;
+
 static gboolean
 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
 {
-  GstDtlsDec *self = GST_DTLS_DEC (user_data);
-  gint size;
+  ProcessListData *process_list_data = user_data;
+  GstDtlsDec *self = GST_DTLS_DEC (process_list_data->self);
+  GstFlowReturn flow_ret;
 
   *buffer = gst_buffer_make_writable (*buffer);
-  size = process_buffer (self, *buffer);
-  if (size <= 0)
+  flow_ret = process_buffer (self, *buffer);
+
+  process_list_data->flow_ret = flow_ret;
+  if (gst_buffer_get_size (*buffer) == 0)
     gst_buffer_replace (buffer, NULL);
+  else if (flow_ret != GST_FLOW_ERROR)
+    process_list_data->processed++;
 
-  return TRUE;
+  return flow_ret == GST_FLOW_OK;
 }
 
 static GstFlowReturn
 sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
 {
   GstDtlsDec *self = GST_DTLS_DEC (parent);
-  GstFlowReturn ret = GST_FLOW_OK;
   GstPad *other_pad;
+  ProcessListData process_list_data = { self, GST_FLOW_OK, 0 };
 
   list = gst_buffer_list_make_writable (list);
-  gst_buffer_list_foreach (list, process_buffer_from_list, self);
+  gst_buffer_list_foreach (list, process_buffer_from_list, &process_list_data);
+
+  /* If we successfully processed at least some buffers then forward those */
+  if (process_list_data.flow_ret != GST_FLOW_OK
+      && process_list_data.processed == 0) {
+    GST_ERROR_OBJECT (self, "Failed to process buffer list: %s",
+        gst_flow_get_name (process_list_data.flow_ret));
+    gst_buffer_list_unref (list);
+    return process_list_data.flow_ret;
+  }
+
+  /* Remove all buffers after the first one that failed to be processed */
+  gst_buffer_list_remove (list, process_list_data.processed,
+      gst_buffer_list_length (list) - process_list_data.processed);
 
   if (gst_buffer_list_length (list) == 0) {
     GST_DEBUG_OBJECT (self, "Not produced any buffers");
     gst_buffer_list_unref (list);
 
-    return GST_FLOW_OK;
+    return process_list_data.flow_ret;
   }
 
   g_mutex_lock (&self->src_mutex);
@@ -517,17 +578,25 @@ sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
   g_mutex_unlock (&self->src_mutex);
 
   if (other_pad) {
-    GST_LOG_OBJECT (self, "decoded buffer list with length %u, pushing",
+    gboolean was_eos = process_list_data.flow_ret == GST_FLOW_EOS;
+
+    GST_LOG_OBJECT (self, "pushing buffer list with length %u",
         gst_buffer_list_length (list));
-    ret = gst_pad_push_list (other_pad, list);
+    process_list_data.flow_ret = gst_pad_push_list (other_pad, list);
+
+    /* If the peer closed the connection, signal that we're done here now */
+    if (was_eos)
+      gst_pad_push_event (other_pad, gst_event_new_eos ());
+
     gst_object_unref (other_pad);
   } else {
-    GST_LOG_OBJECT (self, "dropped buffer list with length %d, not linked",
+    GST_LOG_OBJECT (self,
+        "dropping buffer list with length %d, have no source pad",
         gst_buffer_list_length (list));
     gst_buffer_list_unref (list);
   }
 
-  return ret;
+  return process_list_data.flow_ret;
 }
 
 static GstFlowReturn
@@ -535,7 +604,6 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstDtlsDec *self = GST_DTLS_DEC (parent);
   GstFlowReturn ret = GST_FLOW_OK;
-  gint size;
   GstPad *other_pad;
 
   if (!self->agent) {
@@ -548,12 +616,12 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
       self->connection_id, gst_buffer_get_size (buffer));
 
   buffer = gst_buffer_make_writable (buffer);
-  size = process_buffer (self, buffer);
-
-  if (size <= 0) {
+  ret = process_buffer (self, buffer);
+  if (ret == GST_FLOW_ERROR) {
+    GST_ERROR_OBJECT (self, "Failed to process buffer: %s",
+        gst_flow_get_name (ret));
     gst_buffer_unref (buffer);
-
-    return GST_FLOW_OK;
+    return ret;
   }
 
   g_mutex_lock (&self->src_mutex);
@@ -563,11 +631,25 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   g_mutex_unlock (&self->src_mutex);
 
   if (other_pad) {
-    GST_LOG_OBJECT (self, "decoded buffer with length %d, pushing", size);
-    ret = gst_pad_push (other_pad, buffer);
+    gboolean was_eos = (ret == GST_FLOW_EOS);
+
+    if (gst_buffer_get_size (buffer) > 0) {
+      GST_LOG_OBJECT (self, "pushing buffer");
+      ret = gst_pad_push (other_pad, buffer);
+    } else {
+      gst_buffer_unref (buffer);
+    }
+
+    /* If the peer closed the connection, signal that we're done here now */
+    if (was_eos) {
+      gst_pad_push_event (other_pad, gst_event_new_eos ());
+      if (ret == GST_FLOW_OK)
+        ret = GST_FLOW_EOS;
+    }
+
     gst_object_unref (other_pad);
   } else {
-    GST_LOG_OBJECT (self, "dropped buffer with length %d, not linked", size);
+    GST_LOG_OBJECT (self, "dropping buffer, have no source pad");
     gst_buffer_unref (buffer);
   }
 
@@ -680,12 +762,23 @@ gst_dtls_dec_fetch_connection (gchar * id)
 }
 
 static void
+on_connection_state_changed (GObject * object, GParamSpec * pspec,
+    gpointer user_data)
+{
+  GstDtlsDec *self = GST_DTLS_DEC (user_data);
+
+  g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
+}
+
+static void
 create_connection (GstDtlsDec * self, gchar * id)
 {
   g_return_if_fail (GST_IS_DTLS_DEC (self));
   g_return_if_fail (GST_IS_DTLS_AGENT (self->agent));
 
   if (self->connection) {
+    g_signal_handlers_disconnect_by_func (self->connection,
+        on_connection_state_changed, self);
     g_object_unref (self->connection);
     self->connection = NULL;
   }
@@ -705,6 +798,10 @@ create_connection (GstDtlsDec * self, gchar * id)
 
   self->connection =
       g_object_new (GST_TYPE_DTLS_CONNECTION, "agent", self->agent, NULL);
+  g_signal_connect_object (self->connection,
+      "notify::connection-state", G_CALLBACK (on_connection_state_changed),
+      self, 0);
+  on_connection_state_changed (NULL, NULL, self);
 
   g_object_weak_ref (G_OBJECT (self->connection),
       (GWeakNotify) connection_weak_ref_notify, g_strdup (id));
index 5b3bb26..e64ee4d 100644 (file)
@@ -62,10 +62,10 @@ enum
   PROP_0,
   PROP_CONNECTION_ID,
   PROP_IS_CLIENT,
-
   PROP_ENCODER_KEY,
   PROP_SRTP_CIPHER,
   PROP_SRTP_AUTH,
+  PROP_CONNECTION_STATE,
   NUM_PROPERTIES
 };
 
@@ -100,8 +100,8 @@ static gboolean sink_event (GstPad * pad, GstObject * parent, GstEvent * event);
 
 static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
     guint auth, GstDtlsEnc *);
-static void on_send_data (GstDtlsConnection *, gconstpointer data, gint length,
-    GstDtlsEnc *);
+static gboolean on_send_data (GstDtlsConnection *, gconstpointer data,
+    gsize length, GstDtlsEnc *);
 
 static void
 gst_dtls_enc_class_init (GstDtlsEncClass * klass)
@@ -122,8 +122,7 @@ gst_dtls_enc_class_init (GstDtlsEncClass * klass)
 
   signals[SIGNAL_ON_KEY_RECEIVED] =
       g_signal_new ("on-key-received", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_RUN_LAST, 0, NULL, NULL,
-      g_cclosure_marshal_generic, G_TYPE_NONE, 0);
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
 
   properties[PROP_CONNECTION_ID] =
       g_param_spec_string ("connection-id",
@@ -161,6 +160,13 @@ gst_dtls_enc_class_init (GstDtlsEncClass * klass)
       0, GST_DTLS_SRTP_AUTH_HMAC_SHA1_80, DEFAULT_SRTP_AUTH,
       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
 
+  properties[PROP_CONNECTION_STATE] =
+      g_param_spec_enum ("connection-state",
+      "Connection State",
+      "Current connection state",
+      GST_DTLS_TYPE_CONNECTION_STATE,
+      GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
 
   gst_element_class_add_static_pad_template (element_class, &src_template);
@@ -270,11 +276,27 @@ gst_dtls_enc_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_SRTP_AUTH:
       g_value_set_uint (value, self->srtp_auth);
       break;
+    case PROP_CONNECTION_STATE:
+      if (self->connection)
+        g_object_get_property (G_OBJECT (self->connection), "connection-state",
+            value);
+      else
+        g_value_set_enum (value, GST_DTLS_CONNECTION_STATE_CLOSED);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
   }
 }
 
+static void
+on_connection_state_changed (GObject * object, GParamSpec * pspec,
+    gpointer user_data)
+{
+  GstDtlsEnc *self = GST_DTLS_ENC (user_data);
+
+  g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
+}
+
 static GstStateChangeReturn
 gst_dtls_enc_change_state (GstElement * element, GstStateChange transition)
 {
@@ -295,9 +317,13 @@ gst_dtls_enc_change_state (GstElement * element, GstStateChange transition)
 
         g_signal_connect_object (self->connection,
             "on-encoder-key", G_CALLBACK (on_key_received), self, 0);
+        g_signal_connect_object (self->connection,
+            "notify::connection-state",
+            G_CALLBACK (on_connection_state_changed), self, 0);
+        on_connection_state_changed (NULL, NULL, self);
 
         gst_dtls_connection_set_send_callback (self->connection,
-            g_cclosure_new (G_CALLBACK (on_send_data), self, NULL));
+            (GstDtlsConnectionSendCallback) on_send_data, self, NULL);
       } else {
         GST_WARNING_OBJECT (self,
             "trying to change state to ready without connection id");
@@ -314,7 +340,8 @@ gst_dtls_enc_change_state (GstElement * element, GstStateChange transition)
 
       if (self->connection) {
         gst_dtls_connection_close (self->connection);
-        gst_dtls_connection_set_send_callback (self->connection, NULL);
+        gst_dtls_connection_set_send_callback (self->connection, NULL, NULL,
+            NULL);
         g_object_unref (self->connection);
         self->connection = NULL;
       }
@@ -326,10 +353,17 @@ gst_dtls_enc_change_state (GstElement * element, GstStateChange transition)
   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
 
   switch (transition) {
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
+    case GST_STATE_CHANGE_READY_TO_PAUSED:{
+      GError *err = NULL;
+
       GST_DEBUG_OBJECT (self, "starting connection %s", self->connection_id);
-      gst_dtls_connection_start (self->connection, self->is_client);
+      if (!gst_dtls_connection_start (self->connection, self->is_client, &err)) {
+        GST_ELEMENT_ERROR (self, RESOURCE, OPEN_WRITE, (NULL), ("%s",
+                err->message));
+        g_clear_error (&err);
+      }
       break;
+    }
     default:
       break;
   }
@@ -378,6 +412,7 @@ src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
     GST_DEBUG_OBJECT (self, "src pad activating in push mode");
 
     self->flushing = FALSE;
+    self->src_ret = GST_FLOW_OK;
     self->send_initial_events = TRUE;
     success =
         gst_pad_start_task (pad, (GstTaskFunction) src_task_loop, self->src,
@@ -392,6 +427,7 @@ src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
     g_queue_foreach (&self->queue, (GFunc) gst_buffer_unref, NULL);
     g_queue_clear (&self->queue);
     self->flushing = TRUE;
+    self->src_ret = GST_FLOW_FLUSHING;
     g_cond_signal (&self->queue_cond_add);
     g_mutex_unlock (&self->queue_lock);
     success = gst_pad_stop_task (pad);
@@ -458,13 +494,24 @@ src_task_loop (GstPad * pad)
 
   GST_TRACE_OBJECT (self, "src loop: releasing lock");
 
-  ret = gst_pad_push (self->src, buffer);
-  if (check_connection_timeout)
-    gst_dtls_connection_check_timeout (self->connection);
+  if (buffer) {
+    ret = gst_pad_push (self->src, buffer);
+    if (check_connection_timeout)
+      gst_dtls_connection_check_timeout (self->connection);
 
-  if (G_UNLIKELY (ret != GST_FLOW_OK)) {
-    GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
-        gst_flow_get_name (ret));
+    if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
+      GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
+          gst_flow_get_name (ret));
+    }
+    g_mutex_lock (&self->queue_lock);
+    self->src_ret = ret;
+    g_mutex_unlock (&self->queue_lock);
+  } else {
+    GST_DEBUG_OBJECT (self, "Peer and us closed the connection, sending EOS");
+    gst_pad_push_event (self->src, gst_event_new_eos ());
+    g_mutex_lock (&self->queue_lock);
+    self->src_ret = GST_FLOW_EOS;
+    g_mutex_unlock (&self->queue_lock);
   }
 }
 
@@ -473,32 +520,67 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstDtlsEnc *self = GST_DTLS_ENC (parent);
   GstMapInfo map_info;
-  gint ret;
+  GError *err = NULL;
+  gsize to_write, written = 0;
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  g_mutex_lock (&self->queue_lock);
+  if (self->src_ret != GST_FLOW_OK) {
+    if (G_UNLIKELY (self->src_ret == GST_FLOW_NOT_LINKED
+            || self->src_ret < GST_FLOW_EOS))
+      GST_ERROR_OBJECT (self, "Pushing previous data returned an error: %s",
+          gst_flow_get_name (self->src_ret));
+
+    gst_buffer_unref (buffer);
+    g_mutex_unlock (&self->queue_lock);
+    return self->src_ret;
+  }
+  g_mutex_unlock (&self->queue_lock);
 
   gst_buffer_map (buffer, &map_info, GST_MAP_READ);
 
-  if (map_info.size) {
+  to_write = map_info.size;
+
+  while (to_write > 0 && ret == GST_FLOW_OK) {
     ret =
         gst_dtls_connection_send (self->connection, map_info.data,
-        map_info.size);
-    if (ret != map_info.size) {
-      GST_WARNING_OBJECT (self,
-          "error sending data: %d B were written, expected value was %"
-          G_GSIZE_FORMAT " B", ret, map_info.size);
+        map_info.size, &written, &err);
+
+    switch (ret) {
+      case GST_FLOW_OK:
+        GST_DEBUG_OBJECT (self,
+            "Wrote %" G_GSIZE_FORMAT " B of %" G_GSIZE_FORMAT " B", written,
+            map_info.size);
+        g_assert (written <= to_write);
+        to_write -= written;
+        break;
+      case GST_FLOW_EOS:
+        GST_INFO_OBJECT (self, "Received data after the connection was closed");
+        break;
+      case GST_FLOW_ERROR:
+        GST_WARNING_OBJECT (self, "error sending data: %s", err->message);
+        GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL), ("%s", err->message));
+        g_clear_error (&err);
+        break;
+      default:
+        g_assert_not_reached ();
+        break;
     }
+
+    g_assert (err == NULL);
   }
 
   gst_buffer_unmap (buffer, &map_info);
-
   gst_buffer_unref (buffer);
 
-  return GST_FLOW_OK;
+  return ret;
 }
 
 
 static gboolean
 sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
+  GstDtlsEnc *self = GST_DTLS_ENC (parent);
   gboolean ret = FALSE;
 
   switch (GST_EVENT_TYPE (event)) {
@@ -510,6 +592,28 @@ sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       gst_event_unref (event);
       ret = TRUE;
       break;
+    case GST_EVENT_EOS:{
+      GstFlowReturn flow_ret;
+
+      /* Close the write side of the connection now */
+      flow_ret =
+          gst_dtls_connection_send (self->connection, NULL, 0, NULL, NULL);
+
+      if (flow_ret != GST_FLOW_OK)
+        GST_ERROR_OBJECT (self, "Failed to send close_notify");
+
+      /* Do not forward the EOS event unless the peer already closed to the
+       * connection itself. If it didn't yet then we'll later get the send
+       * callback called with no data and send EOS from there */
+      if (flow_ret == GST_FLOW_EOS) {
+        ret = gst_pad_event_default (pad, parent, event);
+      } else {
+        gst_event_unref (event);
+        ret = TRUE;
+      }
+
+      break;
+    }
     default:
       ret = gst_pad_event_default (pad, parent, event);
       break;
@@ -548,16 +652,18 @@ on_key_received (GstDtlsConnection * connection, gpointer key, guint cipher,
   g_signal_emit (self, signals[SIGNAL_ON_KEY_RECEIVED], 0);
 }
 
-static void
-on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
+static gboolean
+on_send_data (GstDtlsConnection * connection, gconstpointer data, gsize length,
     GstDtlsEnc * self)
 {
   GstBuffer *buffer;
+  gboolean ret;
 
-  GST_DEBUG_OBJECT (self, "sending data from %s with length %d",
+  GST_DEBUG_OBJECT (self, "sending data from %s with length %" G_GSIZE_FORMAT,
       self->connection_id, length);
 
-  buffer = gst_buffer_new_wrapped (g_memdup (data, length), length);
+  buffer =
+      data ? gst_buffer_new_wrapped (g_memdup (data, length), length) : NULL;
 
   GST_TRACE_OBJECT (self, "send data: acquiring lock");
   g_mutex_lock (&self->queue_lock);
@@ -569,5 +675,9 @@ on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
   g_cond_signal (&self->queue_cond_add);
 
   GST_TRACE_OBJECT (self, "send data: releasing lock");
+
+  ret = self->src_ret == GST_FLOW_OK;
   g_mutex_unlock (&self->queue_lock);
+
+  return ret;
 }
index 622936f..070a75e 100644 (file)
@@ -46,6 +46,7 @@ struct _GstDtlsEnc {
     GstElement element;
 
     GstPad *src;
+    GstFlowReturn src_ret;
 
     GQueue queue;
     GMutex queue_lock;
index a4104ff..ddbcda8 100644 (file)
@@ -121,6 +121,10 @@ gst_dtls_srtp_bin_class_init (GstDtlsSrtpBinClass * klass)
       G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
 
   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
+
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+  gst_type_mark_as_plugin_api (GST_TYPE_DTLS_SRTP_BIN, 0);
+#endif
 }
 
 static void
index 9836210..da03bd1 100644 (file)
@@ -28,7 +28,6 @@
 #endif
 
 #include "gstdtlssrtpdec.h"
-
 #include "gstdtlsconnection.h"
 
 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
@@ -69,6 +68,7 @@ enum
   PROP_0,
   PROP_PEM,
   PROP_PEER_PEM,
+  PROP_CONNECTION_STATE,
   NUM_PROPERTIES
 };
 
@@ -123,7 +123,12 @@ gst_dtls_srtp_dec_class_init (GstDtlsSrtpDecClass * klass)
       g_param_spec_string ("pem",
       "PEM string",
       "A string containing a X509 certificate and RSA private key in PEM format",
-      DEFAULT_PEM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+      DEFAULT_PEM,
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_DOC_SHOW_DEFAULT);
+#else
+      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+#endif
 
   properties[PROP_PEER_PEM] =
       g_param_spec_string ("peer-pem",
@@ -131,6 +136,13 @@ gst_dtls_srtp_dec_class_init (GstDtlsSrtpDecClass * klass)
       "The X509 certificate received in the DTLS handshake, in PEM format",
       DEFAULT_PEER_PEM, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
 
+  properties[PROP_CONNECTION_STATE] =
+      g_param_spec_enum ("connection-state",
+      "Connection State",
+      "Current connection state",
+      GST_DTLS_TYPE_CONNECTION_STATE,
+      GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
 
   gst_element_class_add_static_pad_template (element_class, &sink_template);
@@ -146,6 +158,15 @@ gst_dtls_srtp_dec_class_init (GstDtlsSrtpDecClass * klass)
 }
 
 static void
+on_connection_state_changed (GObject * object, GParamSpec * pspec,
+    gpointer user_data)
+{
+  GstDtlsSrtpDec *self = GST_DTLS_SRTP_DEC (user_data);
+
+  g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
+}
+
+static void
 gst_dtls_srtp_dec_init (GstDtlsSrtpDec * self)
 {
   GstElementClass *klass = GST_ELEMENT_GET_CLASS (GST_ELEMENT (self));
@@ -225,6 +246,8 @@ gst_dtls_srtp_dec_init (GstDtlsSrtpDec * self)
       G_CALLBACK (on_decoder_request_key), self);
   g_signal_connect (self->bin.dtls_element, "notify::peer-pem",
       G_CALLBACK (on_peer_pem), self);
+  g_signal_connect (self->bin.dtls_element, "notify::connection-state",
+      G_CALLBACK (on_connection_state_changed), self);
 }
 
 static void
@@ -268,6 +291,10 @@ gst_dtls_srtp_dec_get_property (GObject * object,
         GST_WARNING_OBJECT (self, "tried to get peer-pem after disabling DTLS");
       }
       break;
+    case PROP_CONNECTION_STATE:
+      g_object_get_property (G_OBJECT (self->bin.dtls_element),
+          "connection-state", value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
   }
index 237aec5..283ad9d 100644 (file)
@@ -28,6 +28,7 @@
 #endif
 
 #include "gstdtlssrtpenc.h"
+#include "gstdtlsconnection.h"
 
 #include <stdio.h>
 
@@ -76,12 +77,15 @@ enum
 {
   PROP_0,
   PROP_IS_CLIENT,
+  PROP_CONNECTION_STATE,
+  PROP_RTP_SYNC,
   NUM_PROPERTIES
 };
 
 static GParamSpec *properties[NUM_PROPERTIES];
 
 #define DEFAULT_IS_CLIENT FALSE
+#define DEFAULT_RTP_SYNC FALSE
 
 static gboolean transform_enum (GBinding *, const GValue * source_value,
     GValue * target_value, GEnumClass *);
@@ -126,8 +130,7 @@ gst_dtls_srtp_enc_class_init (GstDtlsSrtpEncClass * klass)
 
   signals[SIGNAL_ON_KEY_SET] =
       g_signal_new ("on-key-set", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_RUN_LAST, 0, NULL, NULL,
-      g_cclosure_marshal_generic, G_TYPE_NONE, 0);
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
 
   properties[PROP_IS_CLIENT] =
       g_param_spec_boolean ("is-client",
@@ -137,6 +140,19 @@ gst_dtls_srtp_enc_class_init (GstDtlsSrtpEncClass * klass)
       DEFAULT_IS_CLIENT,
       GST_PARAM_MUTABLE_READY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
 
+  properties[PROP_CONNECTION_STATE] =
+      g_param_spec_enum ("connection-state",
+      "Connection State",
+      "Current connection state",
+      GST_DTLS_TYPE_CONNECTION_STATE,
+      GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
+
+  properties[PROP_RTP_SYNC] =
+      g_param_spec_boolean ("rtp-sync", "Synchronize RTP",
+      "Synchronize RTP to the pipeline clock before merging with RTCP",
+      DEFAULT_RTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
 
   gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
@@ -154,6 +170,15 @@ gst_dtls_srtp_enc_class_init (GstDtlsSrtpEncClass * klass)
 }
 
 static void
+on_connection_state_changed (GObject * object, GParamSpec * pspec,
+    gpointer user_data)
+{
+  GstDtlsSrtpEnc *self = GST_DTLS_SRTP_ENC (user_data);
+
+  g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
+}
+
+static void
 gst_dtls_srtp_enc_init (GstDtlsSrtpEnc * self)
 {
   GstElementClass *klass = GST_ELEMENT_GET_CLASS (GST_ELEMENT (self));
@@ -161,15 +186,20 @@ gst_dtls_srtp_enc_init (GstDtlsSrtpEnc * self)
   gboolean ret;
 
 /*
-                 +--------------------+     +-----------------+
-     rtp_sink-R-o|rtp_sink     rtp_src|o-R-o|                 |
-                 |       srtpenc      |     |                 |
-    rtcp_sink-R-o|srtcp_sink  rtcp_src|o-R-o|                 |
-                 +--------------------+     |     funnel      |o---src
-                                            |                 |
-                 +--------------------+     |                 |
-    data_sink-R-o|       dtlsenc      |o---o|                 |
-                 +--------------------+     +-----------------+
+                 +--------------------+    +--------------+     +-----------------+
+     rtp_sink-R-o|rtp_sink     rtp_src|o-R-o   clocksync  |o-R-o|                 |
+                 |       srtpenc      |    +--------------+     |                 |
+                 |                    |                         |                 |
+    rtcp_sink-R-o|srtcp_sink  rtcp_src|o-----------R-----------o|                 |
+                 +--------------------+                         |     funnel      |o---src
+                                                                |                 |
+                 +--------------------+                         |                 |
+    data_sink-R-o|       dtlsenc      |o-----------------------o|                 |
+                 +--------------------+                         +-----------------+
+
+    The clocksync element is tied to the sync property. If sync=true, RTP output will be
+    synchronised to the clock, so it doesn't slow down RTCP traffic by being synched later
+    in the pipeline
 */
 
   self->srtp_enc = gst_element_factory_make ("srtpenc", NULL);
@@ -216,6 +246,9 @@ gst_dtls_srtp_enc_init (GstDtlsSrtpEnc * self)
 
   g_object_set (self->srtp_enc, "random-key", TRUE, NULL);
 
+  g_signal_connect (self->bin.dtls_element, "notify::connection-state",
+      G_CALLBACK (on_connection_state_changed), self);
+
   g_object_bind_property (G_OBJECT (self), "key", self->srtp_enc, "key",
       G_BINDING_DEFAULT);
   g_object_bind_property_full (G_OBJECT (self), "srtp-cipher", self->srtp_enc,
@@ -269,6 +302,9 @@ gst_dtls_srtp_enc_set_property (GObject * object,
             "tried to set is-client after disabling DTLS");
       }
       break;
+    case PROP_RTP_SYNC:
+      self->rtp_sync = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
   }
@@ -290,6 +326,13 @@ gst_dtls_srtp_enc_get_property (GObject * object,
             "tried to get is-client after disabling DTLS");
       }
       break;
+    case PROP_CONNECTION_STATE:
+      g_object_get_property (G_OBJECT (self->bin.dtls_element),
+          "connection-state", value);
+      break;
+    case PROP_RTP_SYNC:
+      g_value_set_boolean (value, self->rtp_sync);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
   }
@@ -332,13 +375,32 @@ gst_dtls_srtp_enc_request_new_pad (GstElement * element,
   g_return_val_if_fail (self->srtp_enc, NULL);
 
   if (templ == gst_element_class_get_pad_template (klass, "rtp_sink_%d")) {
+    gchar *clocksync_name;
+    GstElement *clocksync;
+
+    sscanf (name, "rtp_sink_%d", &pad_n);
+
+    clocksync_name = g_strdup_printf ("clocksync_%d", pad_n);
+    clocksync = gst_element_factory_make ("clocksync", clocksync_name);
+    g_free (clocksync_name);
+
+    if (clocksync == NULL) {
+      goto fail_create;
+    }
+
+    g_object_bind_property (self, "rtp-sync", clocksync, "sync",
+        G_BINDING_SYNC_CREATE);
+
+    gst_bin_add (GST_BIN (self), clocksync);
+    gst_element_sync_state_with_parent (clocksync);
+
     target_pad = gst_element_get_request_pad (self->srtp_enc, name);
     g_return_val_if_fail (target_pad, NULL);
 
-    sscanf (GST_PAD_NAME (target_pad), "rtp_sink_%d", &pad_n);
     srtp_src_name = g_strdup_printf ("rtp_src_%d", pad_n);
 
-    gst_element_link_pads (self->srtp_enc, srtp_src_name, self->funnel, NULL);
+    gst_element_link_pads (self->srtp_enc, srtp_src_name, clocksync, NULL);
+    gst_element_link_pads (clocksync, "src", self->funnel, NULL);
 
     g_free (srtp_src_name);
 
@@ -376,6 +438,11 @@ gst_dtls_srtp_enc_request_new_pad (GstElement * element,
   }
 
   return ghost_pad;
+
+fail_create:
+  GST_ELEMENT_ERROR (self, CORE, MISSING_PLUGIN, NULL,
+      ("%s", "Failed to create internal clocksync element"));
+  return NULL;
 }
 
 static void
index 5dd603d..a2a86b5 100644 (file)
@@ -44,6 +44,8 @@ typedef struct _GstDtlsSrtpEncClass GstDtlsSrtpEncClass;
 struct _GstDtlsSrtpEnc {
     GstDtlsSrtpBin bin;
 
+    gboolean rtp_sync;
+
     GstElement *srtp_enc;
     GstElement *funnel;
 };
index ab91f2d..74babae 100644 (file)
@@ -24,4 +24,5 @@ if openssl_dep.found() and libcrypto_dep.found()
     install_dir : plugins_install_dir,
   )
   pkgconfig.generate(gstdtls, install_dir : plugins_pkgconfig_install_dir)
+  plugins += [gstdtls]
 endif
index e531eff..1dfc758 100644 (file)
 static gboolean
 plugin_init (GstPlugin * plugin)
 {
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+  gst_type_mark_as_plugin_api (GST_DTLS_TYPE_CONNECTION_STATE, 0);
+#endif
+
   return gst_element_register (plugin, "dtlsenc", GST_RANK_NONE,
       GST_TYPE_DTLS_ENC)
       && gst_element_register (plugin, "dtlsdec", GST_RANK_NONE,