sctpenc: Correctly log/handle errors and handle short writes
authorSebastian Dröge <sebastian@centricular.com>
Thu, 30 Jan 2020 14:11:57 +0000 (16:11 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Fri, 31 Jan 2020 06:55:10 +0000 (08:55 +0200)
ext/sctp/gstsctpenc.c
ext/sctp/sctpassociation.c

index 9c5bf0b..ad8caf7 100644 (file)
@@ -564,6 +564,8 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   GstMeta *meta;
   const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
   GstFlowReturn flow_ret = GST_FLOW_ERROR;
+  const guint8 *data;
+  guint32 length;
 
   GST_OBJECT_LOCK (self);
   if (self->src_ret != GST_FLOW_OK) {
@@ -611,23 +613,32 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
     goto error;
   }
 
+  data = map.data;
+  length = map.size;
+
   g_mutex_lock (&sctpenc_pad->lock);
   while (!sctpenc_pad->flushing) {
-    gboolean data_sent = FALSE;
+    gint32 bytes_sent;
 
     g_mutex_unlock (&sctpenc_pad->lock);
 
-    data_sent =
-        gst_sctp_association_send_data (self->sctp_association, map.data,
-        map.size, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param);
+    bytes_sent =
+        gst_sctp_association_send_data (self->sctp_association, data,
+        length, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param);
 
     g_mutex_lock (&sctpenc_pad->lock);
-    if (data_sent) {
-      sctpenc_pad->bytes_sent += map.size;
-      break;
-    } else if (!sctpenc_pad->flushing) {
+    if (bytes_sent < 0) {
+      GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
+          ("Failed to send data"));
+      flow_ret = GST_FLOW_ERROR;
+      goto out;
+    } else if (bytes_sent < length && !sctpenc_pad->flushing) {
       gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
 
+      sctpenc_pad->bytes_sent += bytes_sent;
+      data += bytes_sent;
+      length -= bytes_sent;
+
       /* The buffer was probably full. Retry in a while */
       GST_OBJECT_LOCK (self);
       g_queue_push_tail (&self->pending_pads, sctpenc_pad);
@@ -638,9 +649,14 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
       GST_OBJECT_LOCK (self);
       g_queue_remove (&self->pending_pads, sctpenc_pad);
       GST_OBJECT_UNLOCK (self);
+    } else if (bytes_sent == length) {
+      sctpenc_pad->bytes_sent += bytes_sent;
+      break;
     }
   }
   flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
+
+out:
   g_mutex_unlock (&sctpenc_pad->lock);
 
   gst_buffer_unmap (buffer, &map);
index 2a72364..8265db4 100644 (file)
@@ -29,6 +29,7 @@
 
 #include "sctpassociation.h"
 
+#include <gst/gst.h>
 #include <string.h>
 #include <errno.h>
 #include <stdlib.h>
@@ -430,19 +431,20 @@ gst_sctp_association_incoming_packet (GstSctpAssociation * self,
   usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0);
 }
 
-gboolean
+gint32
 gst_sctp_association_send_data (GstSctpAssociation * self, const guint8 * buf,
     guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered,
     GstSctpAssociationPartialReliability pr, guint32 reliability_param)
 {
   struct sctp_sendv_spa spa;
-  gint32 bytes_sent;
-  gboolean result = FALSE;
+  gint32 bytes_sent = -1;
   struct sockaddr_conn remote_addr;
 
   g_rec_mutex_lock (&self->association_mutex);
-  if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED)
+  if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
+    GST_ERROR_OBJECT (self, "Association not connected yet");
     goto end;
+  }
 
   memset (&spa, 0, sizeof (spa));
 
@@ -470,19 +472,19 @@ gst_sctp_association_send_data (GstSctpAssociation * self, const guint8 * buf,
       (socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
   if (bytes_sent < 0) {
     if (errno == EAGAIN || errno == EWOULDBLOCK) {
+      bytes_sent = 0;
       /* Resending this buffer is taken care of by the gstsctpenc */
       goto end;
     } else {
-      g_warning ("Error sending data on stream %u: (%u) %s", stream_id, errno,
-          strerror (errno));
+      GST_ERROR_OBJECT (self, "Error sending data on stream %u: (%u) %s",
+          stream_id, errno, g_strerror (errno));
       goto end;
     }
   }
 
-  result = TRUE;
 end:
   g_rec_mutex_unlock (&self->association_mutex);
-  return result;
+  return bytes_sent;
 }