sctpenc: Propagate downstream flow errors upstream
authorSebastian Dröge <sebastian@centricular.com>
Thu, 30 Jan 2020 13:58:30 +0000 (15:58 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Thu, 30 Jan 2020 13:58:30 +0000 (15:58 +0200)
Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/1180

ext/sctp/gstsctpenc.c
ext/sctp/gstsctpenc.h

index 0122c52..48df206 100644 (file)
@@ -269,6 +269,7 @@ gst_sctp_enc_init (GstSctpEnc * self)
   gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
 
   g_queue_init (&self->pending_pads);
+  self->src_ret = GST_FLOW_FLUSHING;
 }
 
 static void
@@ -338,6 +339,7 @@ gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       self->need_segment = self->need_stream_start_caps = TRUE;
+      self->src_ret = GST_FLOW_OK;
       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
       res = configure_association (self);
       break;
@@ -345,6 +347,7 @@ gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       sctpenc_cleanup (self);
+      self->src_ret = GST_FLOW_FLUSHING;
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       break;
@@ -500,9 +503,15 @@ gst_sctp_enc_srcpad_loop (GstPad * pad)
   }
 
   if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
-    flow_ret = gst_pad_push (self->src_pad, GST_BUFFER (item->object));
+    GstBuffer *buffer = GST_BUFFER (item->object);
+
+    flow_ret = gst_pad_push (self->src_pad, buffer);
     item->object = NULL;
 
+    GST_OBJECT_LOCK (self);
+    self->src_ret = flow_ret;
+    GST_OBJECT_UNLOCK (self);
+
     if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
             || flow_ret == GST_FLOW_NOT_LINKED)) {
       GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
@@ -521,6 +530,10 @@ gst_sctp_enc_srcpad_loop (GstPad * pad)
 
     item->destroy (item);
   } else {
+    GST_OBJECT_LOCK (self);
+    self->src_ret = GST_FLOW_FLUSHING;
+    GST_OBJECT_UNLOCK (self);
+
     GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
     gst_pad_pause_task (pad);
   }
@@ -541,6 +554,17 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
   GstFlowReturn flow_ret = GST_FLOW_ERROR;
 
+  GST_OBJECT_LOCK (self);
+  if (self->src_ret != GST_FLOW_OK) {
+    GST_ERROR_OBJECT (pad, "Pushing on source pad failed before: %s",
+        gst_flow_get_name (self->src_ret));
+    flow_ret = self->src_ret;
+    GST_OBJECT_UNLOCK (self);
+    gst_buffer_unref (buffer);
+    return flow_ret;
+  }
+  GST_OBJECT_UNLOCK (self);
+
   ppid = sctpenc_pad->ppid;
   ordered = sctpenc_pad->ordered;
   pr = sctpenc_pad->reliability;
@@ -617,6 +641,7 @@ error:
 static gboolean
 gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
+  GstSctpEnc *self = GST_SCTP_ENC (parent);
   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
   gboolean ret, is_new_ppid;
   guint32 new_ppid;
@@ -656,6 +681,9 @@ gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       break;
     case GST_EVENT_FLUSH_STOP:
       sctpenc_pad->flushing = FALSE;
+      GST_OBJECT_LOCK (self);
+      self->src_ret = GST_FLOW_OK;
+      GST_OBJECT_UNLOCK (self);
       ret = gst_pad_event_default (pad, parent, event);
       break;
     default:
@@ -715,6 +743,9 @@ gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
       self->need_segment = TRUE;
+      GST_OBJECT_LOCK (self);
+      self->src_ret = GST_FLOW_OK;
+      GST_OBJECT_UNLOCK (self);
       gst_pad_start_task (self->src_pad,
           (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
 
index a05bd4e..fd4e28e 100644 (file)
@@ -46,6 +46,7 @@ struct _GstSctpEnc
   GstElement element;
 
   GstPad *src_pad;
+  GstFlowReturn src_ret;
   gboolean need_stream_start_caps, need_segment;
   guint32 sctp_association_id;
   guint16 remote_sctp_port;