sctpenc: Prohibit sending of interleaved message parts
authorJohan Sternerup <johast@axis.com>
Thu, 10 Dec 2020 15:25:26 +0000 (16:25 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Tue, 11 Oct 2022 09:36:13 +0000 (09:36 +0000)
Apparently we cannot start sending messages from another datachannel
before the previous message was completely sent. usrsctplib will
complain about being locked on another stream id and set
errno=EINVAL.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2454>

subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c

index eb55904..3d94068 100644 (file)
@@ -103,6 +103,7 @@ struct _GstSctpEncPad
   GMutex lock;
   GCond cond;
   gboolean flushing;
+  gboolean clear_to_send;
 };
 
 G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
@@ -132,6 +133,7 @@ gst_sctp_enc_pad_init (GstSctpEncPad * self)
   g_mutex_init (&self->lock);
   g_cond_init (&self->cond);
   self->flushing = FALSE;
+  self->clear_to_send = FALSE;
 }
 
 static void gst_sctp_enc_finalize (GObject * object);
@@ -563,6 +565,7 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstSctpEnc *self = GST_SCTP_ENC (parent);
   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
+  GstSctpEncPad *sctpenc_pad_next = NULL;
   GstMapInfo map;
   guint32 ppid;
   gboolean ordered;
@@ -574,6 +577,7 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   GstFlowReturn flow_ret = GST_FLOW_ERROR;
   const guint8 *data;
   guint32 length;
+  gboolean clear_to_send;
 
   GST_OBJECT_LOCK (self);
   if (self->src_ret != GST_FLOW_OK) {
@@ -629,7 +633,21 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   data = map.data;
   length = map.size;
 
+  GST_OBJECT_LOCK (self);
+  clear_to_send = g_queue_is_empty (&self->pending_pads);
+  g_queue_push_tail (&self->pending_pads, sctpenc_pad);
+  GST_OBJECT_UNLOCK (self);
+
   g_mutex_lock (&sctpenc_pad->lock);
+
+  if (clear_to_send) {
+    sctpenc_pad->clear_to_send = TRUE;
+  }
+
+  while (!sctpenc_pad->flushing && !sctpenc_pad->clear_to_send) {
+    g_cond_wait (&sctpenc_pad->cond, &sctpenc_pad->lock);
+  }
+
   while (!sctpenc_pad->flushing) {
     guint32 bytes_sent;
 
@@ -658,15 +676,8 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
       length -= bytes_sent;
 
       /* The buffer was probably full. Retry in a while */
-      GST_OBJECT_LOCK (self);
-      g_queue_push_tail (&self->pending_pads, sctpenc_pad);
-      GST_OBJECT_UNLOCK (self);
-
       g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
 
-      GST_OBJECT_LOCK (self);
-      g_queue_remove (&self->pending_pads, sctpenc_pad);
-      GST_OBJECT_UNLOCK (self);
     } else if (bytes_sent == length) {
       GST_DEBUG_OBJECT (pad, "Successfully sent buffer");
       sctpenc_pad->bytes_sent += bytes_sent;
@@ -676,8 +687,21 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
 
 out:
+  sctpenc_pad->clear_to_send = FALSE;
   g_mutex_unlock (&sctpenc_pad->lock);
 
+  GST_OBJECT_LOCK (self);
+  g_queue_remove (&self->pending_pads, sctpenc_pad);
+  sctpenc_pad_next = g_queue_peek_head (&self->pending_pads);
+  GST_OBJECT_UNLOCK (self);
+
+  if (sctpenc_pad_next) {
+    g_mutex_lock (&sctpenc_pad_next->lock);
+    sctpenc_pad_next->clear_to_send = TRUE;
+    g_cond_signal (&sctpenc_pad_next->cond);
+    g_mutex_unlock (&sctpenc_pad_next->lock);
+  }
+
   gst_buffer_unmap (buffer, &map);
 error:
   gst_buffer_unref (buffer);
@@ -890,7 +914,6 @@ on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
   GstSctpEnc *self = user_data;
   GstBuffer *gstbuf;
   GstDataQueueItem *item;
-  GList *pending_pads, *l;
   GstSctpEncPad *sctpenc_pad;
 
   GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT,
@@ -909,21 +932,22 @@ on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
     GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
   }
 
-  /* Wake up pads in the order they waited, oldest pad first */
+  /* Wake up the oldest pad which is the one that needs to finish first */
   GST_OBJECT_LOCK (self);
-  pending_pads = NULL;
-  while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
-    pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
-  }
-  GST_OBJECT_UNLOCK (self);
+  sctpenc_pad = g_queue_peek_head (&self->pending_pads);
+  if (sctpenc_pad) {
+    gst_object_ref (sctpenc_pad);
+
+    GST_OBJECT_UNLOCK (self);
 
-  for (l = pending_pads; l; l = l->next) {
-    sctpenc_pad = l->data;
     g_mutex_lock (&sctpenc_pad->lock);
     g_cond_signal (&sctpenc_pad->cond);
     g_mutex_unlock (&sctpenc_pad->lock);
+
+    gst_object_unref (sctpenc_pad);
+  } else {
+    GST_OBJECT_UNLOCK (self);
   }
-  g_list_free (pending_pads);
 }
 
 static void