bio_method_write (BIO * bio, const char *data, int size)
{
GstDtlsConnection *self = GST_DTLS_CONNECTION (BIO_get_data (bio));
+ gboolean ret = TRUE;
GST_LOG_OBJECT (self, "BIO: writing %d", size);
if (self->priv->send_callback)
- self->priv->send_callback (self, data, size,
+ ret = self->priv->send_callback (self, data, size,
self->priv->send_callback_user_data);
- return size;
+ return ret ? size : -1;
}
static int
static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
guint auth, GstDtlsEnc *);
-static void on_send_data (GstDtlsConnection *, gconstpointer data, gint length,
- GstDtlsEnc *);
+static gboolean on_send_data (GstDtlsConnection *, gconstpointer data,
+ gint length, GstDtlsEnc *);
static void
gst_dtls_enc_class_init (GstDtlsEncClass * klass)
GST_DEBUG_OBJECT (self, "src pad activating in push mode");
self->flushing = FALSE;
+ self->src_ret = GST_FLOW_OK;
self->send_initial_events = TRUE;
success =
gst_pad_start_task (pad, (GstTaskFunction) src_task_loop, self->src,
g_queue_foreach (&self->queue, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (&self->queue);
self->flushing = TRUE;
+ self->src_ret = GST_FLOW_FLUSHING;
g_cond_signal (&self->queue_cond_add);
g_mutex_unlock (&self->queue_lock);
success = gst_pad_stop_task (pad);
GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
gst_flow_get_name (ret));
}
+ g_mutex_lock (&self->queue_lock);
+ self->src_ret = ret;
+ g_mutex_unlock (&self->queue_lock);
}
static GstFlowReturn
GstMapInfo map_info;
gint ret;
+ g_mutex_lock (&self->queue_lock);
+ if (self->src_ret != GST_FLOW_OK) {
+ if (G_UNLIKELY (self->src_ret == GST_FLOW_NOT_LINKED
+ || self->src_ret < GST_FLOW_EOS))
+ GST_ERROR_OBJECT (self, "Pushing previous data returned an error: %s",
+ gst_flow_get_name (self->src_ret));
+
+ gst_buffer_unref (buffer);
+ g_mutex_unlock (&self->queue_lock);
+ return self->src_ret;
+ }
+ g_mutex_unlock (&self->queue_lock);
+
gst_buffer_map (buffer, &map_info, GST_MAP_READ);
if (map_info.size) {
g_signal_emit (self, signals[SIGNAL_ON_KEY_RECEIVED], 0);
}
-static void
+static gboolean
on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
GstDtlsEnc * self)
{
GstBuffer *buffer;
+ gboolean ret;
GST_DEBUG_OBJECT (self, "sending data from %s with length %d",
self->connection_id, length);
g_cond_signal (&self->queue_cond_add);
GST_TRACE_OBJECT (self, "send data: releasing lock");
+
+ ret = self->src_ret == GST_FLOW_OK;
g_mutex_unlock (&self->queue_lock);
+
+ return ret;
}