aggregator: Protect data with the same mutex as GCond
authorOlivier Crête <olivier.crete@collabora.com>
Sat, 10 Jan 2015 03:01:00 +0000 (22:01 -0500)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
Whenever a GCond is used, the safest paradigm is to protect
the variable which change is signalled by the GCond with the same
mutex that the GCond depends on.

https://bugzilla.gnome.org/show_bug.cgi?id=742684

libs/gst/base/gstaggregator.c

index b735837..3b95e2b 100644 (file)
@@ -166,7 +166,7 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
         g_thread_self());                                                  \
   } G_STMT_END
 
-#define SRC_STREAM_BROADCAST_UNLOCKED(self) G_STMT_START {                 \
+#define SRC_STREAM_BROADCAST(self) G_STMT_START {                 \
     GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",           \
         g_thread_self());                                                  \
     if (self->priv->aggregate_id)                                          \
@@ -174,12 +174,6 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
     g_cond_broadcast(&(self->priv->src_cond));                             \
   } G_STMT_END
 
-#define SRC_STREAM_BROADCAST(self) G_STMT_START {                          \
-    SRC_STREAM_LOCK (self);                                                \
-    SRC_STREAM_BROADCAST_UNLOCKED (self);                                  \
-    SRC_STREAM_UNLOCK (self);                                              \
-  } G_STMT_END
-
 struct _GstAggregatorPadPrivate
 {
   gboolean pending_flush_start;
@@ -217,7 +211,7 @@ struct _GstAggregatorPrivate
   gint padcount;
 
   /* Our state is >= PAUSED */
-  gboolean running;
+  gboolean running;             /* protected by SRC_STREAM_LOCK */
 
 
   gint seqnum;
@@ -690,15 +684,16 @@ gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
   GST_INFO_OBJECT (self, "%s srcpad task",
       flush_start ? "Pausing" : "Stopping");
 
+  SRC_STREAM_LOCK (self);
   self->priv->running = FALSE;
   SRC_STREAM_BROADCAST (self);
+  SRC_STREAM_UNLOCK (self);
 
   if (flush_start) {
     res = gst_pad_push_event (self->srcpad, flush_start);
   }
 
   gst_pad_stop_task (self->srcpad);
-  SRC_STREAM_BROADCAST (self);
 
   return res;
 }
@@ -827,9 +822,11 @@ gst_aggregator_default_sink_event (GstAggregator * self,
              * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
             gst_aggregator_flush (self);
             gst_pad_push_event (self->srcpad, event);
-            priv->send_eos = TRUE;
             event = NULL;
+            SRC_STREAM_LOCK (self);
+            priv->send_eos = TRUE;
             SRC_STREAM_BROADCAST (self);
+            SRC_STREAM_UNLOCK (self);
 
             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
             GST_PAD_STREAM_UNLOCK (self->srcpad);
@@ -849,6 +846,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
        * check for it. Mark pending_eos, eos will be set when steal_buffer is
        * called
        */
+      SRC_STREAM_LOCK (self);
       PAD_LOCK_EVENT (aggpad);
       if (!aggpad->buffer) {
         aggpad->eos = TRUE;
@@ -858,6 +856,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       PAD_UNLOCK_EVENT (aggpad);
 
       SRC_STREAM_BROADCAST (self);
+      SRC_STREAM_UNLOCK (self);
       goto eat;
     }
     case GST_EVENT_SEGMENT:
@@ -994,12 +993,14 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
 
   GST_INFO_OBJECT (pad, "Removing pad");
 
+  SRC_STREAM_LOCK (self);
   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
   gst_buffer_replace (&tmpbuf, NULL);
   gst_element_remove_pad (element, pad);
 
   SRC_STREAM_BROADCAST (self);
+  SRC_STREAM_UNLOCK (self);
 }
 
 static GstPad *
@@ -1260,8 +1261,12 @@ gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
       /* Wake up the src thread again, due to changed latencies
        * or changed live-ness we might have to adjust if we wait
        * on a deadline at all and how long.
+       * This is only to unschedule the clock id, we don't really care
+       * about the GCond here.
        */
+      SRC_STREAM_LOCK (self);
       SRC_STREAM_BROADCAST (self);
+      SRC_STREAM_UNLOCK (self);
       return ret;
     }
     default:
@@ -1757,6 +1762,7 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
     aggclass->clip (self, aggpad, buffer, &actual_buf);
   }
 
+  SRC_STREAM_LOCK (self);
   PAD_LOCK_EVENT (aggpad);
   if (aggpad->buffer)
     gst_buffer_unref (aggpad->buffer);
@@ -1764,9 +1770,8 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   PAD_UNLOCK_EVENT (aggpad);
   PAD_STREAM_UNLOCK (aggpad);
 
-  SRC_STREAM_LOCK (self);
   if (gst_aggregator_check_pads_ready (self))
-    SRC_STREAM_BROADCAST_UNLOCKED (self);
+    SRC_STREAM_BROADCAST (self);
   SRC_STREAM_UNLOCK (self);
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
@@ -1877,8 +1882,8 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
     PAD_BROADCAST_EVENT (aggpad);
     PAD_UNLOCK_EVENT (aggpad);
   } else {
-    g_atomic_int_set (&aggpad->priv->flushing, FALSE);
     PAD_LOCK_EVENT (aggpad);
+    g_atomic_int_set (&aggpad->priv->flushing, FALSE);
     PAD_BROADCAST_EVENT (aggpad);
     PAD_UNLOCK_EVENT (aggpad);
   }