Files in ext/dtls/ are updated.
Change-Id: I14a943cf0210610c4e8ee3da31d03bd4f47944dd
Signed-off-by: Sangchul Lee <sc11.lee@samsung.com>
}
}
+static const gchar base64_alphabet[64] = {
+ 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
+ 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
+ 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
+ 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'
+};
+
static void
init_generated (GstDtlsCertificate * self)
{
GstDtlsCertificatePrivate *priv = self->priv;
RSA *rsa;
+ BIGNUM *serial_number;
+ ASN1_INTEGER *asn1_serial_number;
X509_NAME *name = NULL;
+ gchar common_name[9] = { 0, };
+ gint i;
g_return_if_fail (!priv->x509);
g_return_if_fail (!priv->private_key);
rsa = NULL;
X509_set_version (priv->x509, 2);
- ASN1_INTEGER_set (X509_get_serialNumber (priv->x509), 0);
+
+ /* Set a random 64 bit integer as serial number */
+ serial_number = BN_new ();
+ BN_pseudo_rand (serial_number, 64, 0, 0);
+ asn1_serial_number = X509_get_serialNumber (priv->x509);
+ BN_to_ASN1_INTEGER (serial_number, asn1_serial_number);
+ BN_free (serial_number);
+
+ /* Set a random 8 byte base64 string as issuer/subject */
+ name = X509_NAME_new ();
+ for (i = 0; i < 8; i++)
+ common_name[i] =
+ base64_alphabet[g_random_int_range (0, G_N_ELEMENTS (base64_alphabet))];
+ X509_NAME_add_entry_by_NID (name, NID_commonName, MBSTRING_ASC,
+ (const guchar *) common_name, -1, -1, 0);
+ X509_set_subject_name (priv->x509, name);
+ X509_set_issuer_name (priv->x509, name);
+ X509_NAME_free (name);
+
+ /* Set expiry in a year */
X509_gmtime_adj (X509_getm_notBefore (priv->x509), 0);
X509_gmtime_adj (X509_getm_notAfter (priv->x509), 31536000L); /* A year */
X509_set_pubkey (priv->x509, priv->private_key);
- name = X509_get_subject_name (priv->x509);
- X509_NAME_add_entry_by_txt (name, "C", MBSTRING_ASC, (unsigned char *) "SE",
- -1, -1, 0);
- X509_NAME_add_entry_by_txt (name, "CN", MBSTRING_ASC,
- (unsigned char *) "OpenWebRTC", -1, -1, 0);
- X509_set_issuer_name (priv->x509, name);
- name = NULL;
-
if (!X509_sign (priv->x509, priv->private_key, EVP_sha256 ())) {
GST_WARNING_OBJECT (self, "failed to sign certificate");
EVP_PKEY_free (priv->private_key);
{
PROP_0,
PROP_AGENT,
+ PROP_CONNECTION_STATE,
NUM_PROPERTIES
};
gboolean is_alive;
gboolean keys_exported;
+ GstDtlsConnectionState connection_state;
+ gboolean sent_close_notify;
+ gboolean received_close_notify;
+
GMutex mutex;
GCond condition;
gpointer bio_buffer;
gint bio_buffer_len;
gint bio_buffer_offset;
- GClosure *send_closure;
+ GstDtlsConnectionSendCallback send_callback;
+ gpointer send_callback_user_data;
+ GDestroyNotify send_callback_destroy_notify;
gboolean timeout_pending;
GThreadPool *thread_pool;
static void gst_dtls_connection_finalize (GObject * gobject);
static void gst_dtls_connection_set_property (GObject *, guint prop_id,
const GValue *, GParamSpec *);
+static void gst_dtls_connection_get_property (GObject *, guint prop_id,
+ GValue *, GParamSpec *);
static void log_state (GstDtlsConnection *, const gchar * str);
-static void export_srtp_keys (GstDtlsConnection *);
-static void openssl_poll (GstDtlsConnection *);
+static gboolean export_srtp_keys (GstDtlsConnection *, GError ** err);
+static GstFlowReturn openssl_poll (GstDtlsConnection *, gboolean * notify_state,
+ GError ** err);
+static GstFlowReturn handle_error (GstDtlsConnection * self, int ret,
+ GstResourceError error_type, gboolean * notify_state, GError ** err);
static int openssl_verify_callback (int preverify_ok,
X509_STORE_CTX * x509_ctx);
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gobject_class->set_property = gst_dtls_connection_set_property;
+ gobject_class->get_property = gst_dtls_connection_get_property;
connection_ex_index =
SSL_get_ex_new_index (0, (gpointer) "gstdtlsagent connection index", NULL,
signals[SIGNAL_ON_DECODER_KEY] =
g_signal_new ("on-decoder-key", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST, 0, NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_NONE, 3,
- G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_UINT);
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
+ G_TYPE_NONE, 3, G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_UINT);
signals[SIGNAL_ON_ENCODER_KEY] =
g_signal_new ("on-encoder-key", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST, 0, NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_NONE, 3,
- G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_UINT);
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
+ G_TYPE_NONE, 3, G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_UINT);
signals[SIGNAL_ON_PEER_CERTIFICATE] =
g_signal_new ("on-peer-certificate", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST, 0, NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_STRING);
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_STRING);
properties[PROP_AGENT] =
g_param_spec_object ("agent",
GST_TYPE_DTLS_AGENT,
G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS);
+ properties[PROP_CONNECTION_STATE] =
+ g_param_spec_enum ("connection-state",
+ "Connection State",
+ "Current connection state",
+ GST_DTLS_TYPE_CONNECTION_STATE,
+ GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
_gst_dtls_init_openssl ();
priv->ssl = NULL;
priv->bio = NULL;
- priv->send_closure = NULL;
-
priv->is_client = FALSE;
priv->is_alive = TRUE;
priv->keys_exported = FALSE;
SSL_free (priv->ssl);
priv->ssl = NULL;
- if (priv->send_closure) {
- g_closure_unref (priv->send_closure);
- priv->send_closure = NULL;
- }
+ if (priv->send_callback_destroy_notify)
+ priv->send_callback_destroy_notify (priv->send_callback_user_data);
g_mutex_clear (&priv->mutex);
g_cond_clear (&priv->condition);
}
}
-void
-gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
+static void
+gst_dtls_connection_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstDtlsConnection *self = GST_DTLS_CONNECTION (object);
+ GstDtlsConnectionPrivate *priv = self->priv;
+
+ switch (prop_id) {
+ case PROP_CONNECTION_STATE:
+ g_mutex_lock (&priv->mutex);
+ g_value_set_enum (value, priv->connection_state);
+ g_mutex_unlock (&priv->mutex);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ }
+}
+
+gboolean
+gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client,
+ GError ** err)
{
GstDtlsConnectionPrivate *priv;
+ gboolean ret;
+ gboolean notify_state = FALSE;
priv = self->priv;
- g_return_if_fail (priv->send_closure);
- g_return_if_fail (priv->ssl);
- g_return_if_fail (priv->bio);
+ g_return_val_if_fail (priv->send_callback, FALSE);
+ g_return_val_if_fail (priv->ssl, FALSE);
+ g_return_val_if_fail (priv->bio, FALSE);
GST_TRACE_OBJECT (self, "locking @ start");
g_mutex_lock (&priv->mutex);
priv->bio_buffer_offset = 0;
priv->keys_exported = FALSE;
+ priv->sent_close_notify = FALSE;
+ priv->received_close_notify = FALSE;
+
+ /* Client immediately starts connecting, the server waits for a client to
+ * start the handshake process */
priv->is_client = is_client;
if (priv->is_client) {
+ priv->connection_state = GST_DTLS_CONNECTION_STATE_CONNECTING;
+ notify_state = TRUE;
SSL_set_connect_state (priv->ssl);
} else {
+ if (priv->connection_state != GST_DTLS_CONNECTION_STATE_NEW) {
+ priv->connection_state = GST_DTLS_CONNECTION_STATE_NEW;
+ notify_state = TRUE;
+ }
SSL_set_accept_state (priv->ssl);
}
log_state (self, "initial state set");
- openssl_poll (self);
+ ret = openssl_poll (self, ¬ify_state, err);
+ if (ret == GST_FLOW_EOS && err) {
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_WRITE,
+ "Connection closed");
+ }
log_state (self, "first poll done");
GST_TRACE_OBJECT (self, "unlocking @ start");
g_mutex_unlock (&priv->mutex);
+
+ if (notify_state) {
+ g_object_notify_by_pspec (G_OBJECT (self),
+ properties[PROP_CONNECTION_STATE]);
+ }
+
+ return ret == GST_FLOW_OK;
}
static void
GstDtlsConnection *self = user_data;
GstDtlsConnectionPrivate *priv;
gint ret;
+ gboolean notify_state = FALSE;
priv = self->priv;
GST_WARNING_OBJECT (self, "handling timeout failed");
} else if (ret > 0) {
log_state (self, "handling timeout before poll");
- openssl_poll (self);
+ openssl_poll (self, ¬ify_state, NULL);
log_state (self, "handling timeout after poll");
}
}
g_mutex_unlock (&priv->mutex);
+
+ if (notify_state) {
+ g_object_notify_by_pspec (G_OBJECT (self),
+ properties[PROP_CONNECTION_STATE]);
+ }
}
static gboolean
void
gst_dtls_connection_stop (GstDtlsConnection * self)
{
+ gboolean notify_state = FALSE;
+
g_return_if_fail (GST_IS_DTLS_CONNECTION (self));
g_return_if_fail (self->priv->ssl);
g_return_if_fail (self->priv->bio);
GST_TRACE_OBJECT (self, "locked @ stop");
self->priv->is_alive = FALSE;
+ if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED
+ && self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED) {
+ self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CLOSED;
+ notify_state = TRUE;
+ }
GST_TRACE_OBJECT (self, "signaling @ stop");
g_cond_signal (&self->priv->condition);
GST_TRACE_OBJECT (self, "signaled @ stop");
g_mutex_unlock (&self->priv->mutex);
GST_DEBUG_OBJECT (self, "stopped connection");
+
+ if (notify_state) {
+ g_object_notify_by_pspec (G_OBJECT (self),
+ properties[PROP_CONNECTION_STATE]);
+ }
}
void
gst_dtls_connection_close (GstDtlsConnection * self)
{
+ gboolean notify_state = FALSE;
+
g_return_if_fail (GST_IS_DTLS_CONNECTION (self));
g_return_if_fail (self->priv->ssl);
g_return_if_fail (self->priv->bio);
g_cond_signal (&self->priv->condition);
}
+ if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED
+ && self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED) {
+ self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CLOSED;
+ notify_state = TRUE;
+ }
+
GST_TRACE_OBJECT (self, "unlocking @ close");
g_mutex_unlock (&self->priv->mutex);
GST_DEBUG_OBJECT (self, "closed connection");
+
+ if (notify_state) {
+ g_object_notify_by_pspec (G_OBJECT (self),
+ properties[PROP_CONNECTION_STATE]);
+ }
}
void
gst_dtls_connection_set_send_callback (GstDtlsConnection * self,
- GClosure * closure)
+ GstDtlsConnectionSendCallback callback, gpointer user_data,
+ GDestroyNotify destroy_notify)
{
+ GstDtlsConnectionPrivate *priv;
+
g_return_if_fail (GST_IS_DTLS_CONNECTION (self));
+ priv = self->priv;
+
GST_TRACE_OBJECT (self, "locking @ set_send_callback");
- g_mutex_lock (&self->priv->mutex);
+ g_mutex_lock (&priv->mutex);
GST_TRACE_OBJECT (self, "locked @ set_send_callback");
- if (self->priv->send_closure) {
- g_closure_unref (self->priv->send_closure);
- self->priv->send_closure = NULL;
- }
- self->priv->send_closure = closure;
-
- if (closure && G_CLOSURE_NEEDS_MARSHAL (closure)) {
- g_closure_set_marshal (closure, g_cclosure_marshal_generic);
- }
+ if (priv->send_callback_destroy_notify)
+ priv->send_callback_destroy_notify (priv->send_callback_user_data);
+ priv->send_callback = callback;
+ priv->send_callback_user_data = user_data;
+ priv->send_callback_destroy_notify = destroy_notify;
GST_TRACE_OBJECT (self, "unlocking @ set_send_callback");
- g_mutex_unlock (&self->priv->mutex);
+ g_mutex_unlock (&priv->mutex);
}
-gint
-gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len)
+GstFlowReturn
+gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gsize len,
+ gsize * written, GError ** err)
{
+ GstFlowReturn flow_ret = GST_FLOW_OK;
GstDtlsConnectionPrivate *priv;
- gint result;
+ int ret;
+ gboolean notify_state = FALSE;
g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0);
g_return_val_if_fail (self->priv->ssl, 0);
g_mutex_lock (&priv->mutex);
GST_TRACE_OBJECT (self, "locked @ process");
+ if (self->priv->received_close_notify
+ || self->priv->connection_state == GST_DTLS_CONNECTION_STATE_CLOSED) {
+ GST_DEBUG_OBJECT (self, "Already received close_notify");
+ g_mutex_unlock (&priv->mutex);
+ return GST_FLOW_EOS;
+ }
+
+ if (self->priv->connection_state == GST_DTLS_CONNECTION_STATE_FAILED) {
+ GST_ERROR_OBJECT (self, "Had a fatal error before");
+ g_mutex_unlock (&priv->mutex);
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+ "Had fatal error before");
+ return GST_FLOW_ERROR;
+ }
+
g_warn_if_fail (!priv->bio_buffer);
priv->bio_buffer = data;
log_state (self, "process start");
if (SSL_want_write (priv->ssl)) {
- openssl_poll (self);
+ flow_ret = openssl_poll (self, ¬ify_state, err);
log_state (self, "process want write, after poll");
+ if (flow_ret != GST_FLOW_OK) {
+ g_mutex_unlock (&priv->mutex);
+ return flow_ret;
+ }
}
- result = SSL_read (priv->ssl, data, len);
+ /* If we're a server and were in new state then by receiving the first data
+ * we would start the connection process */
+ if (!priv->is_client) {
+ if (self->priv->connection_state == GST_DTLS_CONNECTION_STATE_NEW) {
+ priv->connection_state = GST_DTLS_CONNECTION_STATE_CONNECTING;
+ notify_state = TRUE;
+ }
+ }
+
+ ret = SSL_read (priv->ssl, data, len);
+ *written = ret >= 0 ? ret : 0;
+ GST_DEBUG_OBJECT (self, "read result: %d", ret);
+
+ flow_ret =
+ handle_error (self, ret, GST_RESOURCE_ERROR_READ, ¬ify_state, err);
+ if (flow_ret == GST_FLOW_EOS) {
+ self->priv->received_close_notify = TRUE;
+ if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED
+ && self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED) {
+ self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CLOSED;
+ notify_state = TRUE;
+ }
+ /* Notify about the connection being properly closed now if both
+ * sides did so */
+ if (self->priv->sent_close_notify && self->priv->send_callback)
+ self->priv->send_callback (self, NULL, 0, NULL);
+
+ g_mutex_unlock (&priv->mutex);
+
+ if (notify_state) {
+ g_object_notify_by_pspec (G_OBJECT (self),
+ properties[PROP_CONNECTION_STATE]);
+ }
+
+ return flow_ret;
+ } else if (flow_ret != GST_FLOW_OK) {
+ g_mutex_unlock (&priv->mutex);
+
+ if (notify_state) {
+ g_object_notify_by_pspec (G_OBJECT (self),
+ properties[PROP_CONNECTION_STATE]);
+ }
+
+ return flow_ret;
+ }
log_state (self, "process after read");
- openssl_poll (self);
+ flow_ret = openssl_poll (self, ¬ify_state, err);
log_state (self, "process after poll");
- GST_DEBUG_OBJECT (self, "read result: %d", result);
-
GST_TRACE_OBJECT (self, "unlocking @ process");
g_mutex_unlock (&priv->mutex);
- return result;
+ if (notify_state) {
+ g_object_notify_by_pspec (G_OBJECT (self),
+ properties[PROP_CONNECTION_STATE]);
+ }
+
+ return flow_ret;
}
-gint
-gst_dtls_connection_send (GstDtlsConnection * self, gpointer data, gint len)
+GstFlowReturn
+gst_dtls_connection_send (GstDtlsConnection * self, gconstpointer data,
+ gsize len, gsize * written, GError ** err)
{
+ GstFlowReturn flow_ret;
int ret = 0;
+ gboolean notify_state = FALSE;
g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0);
g_mutex_lock (&self->priv->mutex);
GST_TRACE_OBJECT (self, "locked @ send");
- if (SSL_is_init_finished (self->priv->ssl)) {
+ if (self->priv->connection_state == GST_DTLS_CONNECTION_STATE_FAILED) {
+ GST_ERROR_OBJECT (self, "Had a fatal error before");
+ g_mutex_unlock (&self->priv->mutex);
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+ "Had fatal error before");
+ return GST_FLOW_ERROR;
+ }
+
+ if (self->priv->sent_close_notify) {
+ len = 0;
+ GST_DEBUG_OBJECT (self, "Not sending new data after close_notify");
+ }
+
+ if (len == 0) {
+ if (written)
+ *written = 0;
+ GST_DEBUG_OBJECT (self, "Sending close_notify");
+ ret = SSL_shutdown (self->priv->ssl);
+ if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED &&
+ self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED) {
+ self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CLOSED;
+ notify_state = TRUE;
+ }
+ if (ret == 1) {
+ GST_LOG_OBJECT (self, "received peer close_notify already");
+ self->priv->received_close_notify = TRUE;
+ flow_ret = GST_FLOW_EOS;
+ } else if (ret == 0) {
+ GST_LOG_OBJECT (self, "did not receive peer close_notify yet");
+ flow_ret = GST_FLOW_OK;
+ } else {
+ flow_ret =
+ handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, ¬ify_state,
+ err);
+ }
+ } else if (SSL_is_init_finished (self->priv->ssl)) {
+ GST_DEBUG_OBJECT (self, "sending data of %" G_GSIZE_FORMAT " B", len);
ret = SSL_write (self->priv->ssl, data, len);
- GST_DEBUG_OBJECT (self, "data sent: input was %d B, output is %d B", len,
- ret);
+ if (ret <= 0) {
+ if (written)
+ *written = 0;
+ flow_ret =
+ handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, ¬ify_state,
+ err);
+ } else {
+ if (written)
+ *written = ret;
+ flow_ret = GST_FLOW_OK;
+ }
} else {
+ if (written)
+ *written = ret;
GST_WARNING_OBJECT (self,
"tried to send data before handshake was complete");
- ret = 0;
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+ "Tried to send data before handshake was complete");
+ flow_ret = GST_FLOW_ERROR;
}
GST_TRACE_OBJECT (self, "unlocking @ send");
g_mutex_unlock (&self->priv->mutex);
- return ret;
+ if (notify_state) {
+ g_object_notify_by_pspec (G_OBJECT (self),
+ properties[PROP_CONNECTION_STATE]);
+ }
+
+ return flow_ret;
}
/*
#endif
}
-static void
-export_srtp_keys (GstDtlsConnection * self)
+static gboolean
+export_srtp_keys (GstDtlsConnection * self, GError ** err)
{
typedef struct
{
NULL, 0, 0);
if (!success) {
- GST_WARNING_OBJECT (self, "failed to export srtp keys");
- return;
+ GST_WARNING_OBJECT (self, "Failed to export SRTP keys");
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+ "Failed to export SRTP keys");
+ return FALSE;
}
profile = SSL_get_selected_srtp_profile (self->priv->ssl);
+ if (!profile) {
+ GST_WARNING_OBJECT (self,
+ "No SRTP capabilities negotiated during handshake");
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+ "No SRTP capabilities negotiated during handshake");
+ return FALSE;
+ }
+
GST_INFO_OBJECT (self, "keys received, profile is %s", profile->name);
switch (profile->id) {
auth = GST_DTLS_SRTP_AUTH_HMAC_SHA1_32;
break;
default:
- GST_WARNING_OBJECT (self, "invalid crypto suite set by handshake");
- goto beach;
+ GST_WARNING_OBJECT (self,
+ "Invalid/unsupported crypto suite set by handshake");
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+ "Invalid/unsupported crypto suite set by handshake");
+ return FALSE;
}
client_key.key = exported_keys.client_key;
auth);
}
-beach:
self->priv->keys_exported = TRUE;
+
+ return TRUE;
}
static int
return 0;
}
-static void
-openssl_poll (GstDtlsConnection * self)
+static GstFlowReturn
+handle_error (GstDtlsConnection * self, int ret, GstResourceError error_type,
+ gboolean * notify_state, GError ** err)
{
- int ret;
int error;
+ error = SSL_get_error (self->priv->ssl, ret);
+
+ switch (error) {
+ case SSL_ERROR_NONE:
+ GST_TRACE_OBJECT (self, "No error");
+ return GST_FLOW_OK;
+ case SSL_ERROR_SSL:
+ GST_ERROR_OBJECT (self, "Fatal SSL error");
+ if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED) {
+ self->priv->connection_state = GST_DTLS_CONNECTION_STATE_FAILED;
+ *notify_state = TRUE;
+ }
+ ERR_print_errors_cb (ssl_err_cb, self);
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, error_type,
+ "Fatal SSL error");
+ return GST_FLOW_ERROR;
+ case SSL_ERROR_ZERO_RETURN:
+ GST_LOG_OBJECT (self, "Connection was closed");
+ return GST_FLOW_EOS;
+ case SSL_ERROR_WANT_READ:
+ GST_LOG_OBJECT (self, "SSL wants read");
+ return GST_FLOW_OK;
+ case SSL_ERROR_WANT_WRITE:
+ GST_LOG_OBJECT (self, "SSL wants write");
+ return GST_FLOW_OK;
+ case SSL_ERROR_SYSCALL:
+ /* OpenSSL shouldn't be making real system calls, so we can safely
+ * ignore syscall errors. System interactions should happen through
+ * our BIO.
+ */
+ GST_DEBUG_OBJECT (self, "OpenSSL reported a syscall error, ignoring.");
+ return GST_FLOW_OK;
+ default:
+ if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED) {
+ self->priv->connection_state = GST_DTLS_CONNECTION_STATE_FAILED;
+ *notify_state = TRUE;
+ }
+ GST_ERROR_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret);
+ if (err)
+ *err =
+ g_error_new (GST_RESOURCE_ERROR, error_type,
+ "Unknown SSL error: %d, ret: %d", error, ret);
+ return GST_FLOW_ERROR;
+ }
+}
+
+static GstFlowReturn
+openssl_poll (GstDtlsConnection * self, gboolean * notify_state, GError ** err)
+{
+ int ret;
+ GstFlowReturn flow_ret;
+
log_state (self, "poll: before handshake");
ERR_clear_error ();
if (!self->priv->keys_exported) {
GST_INFO_OBJECT (self,
"handshake just completed successfully, exporting keys");
- export_srtp_keys (self);
+
+ if (!export_srtp_keys (self, err))
+ return GST_FLOW_ERROR;
+
+ if (self->priv->connection_state != GST_DTLS_CONNECTION_STATE_FAILED
+ && self->priv->connection_state != GST_DTLS_CONNECTION_STATE_CLOSED
+ && self->priv->connection_state !=
+ GST_DTLS_CONNECTION_STATE_CONNECTED) {
+ self->priv->connection_state = GST_DTLS_CONNECTION_STATE_CONNECTED;
+ *notify_state = TRUE;
+ }
} else {
GST_INFO_OBJECT (self, "handshake is completed");
}
- return;
+ return GST_FLOW_OK;
case 0:
GST_DEBUG_OBJECT (self, "do_handshake encountered EOF");
break;
case -1:
- GST_DEBUG_OBJECT (self, "do_handshake encountered BIO error");
+ GST_DEBUG_OBJECT (self, "do_handshake encountered potential BIO error");
break;
default:
GST_DEBUG_OBJECT (self, "do_handshake returned %d", ret);
- }
-
- error = SSL_get_error (self->priv->ssl, ret);
-
- switch (error) {
- case SSL_ERROR_NONE:
- GST_WARNING_OBJECT (self, "no error, handshake should be done");
- break;
- case SSL_ERROR_SSL:
- GST_ERROR_OBJECT (self, "SSL error");
- ERR_print_errors_cb (ssl_err_cb, self);
- return;
- case SSL_ERROR_WANT_READ:
- GST_LOG_OBJECT (self, "SSL wants read");
- break;
- case SSL_ERROR_WANT_WRITE:
- GST_LOG_OBJECT (self, "SSL wants write");
- break;
- case SSL_ERROR_SYSCALL:{
- gchar message[1024] = "<unknown>";
- gint syserror;
-#ifdef G_OS_WIN32
- syserror = WSAGetLastError ();
- FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM, NULL, syserror, 0, message,
- sizeof message, NULL);
-#else
- syserror = errno;
- strerror_r (syserror, message, sizeof message);
-#endif
- GST_CAT_LEVEL_LOG (GST_CAT_DEFAULT,
- syserror != 0 ? GST_LEVEL_WARNING : GST_LEVEL_LOG,
- self, "SSL syscall error: errno %d: %s", syserror, message);
break;
- }
- default:
- GST_WARNING_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret);
}
+ flow_ret =
+ handle_error (self, ret, GST_RESOURCE_ERROR_OPEN_WRITE, notify_state,
+ err);
+
ERR_print_errors_cb (ssl_warn_cb, self);
+
+ return flow_ret;
}
static int
bio_method_write (BIO * bio, const char *data, int size)
{
GstDtlsConnection *self = GST_DTLS_CONNECTION (BIO_get_data (bio));
+ gboolean ret = TRUE;
GST_LOG_OBJECT (self, "BIO: writing %d", size);
- if (self->priv->send_closure) {
- GValue values[3] = { G_VALUE_INIT };
-
- g_value_init (&values[0], GST_TYPE_DTLS_CONNECTION);
- g_value_set_object (&values[0], self);
-
- g_value_init (&values[1], G_TYPE_POINTER);
- g_value_set_pointer (&values[1], (gpointer) data);
+ if (self->priv->send_callback)
+ ret = self->priv->send_callback (self, data, size,
+ self->priv->send_callback_user_data);
- g_value_init (&values[2], G_TYPE_INT);
- g_value_set_int (&values[2], size);
-
- g_closure_invoke (self->priv->send_closure, NULL, 3, values, NULL);
-
- g_value_unset (&values[0]);
- }
-
- return size;
+ return ret ? size : -1;
}
static int
GST_LOG_OBJECT (self, "BIO: EOF reset");
return 1;
case BIO_CTRL_EOF:{
- gint eof = !(priv->bio_buffer_len - priv->bio_buffer_offset);
+ gint eof = priv->is_alive == FALSE;
GST_LOG_OBJECT (self, "BIO: EOF query returned %d", eof);
return eof;
}
GST_LOG_OBJECT (GST_DTLS_CONNECTION (BIO_get_data (bio)), "BIO free");
return 0;
}
+
+GType
+gst_dtls_connection_state_get_type (void)
+{
+ static GType type = 0;
+ static const GEnumValue values[] = {
+ {GST_DTLS_CONNECTION_STATE_NEW, "New connection", "new"},
+ {GST_DTLS_CONNECTION_STATE_CLOSED, "Closed connection on either side",
+ "closed"},
+ {GST_DTLS_CONNECTION_STATE_FAILED, "Failed connection", "failed"},
+ {GST_DTLS_CONNECTION_STATE_CONNECTING, "Connecting", "connecting"},
+ {GST_DTLS_CONNECTION_STATE_CONNECTED, "Successfully connected",
+ "connected"},
+ {0, NULL, NULL},
+ };
+
+ if (!type) {
+ type = g_enum_register_static ("GstDtlsConnectionState", values);
+ }
+ return type;
+}
#ifndef gstdtlsconnection_h
#define gstdtlsconnection_h
-#include <glib-object.h>
+#include <gst/gst.h>
G_BEGIN_DECLS
#define GST_DTLS_SRTP_MASTER_KEY_LENGTH 30
+typedef enum
+{
+ GST_DTLS_CONNECTION_STATE_NEW,
+ GST_DTLS_CONNECTION_STATE_CLOSED,
+ GST_DTLS_CONNECTION_STATE_FAILED,
+ GST_DTLS_CONNECTION_STATE_CONNECTING,
+ GST_DTLS_CONNECTION_STATE_CONNECTED,
+} GstDtlsConnectionState;
+
+GType gst_dtls_connection_state_get_type (void);
+#define GST_DTLS_TYPE_CONNECTION_STATE (gst_dtls_connection_state_get_type ())
+
/*
* GstDtlsConnection:
*
GType gst_dtls_connection_get_type(void) G_GNUC_CONST;
-void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client);
+gboolean gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client, GError **err);
void gst_dtls_connection_check_timeout(GstDtlsConnection *);
/*
*/
void gst_dtls_connection_close(GstDtlsConnection *);
+
+typedef gboolean (*GstDtlsConnectionSendCallback) (GstDtlsConnection * connection, gconstpointer data, gsize length, gpointer user_data);
+
/*
- * Sets the closure that will be called whenever data needs to be sent.
- *
- * The closure will get called with the following arguments:
- * void cb(GstDtlsConnection *, gpointer data, gint length, gpointer user_data)
+ * Sets the callback that will be called whenever data needs to be sent.
*/
-void gst_dtls_connection_set_send_callback(GstDtlsConnection *, GClosure *);
+void gst_dtls_connection_set_send_callback(GstDtlsConnection *, GstDtlsConnectionSendCallback, gpointer, GDestroyNotify);
/*
- * Processes data that has been recevied, the transformation is done in-place.
- * Returns the length of the plaintext data that was decoded, if no data is available, 0<= will be returned.
+ * Processes data that has been received, the transformation is done in-place.
+ *
+ * Returns:
+ * - GST_FLOW_EOS if the receive side of the DTLS connection was closed by
+ * the peer, i.e. close_notify was sent by the peer
+ * - GST_FLOW_ERROR + err if an error happened
+ * - GST_FLOW_OK + written >= 0 if processing was successful. ptr then
+ * contains the decoded bytes
*/
-gint gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gint len);
+GstFlowReturn gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gsize len, gsize *written, GError **err);
/*
- * If the DTLS handshake is completed this function will encode the given data.
- * Returns the length of the data sent, or 0 if the DTLS handshake is not completed.
+ * Will encode and send the given data.
+ *
+ * Sending with len == 0 will close the send side of the DTLS connection and
+ * no further data can be sent anymore in the future. This will also send the
+ * close_notify to the peer.
+ *
+ * Returns:
+ * - GST_FLOW_EOS if the send side of the DTLS connection was closed, i.e.
+ * we received an EOS before.
+ * - GST_FLOW_ERROR + err if an error happened
+ * - GST_FLOW_OK + written >= 0 if processing was successful
*/
-gint gst_dtls_connection_send(GstDtlsConnection *, gpointer ptr, gint len);
+GstFlowReturn gst_dtls_connection_send(GstDtlsConnection *, gconstpointer ptr, gsize len, gsize *written, GError **err);
G_END_DECLS
PROP_CONNECTION_ID,
PROP_PEM,
PROP_PEER_PEM,
-
PROP_DECODER_KEY,
PROP_SRTP_CIPHER,
PROP_SRTP_AUTH,
+ PROP_CONNECTION_STATE,
NUM_PROPERTIES
};
signals[SIGNAL_ON_KEY_RECEIVED] =
g_signal_new ("on-key-received", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST, 0, NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_NONE, 0);
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
properties[PROP_CONNECTION_ID] =
g_param_spec_string ("connection-id",
g_param_spec_string ("pem",
"PEM string",
"A string containing a X509 certificate and RSA private key in PEM format",
- DEFAULT_PEM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+ DEFAULT_PEM,
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_DOC_SHOW_DEFAULT);
+#else
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+#endif
properties[PROP_PEER_PEM] =
g_param_spec_string ("peer-pem",
0, GST_DTLS_SRTP_AUTH_HMAC_SHA1_80, DEFAULT_SRTP_AUTH,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+ properties[PROP_CONNECTION_STATE] =
+ g_param_spec_enum ("connection-state",
+ "Connection State",
+ "Current connection state",
+ GST_DTLS_TYPE_CONNECTION_STATE,
+ GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
gst_element_class_add_static_pad_template (element_class, &src_template);
case PROP_SRTP_AUTH:
g_value_set_uint (value, self->srtp_auth);
break;
+ case PROP_CONNECTION_STATE:
+ if (self->connection)
+ g_object_get_property (G_OBJECT (self->connection), "connection-state",
+ value);
+ else
+ g_value_set_enum (value, GST_DTLS_CONNECTION_STATE_CLOSED);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
}
return TRUE;
}
-static gint
+static GstFlowReturn
process_buffer (GstDtlsDec * self, GstBuffer * buffer)
{
+ GstFlowReturn flow_ret;
GstMapInfo map_info;
- gint size;
+ GError *err = NULL;
+ gsize written = 0;
if (!gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE))
- return 0;
+ return GST_FLOW_ERROR;
if (!map_info.size) {
gst_buffer_unmap (buffer, &map_info);
- return 0;
+ return GST_FLOW_ERROR;
}
- size =
+ flow_ret =
gst_dtls_connection_process (self->connection, map_info.data,
- map_info.size);
+ map_info.size, &written, &err);
gst_buffer_unmap (buffer, &map_info);
- if (size <= 0)
- return size;
-
- gst_buffer_set_size (buffer, size);
+ switch (flow_ret) {
+ case GST_FLOW_OK:
+ GST_LOG_OBJECT (self,
+ "Decoded buffer of size %" G_GSIZE_FORMAT " B to %" G_GSIZE_FORMAT,
+ map_info.size, written);
+ gst_buffer_set_size (buffer, written);
+ break;
+ case GST_FLOW_EOS:
+ gst_buffer_set_size (buffer, written);
+ GST_DEBUG_OBJECT (self, "Peer closed the connection");
+ break;
+ case GST_FLOW_ERROR:
+ GST_ERROR_OBJECT (self, "Error processing buffer: %s", err->message);
+ GST_ELEMENT_ERROR (self, RESOURCE, READ, (NULL), ("%s", err->message));
+ g_clear_error (&err);
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+ g_assert (err == NULL);
- return size;
+ return flow_ret;
}
+typedef struct
+{
+ GstDtlsDec *self;
+ GstFlowReturn flow_ret;
+ guint processed;
+} ProcessListData;
+
static gboolean
process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
{
- GstDtlsDec *self = GST_DTLS_DEC (user_data);
- gint size;
+ ProcessListData *process_list_data = user_data;
+ GstDtlsDec *self = GST_DTLS_DEC (process_list_data->self);
+ GstFlowReturn flow_ret;
*buffer = gst_buffer_make_writable (*buffer);
- size = process_buffer (self, *buffer);
- if (size <= 0)
+ flow_ret = process_buffer (self, *buffer);
+
+ process_list_data->flow_ret = flow_ret;
+ if (gst_buffer_get_size (*buffer) == 0)
gst_buffer_replace (buffer, NULL);
+ else if (flow_ret != GST_FLOW_ERROR)
+ process_list_data->processed++;
- return TRUE;
+ return flow_ret == GST_FLOW_OK;
}
static GstFlowReturn
sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
{
GstDtlsDec *self = GST_DTLS_DEC (parent);
- GstFlowReturn ret = GST_FLOW_OK;
GstPad *other_pad;
+ ProcessListData process_list_data = { self, GST_FLOW_OK, 0 };
list = gst_buffer_list_make_writable (list);
- gst_buffer_list_foreach (list, process_buffer_from_list, self);
+ gst_buffer_list_foreach (list, process_buffer_from_list, &process_list_data);
+
+ /* If we successfully processed at least some buffers then forward those */
+ if (process_list_data.flow_ret != GST_FLOW_OK
+ && process_list_data.processed == 0) {
+ GST_ERROR_OBJECT (self, "Failed to process buffer list: %s",
+ gst_flow_get_name (process_list_data.flow_ret));
+ gst_buffer_list_unref (list);
+ return process_list_data.flow_ret;
+ }
+
+ /* Remove all buffers after the first one that failed to be processed */
+ gst_buffer_list_remove (list, process_list_data.processed,
+ gst_buffer_list_length (list) - process_list_data.processed);
if (gst_buffer_list_length (list) == 0) {
GST_DEBUG_OBJECT (self, "Not produced any buffers");
gst_buffer_list_unref (list);
- return GST_FLOW_OK;
+ return process_list_data.flow_ret;
}
g_mutex_lock (&self->src_mutex);
g_mutex_unlock (&self->src_mutex);
if (other_pad) {
- GST_LOG_OBJECT (self, "decoded buffer list with length %u, pushing",
+ gboolean was_eos = process_list_data.flow_ret == GST_FLOW_EOS;
+
+ GST_LOG_OBJECT (self, "pushing buffer list with length %u",
gst_buffer_list_length (list));
- ret = gst_pad_push_list (other_pad, list);
+ process_list_data.flow_ret = gst_pad_push_list (other_pad, list);
+
+ /* If the peer closed the connection, signal that we're done here now */
+ if (was_eos)
+ gst_pad_push_event (other_pad, gst_event_new_eos ());
+
gst_object_unref (other_pad);
} else {
- GST_LOG_OBJECT (self, "dropped buffer list with length %d, not linked",
+ GST_LOG_OBJECT (self,
+ "dropping buffer list with length %d, have no source pad",
gst_buffer_list_length (list));
gst_buffer_list_unref (list);
}
- return ret;
+ return process_list_data.flow_ret;
}
static GstFlowReturn
{
GstDtlsDec *self = GST_DTLS_DEC (parent);
GstFlowReturn ret = GST_FLOW_OK;
- gint size;
GstPad *other_pad;
if (!self->agent) {
self->connection_id, gst_buffer_get_size (buffer));
buffer = gst_buffer_make_writable (buffer);
- size = process_buffer (self, buffer);
-
- if (size <= 0) {
+ ret = process_buffer (self, buffer);
+ if (ret == GST_FLOW_ERROR) {
+ GST_ERROR_OBJECT (self, "Failed to process buffer: %s",
+ gst_flow_get_name (ret));
gst_buffer_unref (buffer);
-
- return GST_FLOW_OK;
+ return ret;
}
g_mutex_lock (&self->src_mutex);
g_mutex_unlock (&self->src_mutex);
if (other_pad) {
- GST_LOG_OBJECT (self, "decoded buffer with length %d, pushing", size);
- ret = gst_pad_push (other_pad, buffer);
+ gboolean was_eos = (ret == GST_FLOW_EOS);
+
+ if (gst_buffer_get_size (buffer) > 0) {
+ GST_LOG_OBJECT (self, "pushing buffer");
+ ret = gst_pad_push (other_pad, buffer);
+ } else {
+ gst_buffer_unref (buffer);
+ }
+
+ /* If the peer closed the connection, signal that we're done here now */
+ if (was_eos) {
+ gst_pad_push_event (other_pad, gst_event_new_eos ());
+ if (ret == GST_FLOW_OK)
+ ret = GST_FLOW_EOS;
+ }
+
gst_object_unref (other_pad);
} else {
- GST_LOG_OBJECT (self, "dropped buffer with length %d, not linked", size);
+ GST_LOG_OBJECT (self, "dropping buffer, have no source pad");
gst_buffer_unref (buffer);
}
}
static void
+on_connection_state_changed (GObject * object, GParamSpec * pspec,
+ gpointer user_data)
+{
+ GstDtlsDec *self = GST_DTLS_DEC (user_data);
+
+ g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
+}
+
+static void
create_connection (GstDtlsDec * self, gchar * id)
{
g_return_if_fail (GST_IS_DTLS_DEC (self));
g_return_if_fail (GST_IS_DTLS_AGENT (self->agent));
if (self->connection) {
+ g_signal_handlers_disconnect_by_func (self->connection,
+ on_connection_state_changed, self);
g_object_unref (self->connection);
self->connection = NULL;
}
self->connection =
g_object_new (GST_TYPE_DTLS_CONNECTION, "agent", self->agent, NULL);
+ g_signal_connect_object (self->connection,
+ "notify::connection-state", G_CALLBACK (on_connection_state_changed),
+ self, 0);
+ on_connection_state_changed (NULL, NULL, self);
g_object_weak_ref (G_OBJECT (self->connection),
(GWeakNotify) connection_weak_ref_notify, g_strdup (id));
PROP_0,
PROP_CONNECTION_ID,
PROP_IS_CLIENT,
-
PROP_ENCODER_KEY,
PROP_SRTP_CIPHER,
PROP_SRTP_AUTH,
+ PROP_CONNECTION_STATE,
NUM_PROPERTIES
};
static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
guint auth, GstDtlsEnc *);
-static void on_send_data (GstDtlsConnection *, gconstpointer data, gint length,
- GstDtlsEnc *);
+static gboolean on_send_data (GstDtlsConnection *, gconstpointer data,
+ gsize length, GstDtlsEnc *);
static void
gst_dtls_enc_class_init (GstDtlsEncClass * klass)
signals[SIGNAL_ON_KEY_RECEIVED] =
g_signal_new ("on-key-received", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST, 0, NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_NONE, 0);
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
properties[PROP_CONNECTION_ID] =
g_param_spec_string ("connection-id",
0, GST_DTLS_SRTP_AUTH_HMAC_SHA1_80, DEFAULT_SRTP_AUTH,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+ properties[PROP_CONNECTION_STATE] =
+ g_param_spec_enum ("connection-state",
+ "Connection State",
+ "Current connection state",
+ GST_DTLS_TYPE_CONNECTION_STATE,
+ GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
gst_element_class_add_static_pad_template (element_class, &src_template);
case PROP_SRTP_AUTH:
g_value_set_uint (value, self->srtp_auth);
break;
+ case PROP_CONNECTION_STATE:
+ if (self->connection)
+ g_object_get_property (G_OBJECT (self->connection), "connection-state",
+ value);
+ else
+ g_value_set_enum (value, GST_DTLS_CONNECTION_STATE_CLOSED);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
}
}
+static void
+on_connection_state_changed (GObject * object, GParamSpec * pspec,
+ gpointer user_data)
+{
+ GstDtlsEnc *self = GST_DTLS_ENC (user_data);
+
+ g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
+}
+
static GstStateChangeReturn
gst_dtls_enc_change_state (GstElement * element, GstStateChange transition)
{
g_signal_connect_object (self->connection,
"on-encoder-key", G_CALLBACK (on_key_received), self, 0);
+ g_signal_connect_object (self->connection,
+ "notify::connection-state",
+ G_CALLBACK (on_connection_state_changed), self, 0);
+ on_connection_state_changed (NULL, NULL, self);
gst_dtls_connection_set_send_callback (self->connection,
- g_cclosure_new (G_CALLBACK (on_send_data), self, NULL));
+ (GstDtlsConnectionSendCallback) on_send_data, self, NULL);
} else {
GST_WARNING_OBJECT (self,
"trying to change state to ready without connection id");
if (self->connection) {
gst_dtls_connection_close (self->connection);
- gst_dtls_connection_set_send_callback (self->connection, NULL);
+ gst_dtls_connection_set_send_callback (self->connection, NULL, NULL,
+ NULL);
g_object_unref (self->connection);
self->connection = NULL;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
- case GST_STATE_CHANGE_READY_TO_PAUSED:
+ case GST_STATE_CHANGE_READY_TO_PAUSED:{
+ GError *err = NULL;
+
GST_DEBUG_OBJECT (self, "starting connection %s", self->connection_id);
- gst_dtls_connection_start (self->connection, self->is_client);
+ if (!gst_dtls_connection_start (self->connection, self->is_client, &err)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, OPEN_WRITE, (NULL), ("%s",
+ err->message));
+ g_clear_error (&err);
+ }
break;
+ }
default:
break;
}
GST_DEBUG_OBJECT (self, "src pad activating in push mode");
self->flushing = FALSE;
+ self->src_ret = GST_FLOW_OK;
self->send_initial_events = TRUE;
success =
gst_pad_start_task (pad, (GstTaskFunction) src_task_loop, self->src,
g_queue_foreach (&self->queue, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (&self->queue);
self->flushing = TRUE;
+ self->src_ret = GST_FLOW_FLUSHING;
g_cond_signal (&self->queue_cond_add);
g_mutex_unlock (&self->queue_lock);
success = gst_pad_stop_task (pad);
GST_TRACE_OBJECT (self, "src loop: releasing lock");
- ret = gst_pad_push (self->src, buffer);
- if (check_connection_timeout)
- gst_dtls_connection_check_timeout (self->connection);
+ if (buffer) {
+ ret = gst_pad_push (self->src, buffer);
+ if (check_connection_timeout)
+ gst_dtls_connection_check_timeout (self->connection);
- if (G_UNLIKELY (ret != GST_FLOW_OK)) {
- GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
- gst_flow_get_name (ret));
+ if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
+ GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
+ gst_flow_get_name (ret));
+ }
+ g_mutex_lock (&self->queue_lock);
+ self->src_ret = ret;
+ g_mutex_unlock (&self->queue_lock);
+ } else {
+ GST_DEBUG_OBJECT (self, "Peer and us closed the connection, sending EOS");
+ gst_pad_push_event (self->src, gst_event_new_eos ());
+ g_mutex_lock (&self->queue_lock);
+ self->src_ret = GST_FLOW_EOS;
+ g_mutex_unlock (&self->queue_lock);
}
}
{
GstDtlsEnc *self = GST_DTLS_ENC (parent);
GstMapInfo map_info;
- gint ret;
+ GError *err = NULL;
+ gsize to_write, written = 0;
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ g_mutex_lock (&self->queue_lock);
+ if (self->src_ret != GST_FLOW_OK) {
+ if (G_UNLIKELY (self->src_ret == GST_FLOW_NOT_LINKED
+ || self->src_ret < GST_FLOW_EOS))
+ GST_ERROR_OBJECT (self, "Pushing previous data returned an error: %s",
+ gst_flow_get_name (self->src_ret));
+
+ gst_buffer_unref (buffer);
+ g_mutex_unlock (&self->queue_lock);
+ return self->src_ret;
+ }
+ g_mutex_unlock (&self->queue_lock);
gst_buffer_map (buffer, &map_info, GST_MAP_READ);
- if (map_info.size) {
+ to_write = map_info.size;
+
+ while (to_write > 0 && ret == GST_FLOW_OK) {
ret =
gst_dtls_connection_send (self->connection, map_info.data,
- map_info.size);
- if (ret != map_info.size) {
- GST_WARNING_OBJECT (self,
- "error sending data: %d B were written, expected value was %"
- G_GSIZE_FORMAT " B", ret, map_info.size);
+ map_info.size, &written, &err);
+
+ switch (ret) {
+ case GST_FLOW_OK:
+ GST_DEBUG_OBJECT (self,
+ "Wrote %" G_GSIZE_FORMAT " B of %" G_GSIZE_FORMAT " B", written,
+ map_info.size);
+ g_assert (written <= to_write);
+ to_write -= written;
+ break;
+ case GST_FLOW_EOS:
+ GST_INFO_OBJECT (self, "Received data after the connection was closed");
+ break;
+ case GST_FLOW_ERROR:
+ GST_WARNING_OBJECT (self, "error sending data: %s", err->message);
+ GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL), ("%s", err->message));
+ g_clear_error (&err);
+ break;
+ default:
+ g_assert_not_reached ();
+ break;
}
+
+ g_assert (err == NULL);
}
gst_buffer_unmap (buffer, &map_info);
-
gst_buffer_unref (buffer);
- return GST_FLOW_OK;
+ return ret;
}
static gboolean
sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
+ GstDtlsEnc *self = GST_DTLS_ENC (parent);
gboolean ret = FALSE;
switch (GST_EVENT_TYPE (event)) {
gst_event_unref (event);
ret = TRUE;
break;
+ case GST_EVENT_EOS:{
+ GstFlowReturn flow_ret;
+
+ /* Close the write side of the connection now */
+ flow_ret =
+ gst_dtls_connection_send (self->connection, NULL, 0, NULL, NULL);
+
+ if (flow_ret != GST_FLOW_OK)
+ GST_ERROR_OBJECT (self, "Failed to send close_notify");
+
+ /* Do not forward the EOS event unless the peer already closed to the
+ * connection itself. If it didn't yet then we'll later get the send
+ * callback called with no data and send EOS from there */
+ if (flow_ret == GST_FLOW_EOS) {
+ ret = gst_pad_event_default (pad, parent, event);
+ } else {
+ gst_event_unref (event);
+ ret = TRUE;
+ }
+
+ break;
+ }
default:
ret = gst_pad_event_default (pad, parent, event);
break;
g_signal_emit (self, signals[SIGNAL_ON_KEY_RECEIVED], 0);
}
-static void
-on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
+static gboolean
+on_send_data (GstDtlsConnection * connection, gconstpointer data, gsize length,
GstDtlsEnc * self)
{
GstBuffer *buffer;
+ gboolean ret;
- GST_DEBUG_OBJECT (self, "sending data from %s with length %d",
+ GST_DEBUG_OBJECT (self, "sending data from %s with length %" G_GSIZE_FORMAT,
self->connection_id, length);
- buffer = gst_buffer_new_wrapped (g_memdup (data, length), length);
+ buffer =
+ data ? gst_buffer_new_wrapped (g_memdup (data, length), length) : NULL;
GST_TRACE_OBJECT (self, "send data: acquiring lock");
g_mutex_lock (&self->queue_lock);
g_cond_signal (&self->queue_cond_add);
GST_TRACE_OBJECT (self, "send data: releasing lock");
+
+ ret = self->src_ret == GST_FLOW_OK;
g_mutex_unlock (&self->queue_lock);
+
+ return ret;
}
GstElement element;
GstPad *src;
+ GstFlowReturn src_ret;
GQueue queue;
GMutex queue_lock;
G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
+
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ gst_type_mark_as_plugin_api (GST_TYPE_DTLS_SRTP_BIN, 0);
+#endif
}
static void
#endif
#include "gstdtlssrtpdec.h"
-
#include "gstdtlsconnection.h"
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
PROP_0,
PROP_PEM,
PROP_PEER_PEM,
+ PROP_CONNECTION_STATE,
NUM_PROPERTIES
};
g_param_spec_string ("pem",
"PEM string",
"A string containing a X509 certificate and RSA private key in PEM format",
- DEFAULT_PEM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+ DEFAULT_PEM,
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_DOC_SHOW_DEFAULT);
+#else
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+#endif
properties[PROP_PEER_PEM] =
g_param_spec_string ("peer-pem",
"The X509 certificate received in the DTLS handshake, in PEM format",
DEFAULT_PEER_PEM, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+ properties[PROP_CONNECTION_STATE] =
+ g_param_spec_enum ("connection-state",
+ "Connection State",
+ "Current connection state",
+ GST_DTLS_TYPE_CONNECTION_STATE,
+ GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
gst_element_class_add_static_pad_template (element_class, &sink_template);
}
static void
+on_connection_state_changed (GObject * object, GParamSpec * pspec,
+ gpointer user_data)
+{
+ GstDtlsSrtpDec *self = GST_DTLS_SRTP_DEC (user_data);
+
+ g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
+}
+
+static void
gst_dtls_srtp_dec_init (GstDtlsSrtpDec * self)
{
GstElementClass *klass = GST_ELEMENT_GET_CLASS (GST_ELEMENT (self));
G_CALLBACK (on_decoder_request_key), self);
g_signal_connect (self->bin.dtls_element, "notify::peer-pem",
G_CALLBACK (on_peer_pem), self);
+ g_signal_connect (self->bin.dtls_element, "notify::connection-state",
+ G_CALLBACK (on_connection_state_changed), self);
}
static void
GST_WARNING_OBJECT (self, "tried to get peer-pem after disabling DTLS");
}
break;
+ case PROP_CONNECTION_STATE:
+ g_object_get_property (G_OBJECT (self->bin.dtls_element),
+ "connection-state", value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
}
#endif
#include "gstdtlssrtpenc.h"
+#include "gstdtlsconnection.h"
#include <stdio.h>
{
PROP_0,
PROP_IS_CLIENT,
+ PROP_CONNECTION_STATE,
+ PROP_RTP_SYNC,
NUM_PROPERTIES
};
static GParamSpec *properties[NUM_PROPERTIES];
#define DEFAULT_IS_CLIENT FALSE
+#define DEFAULT_RTP_SYNC FALSE
static gboolean transform_enum (GBinding *, const GValue * source_value,
GValue * target_value, GEnumClass *);
signals[SIGNAL_ON_KEY_SET] =
g_signal_new ("on-key-set", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST, 0, NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_NONE, 0);
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
properties[PROP_IS_CLIENT] =
g_param_spec_boolean ("is-client",
DEFAULT_IS_CLIENT,
GST_PARAM_MUTABLE_READY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+ properties[PROP_CONNECTION_STATE] =
+ g_param_spec_enum ("connection-state",
+ "Connection State",
+ "Current connection state",
+ GST_DTLS_TYPE_CONNECTION_STATE,
+ GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
+
+ properties[PROP_RTP_SYNC] =
+ g_param_spec_boolean ("rtp-sync", "Synchronize RTP",
+ "Synchronize RTP to the pipeline clock before merging with RTCP",
+ DEFAULT_RTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
}
static void
+on_connection_state_changed (GObject * object, GParamSpec * pspec,
+ gpointer user_data)
+{
+ GstDtlsSrtpEnc *self = GST_DTLS_SRTP_ENC (user_data);
+
+ g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
+}
+
+static void
gst_dtls_srtp_enc_init (GstDtlsSrtpEnc * self)
{
GstElementClass *klass = GST_ELEMENT_GET_CLASS (GST_ELEMENT (self));
gboolean ret;
/*
- +--------------------+ +-----------------+
- rtp_sink-R-o|rtp_sink rtp_src|o-R-o| |
- | srtpenc | | |
- rtcp_sink-R-o|srtcp_sink rtcp_src|o-R-o| |
- +--------------------+ | funnel |o---src
- | |
- +--------------------+ | |
- data_sink-R-o| dtlsenc |o---o| |
- +--------------------+ +-----------------+
+ +--------------------+ +--------------+ +-----------------+
+ rtp_sink-R-o|rtp_sink rtp_src|o-R-o clocksync |o-R-o| |
+ | srtpenc | +--------------+ | |
+ | | | |
+ rtcp_sink-R-o|srtcp_sink rtcp_src|o-----------R-----------o| |
+ +--------------------+ | funnel |o---src
+ | |
+ +--------------------+ | |
+ data_sink-R-o| dtlsenc |o-----------------------o| |
+ +--------------------+ +-----------------+
+
+ The clocksync element is tied to the sync property. If sync=true, RTP output will be
+ synchronised to the clock, so it doesn't slow down RTCP traffic by being synched later
+ in the pipeline
*/
self->srtp_enc = gst_element_factory_make ("srtpenc", NULL);
g_object_set (self->srtp_enc, "random-key", TRUE, NULL);
+ g_signal_connect (self->bin.dtls_element, "notify::connection-state",
+ G_CALLBACK (on_connection_state_changed), self);
+
g_object_bind_property (G_OBJECT (self), "key", self->srtp_enc, "key",
G_BINDING_DEFAULT);
g_object_bind_property_full (G_OBJECT (self), "srtp-cipher", self->srtp_enc,
"tried to set is-client after disabling DTLS");
}
break;
+ case PROP_RTP_SYNC:
+ self->rtp_sync = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
}
"tried to get is-client after disabling DTLS");
}
break;
+ case PROP_CONNECTION_STATE:
+ g_object_get_property (G_OBJECT (self->bin.dtls_element),
+ "connection-state", value);
+ break;
+ case PROP_RTP_SYNC:
+ g_value_set_boolean (value, self->rtp_sync);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
}
g_return_val_if_fail (self->srtp_enc, NULL);
if (templ == gst_element_class_get_pad_template (klass, "rtp_sink_%d")) {
+ gchar *clocksync_name;
+ GstElement *clocksync;
+
+ sscanf (name, "rtp_sink_%d", &pad_n);
+
+ clocksync_name = g_strdup_printf ("clocksync_%d", pad_n);
+ clocksync = gst_element_factory_make ("clocksync", clocksync_name);
+ g_free (clocksync_name);
+
+ if (clocksync == NULL) {
+ goto fail_create;
+ }
+
+ g_object_bind_property (self, "rtp-sync", clocksync, "sync",
+ G_BINDING_SYNC_CREATE);
+
+ gst_bin_add (GST_BIN (self), clocksync);
+ gst_element_sync_state_with_parent (clocksync);
+
target_pad = gst_element_get_request_pad (self->srtp_enc, name);
g_return_val_if_fail (target_pad, NULL);
- sscanf (GST_PAD_NAME (target_pad), "rtp_sink_%d", &pad_n);
srtp_src_name = g_strdup_printf ("rtp_src_%d", pad_n);
- gst_element_link_pads (self->srtp_enc, srtp_src_name, self->funnel, NULL);
+ gst_element_link_pads (self->srtp_enc, srtp_src_name, clocksync, NULL);
+ gst_element_link_pads (clocksync, "src", self->funnel, NULL);
g_free (srtp_src_name);
}
return ghost_pad;
+
+fail_create:
+ GST_ELEMENT_ERROR (self, CORE, MISSING_PLUGIN, NULL,
+ ("%s", "Failed to create internal clocksync element"));
+ return NULL;
}
static void
struct _GstDtlsSrtpEnc {
GstDtlsSrtpBin bin;
+ gboolean rtp_sync;
+
GstElement *srtp_enc;
GstElement *funnel;
};
install_dir : plugins_install_dir,
)
pkgconfig.generate(gstdtls, install_dir : plugins_pkgconfig_install_dir)
+ plugins += [gstdtls]
endif
static gboolean
plugin_init (GstPlugin * plugin)
{
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ gst_type_mark_as_plugin_api (GST_DTLS_TYPE_CONNECTION_STATE, 0);
+#endif
+
return gst_element_register (plugin, "dtlsenc", GST_RANK_NONE,
GST_TYPE_DTLS_ENC)
&& gst_element_register (plugin, "dtlsdec", GST_RANK_NONE,