gboolean is_alive;
gboolean keys_exported;
+ gboolean sent_close_notify;
+ gboolean received_close_notify;
+ gboolean fatal_error;
+
GMutex mutex;
GCond condition;
gpointer bio_buffer;
static void log_state (GstDtlsConnection *, const gchar * str);
static void export_srtp_keys (GstDtlsConnection *);
-static void openssl_poll (GstDtlsConnection *);
+static GstFlowReturn openssl_poll (GstDtlsConnection *, GError ** err);
+static GstFlowReturn handle_error (GstDtlsConnection * self, int ret,
+ GstResourceError error_type, GError ** err);
static int openssl_verify_callback (int preverify_ok,
X509_STORE_CTX * x509_ctx);
}
}
-void
-gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
+gboolean
+gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client,
+ GError ** err)
{
GstDtlsConnectionPrivate *priv;
+ gboolean ret;
priv = self->priv;
- g_return_if_fail (priv->send_callback);
- g_return_if_fail (priv->ssl);
- g_return_if_fail (priv->bio);
+ g_return_val_if_fail (priv->send_callback, FALSE);
+ g_return_val_if_fail (priv->ssl, FALSE);
+ g_return_val_if_fail (priv->bio, FALSE);
GST_TRACE_OBJECT (self, "locking @ start");
g_mutex_lock (&priv->mutex);
priv->bio_buffer_offset = 0;
priv->keys_exported = FALSE;
+ priv->fatal_error = FALSE;
+ priv->sent_close_notify = FALSE;
+ priv->received_close_notify = FALSE;
+
priv->is_client = is_client;
if (priv->is_client) {
SSL_set_connect_state (priv->ssl);
}
log_state (self, "initial state set");
- openssl_poll (self);
+ ret = openssl_poll (self, err);
+ if (ret == GST_FLOW_EOS && err) {
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_WRITE,
+ "Connection closed");
+ }
log_state (self, "first poll done");
GST_TRACE_OBJECT (self, "unlocking @ start");
g_mutex_unlock (&priv->mutex);
+
+ return ret == GST_FLOW_OK;
}
static void
GST_WARNING_OBJECT (self, "handling timeout failed");
} else if (ret > 0) {
log_state (self, "handling timeout before poll");
- openssl_poll (self);
+ openssl_poll (self, NULL);
log_state (self, "handling timeout after poll");
}
}
g_mutex_unlock (&priv->mutex);
}
-gint
-gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len)
+GstFlowReturn
+gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gsize len,
+ gsize * written, GError ** err)
{
+ GstFlowReturn flow_ret = GST_FLOW_OK;
GstDtlsConnectionPrivate *priv;
- gint result;
+ int ret;
g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0);
g_return_val_if_fail (self->priv->ssl, 0);
g_warn_if_fail (!priv->bio_buffer);
+ if (self->priv->received_close_notify) {
+ GST_DEBUG_OBJECT (self, "Already received close_notify");
+ g_mutex_unlock (&priv->mutex);
+ return GST_FLOW_EOS;
+ }
+
+ if (self->priv->fatal_error) {
+ GST_ERROR_OBJECT (self, "Had a fatal error before");
+ g_mutex_unlock (&priv->mutex);
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+ "Had fatal error before");
+ return GST_FLOW_ERROR;
+ }
+
priv->bio_buffer = data;
priv->bio_buffer_len = len;
priv->bio_buffer_offset = 0;
log_state (self, "process start");
if (SSL_want_write (priv->ssl)) {
- openssl_poll (self);
+ flow_ret = openssl_poll (self, err);
log_state (self, "process want write, after poll");
+ if (flow_ret != GST_FLOW_OK) {
+ g_mutex_unlock (&priv->mutex);
+ return flow_ret;
+ }
}
- result = SSL_read (priv->ssl, data, len);
+ ret = SSL_read (priv->ssl, data, len);
+ *written = ret >= 0 ? ret : 0;
+ GST_DEBUG_OBJECT (self, "read result: %d", ret);
+
+ flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_READ, err);
+ if (flow_ret == GST_FLOW_EOS) {
+ self->priv->received_close_notify = TRUE;
+ /* Notify about the connection being properly closed now if both
+ * sides did so */
+ if (self->priv->sent_close_notify && self->priv->send_callback)
+ self->priv->send_callback (self, NULL, 0, NULL);
+
+ g_mutex_unlock (&priv->mutex);
+ return flow_ret;
+ } else if (flow_ret != GST_FLOW_OK) {
+ g_mutex_unlock (&priv->mutex);
+ return flow_ret;
+ }
log_state (self, "process after read");
- openssl_poll (self);
+ flow_ret = openssl_poll (self, err);
log_state (self, "process after poll");
- GST_DEBUG_OBJECT (self, "read result: %d", result);
-
GST_TRACE_OBJECT (self, "unlocking @ process");
g_mutex_unlock (&priv->mutex);
- return result;
+ return flow_ret;
}
-gint
-gst_dtls_connection_send (GstDtlsConnection * self, gpointer data, gint len)
+GstFlowReturn
+gst_dtls_connection_send (GstDtlsConnection * self, gconstpointer data,
+ gsize len, gsize * written, GError ** err)
{
+ GstFlowReturn flow_ret;
int ret = 0;
g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0);
g_mutex_lock (&self->priv->mutex);
GST_TRACE_OBJECT (self, "locked @ send");
- if (SSL_is_init_finished (self->priv->ssl)) {
+ if (self->priv->fatal_error) {
+ GST_ERROR_OBJECT (self, "Had a fatal error before");
+ g_mutex_unlock (&self->priv->mutex);
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+ "Had fatal error before");
+ return GST_FLOW_ERROR;
+ }
+
+ if (self->priv->sent_close_notify) {
+ len = 0;
+ GST_DEBUG_OBJECT (self, "Not sending new data after close_notify");
+ }
+
+ if (len == 0) {
+ if (written)
+ *written = 0;
+ GST_DEBUG_OBJECT (self, "Sending close_notify");
+ ret = SSL_shutdown (self->priv->ssl);
+ self->priv->sent_close_notify = TRUE;
+ if (ret == 1) {
+ GST_LOG_OBJECT (self, "received peer close_notify already");
+ self->priv->received_close_notify = TRUE;
+ flow_ret = GST_FLOW_EOS;
+ } else if (ret == 0) {
+ GST_LOG_OBJECT (self, "did not receive peer close_notify yet");
+ flow_ret = GST_FLOW_OK;
+ } else {
+ flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, err);
+ }
+ } else if (SSL_is_init_finished (self->priv->ssl)) {
+ GST_DEBUG_OBJECT (self, "sending data of %" G_GSIZE_FORMAT " B", len);
ret = SSL_write (self->priv->ssl, data, len);
- GST_DEBUG_OBJECT (self, "data sent: input was %d B, output is %d B", len,
- ret);
+ if (ret <= 0) {
+ if (written)
+ *written = 0;
+ flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, err);
+ } else {
+ if (written)
+ *written = ret;
+ flow_ret = GST_FLOW_OK;
+ }
} else {
+ if (written)
+ *written = ret;
GST_WARNING_OBJECT (self,
"tried to send data before handshake was complete");
- ret = 0;
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+ "Tried to send data before handshake was complete");
+ flow_ret = GST_FLOW_ERROR;
}
GST_TRACE_OBJECT (self, "unlocking @ send");
g_mutex_unlock (&self->priv->mutex);
- return ret;
+ return flow_ret;
}
/*
return 0;
}
-static void
-openssl_poll (GstDtlsConnection * self)
+static GstFlowReturn
+handle_error (GstDtlsConnection * self, int ret, GstResourceError error_type,
+ GError ** err)
{
- int ret;
int error;
+ error = SSL_get_error (self->priv->ssl, ret);
+
+ switch (error) {
+ case SSL_ERROR_NONE:
+ GST_TRACE_OBJECT (self, "No error");
+ return GST_FLOW_OK;
+ case SSL_ERROR_SSL:
+ GST_ERROR_OBJECT (self, "Fatal SSL error");
+ self->priv->fatal_error = TRUE;
+ ERR_print_errors_cb (ssl_err_cb, self);
+ if (err)
+ *err =
+ g_error_new_literal (GST_RESOURCE_ERROR, error_type,
+ "Fatal SSL error");
+ return GST_FLOW_ERROR;
+ case SSL_ERROR_ZERO_RETURN:
+ GST_LOG_OBJECT (self, "Connection was closed");
+ return GST_FLOW_EOS;
+ case SSL_ERROR_WANT_READ:
+ GST_LOG_OBJECT (self, "SSL wants read");
+ return GST_FLOW_OK;
+ case SSL_ERROR_WANT_WRITE:
+ GST_LOG_OBJECT (self, "SSL wants write");
+ return GST_FLOW_OK;
+ case SSL_ERROR_SYSCALL:{
+ gchar message[1024] = "<unknown>";
+ gint syserror;
+#ifdef G_OS_WIN32
+ syserror = WSAGetLastError ();
+ FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM, NULL, syserror, 0, message,
+ sizeof message, NULL);
+#else
+ syserror = errno;
+ strerror_r (syserror, message, sizeof message);
+#endif
+
+ if (syserror == 0) {
+ GST_TRACE_OBJECT (self, "No error");
+ return GST_FLOW_OK;
+ } else {
+ GST_ERROR_OBJECT (self, "Fatal SSL syscall error: errno %d: %s",
+ syserror, message);
+ if (err)
+ *err =
+ g_error_new (GST_RESOURCE_ERROR, error_type,
+ "Fatal SSL syscall error: errno %d: %s", syserror, message);
+ self->priv->fatal_error = TRUE;
+ return GST_FLOW_ERROR;
+ }
+ }
+ default:
+ self->priv->fatal_error = TRUE;
+ GST_ERROR_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret);
+ if (err)
+ *err =
+ g_error_new (GST_RESOURCE_ERROR, error_type,
+ "Unknown SSL error: %d, ret: %d", error, ret);
+ return GST_FLOW_ERROR;
+ }
+}
+
+static GstFlowReturn
+openssl_poll (GstDtlsConnection * self, GError ** err)
+{
+ int ret;
+ GstFlowReturn flow_ret;
+
log_state (self, "poll: before handshake");
ERR_clear_error ();
} else {
GST_INFO_OBJECT (self, "handshake is completed");
}
- return;
+ return GST_FLOW_OK;
case 0:
GST_DEBUG_OBJECT (self, "do_handshake encountered EOF");
break;
case -1:
- GST_DEBUG_OBJECT (self, "do_handshake encountered BIO error");
+ GST_DEBUG_OBJECT (self, "do_handshake encountered potential BIO error");
break;
default:
GST_DEBUG_OBJECT (self, "do_handshake returned %d", ret);
- }
-
- error = SSL_get_error (self->priv->ssl, ret);
-
- switch (error) {
- case SSL_ERROR_NONE:
- GST_WARNING_OBJECT (self, "no error, handshake should be done");
- break;
- case SSL_ERROR_SSL:
- GST_ERROR_OBJECT (self, "SSL error");
- ERR_print_errors_cb (ssl_err_cb, self);
- return;
- case SSL_ERROR_WANT_READ:
- GST_LOG_OBJECT (self, "SSL wants read");
- break;
- case SSL_ERROR_WANT_WRITE:
- GST_LOG_OBJECT (self, "SSL wants write");
break;
- case SSL_ERROR_SYSCALL:{
- gchar message[1024] = "<unknown>";
- gint syserror;
-#ifdef G_OS_WIN32
- syserror = WSAGetLastError ();
- FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM, NULL, syserror, 0, message,
- sizeof message, NULL);
-#else
- syserror = errno;
- strerror_r (syserror, message, sizeof message);
-#endif
- GST_CAT_LEVEL_LOG (GST_CAT_DEFAULT,
- syserror != 0 ? GST_LEVEL_WARNING : GST_LEVEL_LOG,
- self, "SSL syscall error: errno %d: %s", syserror, message);
- break;
- }
- default:
- GST_WARNING_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret);
}
+ flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_OPEN_WRITE, err);
+
ERR_print_errors_cb (ssl_warn_cb, self);
+
+ return flow_ret;
}
static int
#ifndef gstdtlsconnection_h
#define gstdtlsconnection_h
-#include <glib-object.h>
+#include <gst/gst.h>
G_BEGIN_DECLS
GType gst_dtls_connection_get_type(void) G_GNUC_CONST;
-void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client);
+gboolean gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client, GError **err);
void gst_dtls_connection_check_timeout(GstDtlsConnection *);
/*
/*
* Processes data that has been received, the transformation is done in-place.
- * Returns the length of the plaintext data that was decoded, if no data is available, 0<= will be returned.
+ *
+ * Returns:
+ * - GST_FLOW_EOS if the receive side of the DTLS connection was closed by
+ * the peer, i.e. close_notify was sent by the peer
+ * - GST_FLOW_ERROR + err if an error happened
+ * - GST_FLOW_OK + written >= 0 if processing was successful. ptr then
+ * contains the decoded bytes
*/
-gint gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gint len);
+GstFlowReturn gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gsize len, gsize *written, GError **err);
/*
- * If the DTLS handshake is completed this function will encode the given data.
- * Returns the length of the data sent, or 0 if the DTLS handshake is not completed.
+ * Will encode and send the given data.
+ *
+ * Sending with len == 0 will close the send side of the DTLS connection and
+ * no further data can be sent anymore in the future. This will also send the
+ * close_notify to the peer.
+ *
+ * Returns:
+ * - GST_FLOW_EOS if the send side of the DTLS connection was closed, i.e.
+ * we received an EOS before.
+ * - GST_FLOW_ERROR + err if an error happened
+ * - GST_FLOW_OK + written >= 0 if processing was successful
*/
-gint gst_dtls_connection_send(GstDtlsConnection *, gpointer ptr, gint len);
+GstFlowReturn gst_dtls_connection_send(GstDtlsConnection *, gconstpointer ptr, gsize len, gsize *written, GError **err);
G_END_DECLS
return TRUE;
}
-static gint
+static GstFlowReturn
process_buffer (GstDtlsDec * self, GstBuffer * buffer)
{
+ GstFlowReturn flow_ret;
GstMapInfo map_info;
- gint size;
+ GError *err = NULL;
+ gsize written = 0;
if (!gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE))
- return 0;
+ return GST_FLOW_ERROR;
if (!map_info.size) {
gst_buffer_unmap (buffer, &map_info);
- return 0;
+ return GST_FLOW_ERROR;
}
- size =
+ flow_ret =
gst_dtls_connection_process (self->connection, map_info.data,
- map_info.size);
+ map_info.size, &written, &err);
gst_buffer_unmap (buffer, &map_info);
- if (size <= 0)
- return size;
-
- gst_buffer_set_size (buffer, size);
+ switch (flow_ret) {
+ case GST_FLOW_OK:
+ GST_LOG_OBJECT (self,
+ "Decoded buffer of size %" G_GSIZE_FORMAT " B to %" G_GSIZE_FORMAT,
+ map_info.size, written);
+ gst_buffer_set_size (buffer, written);
+ break;
+ case GST_FLOW_EOS:
+ gst_buffer_set_size (buffer, written);
+ GST_DEBUG_OBJECT (self, "Peer closed the connection");
+ break;
+ case GST_FLOW_ERROR:
+ GST_ERROR_OBJECT (self, "Error processing buffer: %s", err->message);
+ GST_ELEMENT_ERROR (self, RESOURCE, READ, (NULL), ("%s", err->message));
+ g_clear_error (&err);
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+ g_assert (err == NULL);
- return size;
+ return flow_ret;
}
+typedef struct
+{
+ GstDtlsDec *self;
+ GstFlowReturn flow_ret;
+ guint processed;
+} ProcessListData;
+
static gboolean
process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
{
- GstDtlsDec *self = GST_DTLS_DEC (user_data);
- gint size;
+ ProcessListData *process_list_data = user_data;
+ GstDtlsDec *self = GST_DTLS_DEC (process_list_data->self);
+ GstFlowReturn flow_ret;
*buffer = gst_buffer_make_writable (*buffer);
- size = process_buffer (self, *buffer);
- if (size <= 0)
+ flow_ret = process_buffer (self, *buffer);
+
+ process_list_data->flow_ret = flow_ret;
+ if (gst_buffer_get_size (*buffer) == 0)
gst_buffer_replace (buffer, NULL);
+ else if (flow_ret != GST_FLOW_ERROR)
+ process_list_data->processed++;
- return TRUE;
+ return flow_ret == GST_FLOW_OK;
}
static GstFlowReturn
sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
{
GstDtlsDec *self = GST_DTLS_DEC (parent);
- GstFlowReturn ret = GST_FLOW_OK;
GstPad *other_pad;
+ ProcessListData process_list_data = { self, GST_FLOW_OK, 0 };
list = gst_buffer_list_make_writable (list);
- gst_buffer_list_foreach (list, process_buffer_from_list, self);
+ gst_buffer_list_foreach (list, process_buffer_from_list, &process_list_data);
+
+ /* If we successfully processed at least some buffers then forward those */
+ if (process_list_data.flow_ret != GST_FLOW_OK
+ && process_list_data.processed == 0) {
+ GST_ERROR_OBJECT (self, "Failed to process buffer list: %s",
+ gst_flow_get_name (process_list_data.flow_ret));
+ gst_buffer_list_unref (list);
+ return process_list_data.flow_ret;
+ }
+
+ /* Remove all buffers after the first one that failed to be processed */
+ gst_buffer_list_remove (list, process_list_data.processed,
+ gst_buffer_list_length (list) - process_list_data.processed);
if (gst_buffer_list_length (list) == 0) {
GST_DEBUG_OBJECT (self, "Not produced any buffers");
gst_buffer_list_unref (list);
- return GST_FLOW_OK;
+ return process_list_data.flow_ret;
}
g_mutex_lock (&self->src_mutex);
g_mutex_unlock (&self->src_mutex);
if (other_pad) {
- GST_LOG_OBJECT (self, "decoded buffer list with length %u, pushing",
+ gboolean was_eos = process_list_data.flow_ret == GST_FLOW_EOS;
+
+ GST_LOG_OBJECT (self, "pushing buffer list with length %u",
gst_buffer_list_length (list));
- ret = gst_pad_push_list (other_pad, list);
+ process_list_data.flow_ret = gst_pad_push_list (other_pad, list);
+
+ /* If the peer closed the connection, signal that we're done here now */
+ if (was_eos)
+ gst_pad_push_event (other_pad, gst_event_new_eos ());
+
gst_object_unref (other_pad);
} else {
- GST_LOG_OBJECT (self, "dropped buffer list with length %d, not linked",
+ GST_LOG_OBJECT (self,
+ "dropping buffer list with length %d, have no source pad",
gst_buffer_list_length (list));
gst_buffer_list_unref (list);
}
- return ret;
+ return process_list_data.flow_ret;
}
static GstFlowReturn
{
GstDtlsDec *self = GST_DTLS_DEC (parent);
GstFlowReturn ret = GST_FLOW_OK;
- gint size;
GstPad *other_pad;
if (!self->agent) {
self->connection_id, gst_buffer_get_size (buffer));
buffer = gst_buffer_make_writable (buffer);
- size = process_buffer (self, buffer);
-
- if (size <= 0) {
+ ret = process_buffer (self, buffer);
+ if (ret == GST_FLOW_ERROR) {
+ GST_ERROR_OBJECT (self, "Failed to process buffer: %s",
+ gst_flow_get_name (ret));
gst_buffer_unref (buffer);
-
- return GST_FLOW_OK;
+ return ret;
}
g_mutex_lock (&self->src_mutex);
g_mutex_unlock (&self->src_mutex);
if (other_pad) {
- GST_LOG_OBJECT (self, "decoded buffer with length %d, pushing", size);
- ret = gst_pad_push (other_pad, buffer);
+ gboolean was_eos = (ret == GST_FLOW_EOS);
+
+ if (gst_buffer_get_size (buffer) > 0) {
+ GST_LOG_OBJECT (self, "pushing buffer");
+ ret = gst_pad_push (other_pad, buffer);
+ } else {
+ gst_buffer_unref (buffer);
+ }
+
+ /* If the peer closed the connection, signal that we're done here now */
+ if (was_eos) {
+ gst_pad_push_event (other_pad, gst_event_new_eos ());
+ if (ret == GST_FLOW_OK)
+ ret = GST_FLOW_EOS;
+ }
+
gst_object_unref (other_pad);
} else {
- GST_LOG_OBJECT (self, "dropped buffer with length %d, not linked", size);
+ GST_LOG_OBJECT (self, "dropping buffer, have no source pad");
gst_buffer_unref (buffer);
}
static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
guint auth, GstDtlsEnc *);
static gboolean on_send_data (GstDtlsConnection *, gconstpointer data,
- gint length, GstDtlsEnc *);
+ gsize length, GstDtlsEnc *);
static void
gst_dtls_enc_class_init (GstDtlsEncClass * klass)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
- case GST_STATE_CHANGE_READY_TO_PAUSED:
+ case GST_STATE_CHANGE_READY_TO_PAUSED:{
+ GError *err = NULL;
+
GST_DEBUG_OBJECT (self, "starting connection %s", self->connection_id);
- gst_dtls_connection_start (self->connection, self->is_client);
+ if (!gst_dtls_connection_start (self->connection, self->is_client, &err)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, OPEN_WRITE, (NULL), ("%s",
+ err->message));
+ g_clear_error (&err);
+ }
break;
+ }
default:
break;
}
GST_TRACE_OBJECT (self, "src loop: releasing lock");
- ret = gst_pad_push (self->src, buffer);
- if (check_connection_timeout)
- gst_dtls_connection_check_timeout (self->connection);
+ if (buffer) {
+ ret = gst_pad_push (self->src, buffer);
+ if (check_connection_timeout)
+ gst_dtls_connection_check_timeout (self->connection);
- if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
- GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
- gst_flow_get_name (ret));
+ if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
+ GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
+ gst_flow_get_name (ret));
+ }
+ g_mutex_lock (&self->queue_lock);
+ self->src_ret = ret;
+ g_mutex_unlock (&self->queue_lock);
+ } else {
+ GST_DEBUG_OBJECT (self, "Peer and us closed the connection, sending EOS");
+ gst_pad_push_event (self->src, gst_event_new_eos ());
+ g_mutex_lock (&self->queue_lock);
+ self->src_ret = GST_FLOW_EOS;
+ g_mutex_unlock (&self->queue_lock);
}
- g_mutex_lock (&self->queue_lock);
- self->src_ret = ret;
- g_mutex_unlock (&self->queue_lock);
}
static GstFlowReturn
{
GstDtlsEnc *self = GST_DTLS_ENC (parent);
GstMapInfo map_info;
- gint ret;
+ GError *err = NULL;
+ gsize to_write, written = 0;
+ GstFlowReturn ret = GST_FLOW_OK;
g_mutex_lock (&self->queue_lock);
if (self->src_ret != GST_FLOW_OK) {
gst_buffer_map (buffer, &map_info, GST_MAP_READ);
- if (map_info.size) {
+ to_write = map_info.size;
+
+ while (to_write > 0 && ret == GST_FLOW_OK) {
ret =
gst_dtls_connection_send (self->connection, map_info.data,
- map_info.size);
- if (ret != map_info.size) {
- GST_WARNING_OBJECT (self,
- "error sending data: %d B were written, expected value was %"
- G_GSIZE_FORMAT " B", ret, map_info.size);
+ map_info.size, &written, &err);
+
+ switch (ret) {
+ case GST_FLOW_OK:
+ GST_DEBUG_OBJECT (self,
+ "Wrote %" G_GSIZE_FORMAT " B of %" G_GSIZE_FORMAT " B", written,
+ map_info.size);
+ g_assert (written <= to_write);
+ to_write -= written;
+ break;
+ case GST_FLOW_EOS:
+ GST_INFO_OBJECT (self, "Received data after the connection was closed");
+ break;
+ case GST_FLOW_ERROR:
+ GST_WARNING_OBJECT (self, "error sending data: %s", err->message);
+ GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL), ("%s", err->message));
+ g_clear_error (&err);
+ break;
+ default:
+ g_assert_not_reached ();
+ break;
}
+
+ g_assert (err == NULL);
}
gst_buffer_unmap (buffer, &map_info);
-
gst_buffer_unref (buffer);
- return GST_FLOW_OK;
+ return ret;
}
static gboolean
sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
+ GstDtlsEnc *self = GST_DTLS_ENC (parent);
gboolean ret = FALSE;
switch (GST_EVENT_TYPE (event)) {
gst_event_unref (event);
ret = TRUE;
break;
+ case GST_EVENT_EOS:{
+ GstFlowReturn flow_ret;
+
+ /* Close the write side of the connection now */
+ flow_ret =
+ gst_dtls_connection_send (self->connection, NULL, 0, NULL, NULL);
+
+ if (flow_ret != GST_FLOW_OK)
+ GST_ERROR_OBJECT (self, "Failed to send close_notify");
+
+ /* Do not forward the EOS event unless the peer already closed to the
+ * connection itself. If it didn't yet then we'll later get the send
+ * callback called with no data and send EOS from there */
+ if (flow_ret == GST_FLOW_EOS) {
+ ret = gst_pad_event_default (pad, parent, event);
+ } else {
+ gst_event_unref (event);
+ ret = TRUE;
+ }
+
+ break;
+ }
default:
ret = gst_pad_event_default (pad, parent, event);
break;
}
static gboolean
-on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
+on_send_data (GstDtlsConnection * connection, gconstpointer data, gsize length,
GstDtlsEnc * self)
{
GstBuffer *buffer;
gboolean ret;
- GST_DEBUG_OBJECT (self, "sending data from %s with length %d",
+ GST_DEBUG_OBJECT (self, "sending data from %s with length %" G_GSIZE_FORMAT,
self->connection_id, length);
- buffer = gst_buffer_new_wrapped (g_memdup (data, length), length);
+ buffer =
+ data ? gst_buffer_new_wrapped (g_memdup (data, length), length) : NULL;
GST_TRACE_OBJECT (self, "send data: acquiring lock");
g_mutex_lock (&self->queue_lock);