aggregator: Consistently lock some members
authorOlivier Crête <olivier.crete@collabora.com>
Wed, 21 Jan 2015 23:45:36 +0000 (18:45 -0500)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
Some members sometimes used atomic access, sometimes where not locked at
all. Instead consistently use a mutex to protect them, also document
that.

https://bugzilla.gnome.org/show_bug.cgi?id=742684

libs/gst/base/gstaggregator.c

index adc70dc..0b9d8a2 100644 (file)
@@ -391,10 +391,12 @@ pad_not_ready:
 static void
 gst_aggregator_reset_flow_values (GstAggregator * self)
 {
+  GST_OBJECT_LOCK (self);
   self->priv->flow_return = GST_FLOW_FLUSHING;
   self->priv->send_stream_start = TRUE;
   self->priv->send_segment = TRUE;
   gst_segment_init (&self->segment, GST_FORMAT_TIME);
+  GST_OBJECT_UNLOCK (self);
 }
 
 static inline void
@@ -426,19 +428,21 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
     self->priv->srccaps = NULL;
   }
 
-  if (g_atomic_int_get (&self->priv->send_segment)) {
-    if (!g_atomic_int_get (&self->priv->flush_seeking)) {
-      GstEvent *segev = gst_event_new_segment (&self->segment);
-
-      if (!self->priv->seqnum)
-        self->priv->seqnum = gst_event_get_seqnum (segev);
-      else
-        gst_event_set_seqnum (segev, self->priv->seqnum);
+  GST_OBJECT_LOCK (self);
+  if (self->priv->send_segment && !self->priv->flush_seeking) {
+    GstEvent *segev = gst_event_new_segment (&self->segment);
+
+    if (!self->priv->seqnum)
+      self->priv->seqnum = gst_event_get_seqnum (segev);
+    else
+      gst_event_set_seqnum (segev, self->priv->seqnum);
+    self->priv->send_segment = FALSE;
+    GST_OBJECT_UNLOCK (self);
 
-      GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
-      gst_pad_push_event (self->srcpad, segev);
-      g_atomic_int_set (&self->priv->send_segment, FALSE);
-    }
+    GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
+    gst_pad_push_event (self->srcpad, segev);
+  } else {
+    GST_OBJECT_UNLOCK (self);
   }
 
   if (priv->tags && priv->tags_changed) {
@@ -478,14 +482,15 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
 {
   gst_aggregator_push_mandatory_events (self);
 
-  if (!g_atomic_int_get (&self->priv->flush_seeking) &&
-      gst_pad_is_active (self->srcpad)) {
+  GST_OBJECT_LOCK (self);
+  if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
+    GST_OBJECT_UNLOCK (self);
     return gst_pad_push (self->srcpad, buffer);
   } else {
     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
-        g_atomic_int_get (&self->priv->flush_seeking),
-        gst_pad_is_active (self->srcpad));
+        self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
+    GST_OBJECT_UNLOCK (self);
     gst_buffer_unref (buffer);
     return GST_FLOW_OK;
   }
@@ -641,9 +646,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
       gst_aggregator_push_eos (self);
     }
 
-    if (priv->flow_return == GST_FLOW_FLUSHING &&
-        g_atomic_int_get (&priv->flush_seeking))
+    GST_OBJECT_LOCK (self);
+    if (priv->flow_return == GST_FLOW_FLUSHING && priv->flush_seeking)
       priv->flow_return = GST_FLOW_OK;
+    GST_OBJECT_UNLOCK (self);
 
     GST_LOG_OBJECT (self, "flow return is %s",
         gst_flow_get_name (priv->flow_return));
@@ -722,33 +728,35 @@ gst_aggregator_flush (GstAggregator * self)
   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
 
   GST_DEBUG_OBJECT (self, "Flushing everything");
-  g_atomic_int_set (&priv->send_segment, TRUE);
-  g_atomic_int_set (&priv->flush_seeking, FALSE);
+  GST_OBJECT_LOCK (self);
+  priv->send_segment = TRUE;
+  priv->flush_seeking = FALSE;
   g_atomic_int_set (&priv->tags_changed, FALSE);
+  GST_OBJECT_UNLOCK (self);
   if (klass->flush)
     ret = klass->flush (self);
 
   return ret;
 }
 
+
+/* Called with GstAggregator's object lock held */
+
 static gboolean
-gst_aggregator_all_flush_stop_received (GstAggregator * self)
+gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
 {
   GList *tmp;
   GstAggregatorPad *tmppad;
 
-  GST_OBJECT_LOCK (self);
   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
     tmppad = (GstAggregatorPad *) tmp->data;
 
     if (_check_pending_flush_stop (tmppad) == FALSE) {
       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
-      GST_OBJECT_UNLOCK (self);
       return FALSE;
     }
   }
-  GST_OBJECT_UNLOCK (self);
 
   return TRUE;
 }
@@ -772,10 +780,12 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
     g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
   }
 
-  if (g_atomic_int_get (&priv->flush_seeking)) {
+  GST_OBJECT_LOCK (self);
+  if (priv->flush_seeking) {
     /* If flush_seeking we forward the first FLUSH_START */
-    if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
-            TRUE, FALSE) == TRUE) {
+    if (priv->pending_flush_start) {
+      priv->pending_flush_start = FALSE;
+      GST_OBJECT_UNLOCK (self);
 
       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
       gst_aggregator_stop_srcpad_task (self, event);
@@ -786,9 +796,11 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
       event = NULL;
     } else {
+      GST_OBJECT_UNLOCK (self);
       gst_event_unref (event);
     }
   } else {
+    GST_OBJECT_UNLOCK (self);
     gst_event_unref (event);
   }
   PAD_STREAM_UNLOCK (aggpad);
@@ -819,26 +831,29 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
 
       gst_aggregator_pad_flush (aggpad, self);
-      if (g_atomic_int_get (&priv->flush_seeking)) {
+      GST_OBJECT_LOCK (self);
+      if (priv->flush_seeking) {
         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
-
-        if (g_atomic_int_get (&priv->flush_seeking)) {
-          if (gst_aggregator_all_flush_stop_received (self)) {
-            /* That means we received FLUSH_STOP/FLUSH_STOP on
-             * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
-            gst_aggregator_flush (self);
-            gst_pad_push_event (self->srcpad, event);
-            event = NULL;
-            SRC_STREAM_LOCK (self);
-            priv->send_eos = TRUE;
-            SRC_STREAM_BROADCAST (self);
-            SRC_STREAM_UNLOCK (self);
-
-            GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
-            GST_PAD_STREAM_UNLOCK (self->srcpad);
-            gst_aggregator_start_srcpad_task (self);
-          }
+        if (gst_aggregator_all_flush_stop_received_locked (self)) {
+          GST_OBJECT_UNLOCK (self);
+          /* That means we received FLUSH_STOP/FLUSH_STOP on
+           * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
+          gst_aggregator_flush (self);
+          gst_pad_push_event (self->srcpad, event);
+          event = NULL;
+          SRC_STREAM_LOCK (self);
+          priv->send_eos = TRUE;
+          SRC_STREAM_BROADCAST (self);
+          SRC_STREAM_UNLOCK (self);
+
+          GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
+          GST_PAD_STREAM_UNLOCK (self->srcpad);
+          gst_aggregator_start_srcpad_task (self);
+        } else {
+          GST_OBJECT_UNLOCK (self);
         }
+      } else {
+        GST_OBJECT_UNLOCK (self);
       }
 
       /* We never forward the event */
@@ -1391,21 +1406,25 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
 
   flush = flags & GST_SEEK_FLAG_FLUSH;
 
+  GST_OBJECT_LOCK (self);
   if (flush) {
-    g_atomic_int_set (&priv->pending_flush_start, TRUE);
-    g_atomic_int_set (&priv->flush_seeking, TRUE);
+    priv->pending_flush_start = TRUE;
+    priv->flush_seeking = TRUE;
   }
 
   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
       stop_type, stop, NULL);
+  GST_OBJECT_UNLOCK (self);
 
   /* forward the seek upstream */
   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
   event = NULL;
 
   if (!evdata.result || !evdata.one_actually_seeked) {
-    g_atomic_int_set (&priv->flush_seeking, FALSE);
-    g_atomic_int_set (&priv->pending_flush_start, FALSE);
+    GST_OBJECT_LOCK (self);
+    priv->flush_seeking = FALSE;
+    priv->pending_flush_start = FALSE;
+    GST_OBJECT_UNLOCK (self);
   }
 
   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);