aggregator: Replace event lock with pad's object lock
authorOlivier Crête <olivier.crete@collabora.com>
Wed, 14 Jan 2015 19:35:15 +0000 (14:35 -0500)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
Reduce the number of locks simplify code, what is protects
is exposed, but the lock was not.

Also means adding an _unlocked version of gst_aggregator_pad_steal_buffer().

https://bugzilla.gnome.org/show_bug.cgi?id=742684

libs/gst/base/gstaggregator.c
libs/gst/base/gstaggregator.h

index 3b95e2b..4f67b85 100644 (file)
@@ -78,19 +78,19 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
 
 /* GstAggregatorPad definitions */
-#define PAD_LOCK_EVENT(pad)   G_STMT_START {                            \
-  GST_TRACE_OBJECT (pad, "Taking EVENT lock from thread %p",            \
+#define PAD_LOCK(pad)   G_STMT_START {                            \
+  GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",            \
         g_thread_self());                                               \
-  g_mutex_lock(&pad->priv->event_lock);                                 \
-  GST_TRACE_OBJECT (pad, "Took EVENT lock from thread %p",              \
+  GST_OBJECT_LOCK (pad);                                                \
+  GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",              \
         g_thread_self());                                               \
   } G_STMT_END
 
-#define PAD_UNLOCK_EVENT(pad)  G_STMT_START {                           \
-  GST_TRACE_OBJECT (pad, "Releasing EVENT lock from thread %p",         \
+#define PAD_UNLOCK(pad)  G_STMT_START {                           \
+  GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",         \
         g_thread_self());                                               \
-  g_mutex_unlock(&pad->priv->event_lock);                               \
-  GST_TRACE_OBJECT (pad, "Release EVENT lock from thread %p",           \
+  GST_OBJECT_UNLOCK (pad);                                              \
+  GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",           \
         g_thread_self());                                               \
   } G_STMT_END
 
@@ -99,7 +99,7 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
   GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p",                \
         g_thread_self());                                               \
   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
-      &(pad->priv->event_lock));                                        \
+      GST_OBJECT_GET_LOCK (pad));                                       \
   GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p",           \
         g_thread_self());                                               \
   } G_STMT_END
@@ -181,7 +181,6 @@ struct _GstAggregatorPadPrivate
   gboolean pending_eos;
   gboolean flushing;
 
-  GMutex event_lock;
   GCond event_cond;
 
   GMutex stream_lock;
@@ -847,13 +846,13 @@ gst_aggregator_default_sink_event (GstAggregator * self,
        * called
        */
       SRC_STREAM_LOCK (self);
-      PAD_LOCK_EVENT (aggpad);
+      PAD_LOCK (aggpad);
       if (!aggpad->buffer) {
         aggpad->eos = TRUE;
       } else {
         aggpad->priv->pending_eos = TRUE;
       }
-      PAD_UNLOCK_EVENT (aggpad);
+      PAD_UNLOCK (aggpad);
 
       SRC_STREAM_BROADCAST (self);
       SRC_STREAM_UNLOCK (self);
@@ -861,10 +860,10 @@ gst_aggregator_default_sink_event (GstAggregator * self,
     }
     case GST_EVENT_SEGMENT:
     {
-      PAD_LOCK_EVENT (aggpad);
+      PAD_LOCK (aggpad);
       gst_event_copy_segment (event, &aggpad->segment);
       self->priv->seqnum = gst_event_get_seqnum (event);
-      PAD_UNLOCK_EVENT (aggpad);
+      PAD_UNLOCK (aggpad);
       goto eat;
     }
     case GST_EVENT_STREAM_START:
@@ -1747,13 +1746,13 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
     goto eos;
 
-  PAD_LOCK_EVENT (aggpad);
+  PAD_LOCK (aggpad);
 
   while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
     PAD_WAIT_EVENT (aggpad);
   }
-  PAD_UNLOCK_EVENT (aggpad);
+  PAD_UNLOCK (aggpad);
 
   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
     goto flushing;
@@ -1763,11 +1762,11 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   }
 
   SRC_STREAM_LOCK (self);
-  PAD_LOCK_EVENT (aggpad);
+  PAD_LOCK (aggpad);
   if (aggpad->buffer)
     gst_buffer_unref (aggpad->buffer);
   aggpad->buffer = actual_buf;
-  PAD_UNLOCK_EVENT (aggpad);
+  PAD_UNLOCK (aggpad);
   PAD_STREAM_UNLOCK (aggpad);
 
   if (gst_aggregator_check_pads_ready (self))
@@ -1803,10 +1802,10 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
 
   if (GST_QUERY_IS_SERIALIZED (query)) {
-    PAD_LOCK_EVENT (aggpad);
+    PAD_LOCK (aggpad);
 
     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
-      PAD_UNLOCK_EVENT (aggpad);
+      PAD_UNLOCK (aggpad);
       goto flushing;
     }
 
@@ -1815,7 +1814,7 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
       PAD_WAIT_EVENT (aggpad);
     }
-    PAD_UNLOCK_EVENT (aggpad);
+    PAD_UNLOCK (aggpad);
 
     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
       goto flushing;
@@ -1838,11 +1837,11 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
 
   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
-    PAD_LOCK_EVENT (aggpad);
+    PAD_LOCK (aggpad);
 
     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
-      PAD_UNLOCK_EVENT (aggpad);
+      PAD_UNLOCK (aggpad);
       goto flushing;
     }
 
@@ -1851,7 +1850,7 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
       PAD_WAIT_EVENT (aggpad);
     }
-    PAD_UNLOCK_EVENT (aggpad);
+    PAD_UNLOCK (aggpad);
 
     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
@@ -1876,16 +1875,16 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
 
   if (active == FALSE) {
-    PAD_LOCK_EVENT (aggpad);
+    PAD_LOCK (aggpad);
     g_atomic_int_set (&aggpad->priv->flushing, TRUE);
     gst_buffer_replace (&aggpad->buffer, NULL);
     PAD_BROADCAST_EVENT (aggpad);
-    PAD_UNLOCK_EVENT (aggpad);
+    PAD_UNLOCK (aggpad);
   } else {
-    PAD_LOCK_EVENT (aggpad);
+    PAD_LOCK (aggpad);
     g_atomic_int_set (&aggpad->priv->flushing, FALSE);
     PAD_BROADCAST_EVENT (aggpad);
-    PAD_UNLOCK_EVENT (aggpad);
+    PAD_UNLOCK (aggpad);
   }
 
   return TRUE;
@@ -1916,7 +1915,6 @@ gst_aggregator_pad_finalize (GObject * object)
 {
   GstAggregatorPad *pad = (GstAggregatorPad *) object;
 
-  g_mutex_clear (&pad->priv->event_lock);
   g_cond_clear (&pad->priv->event_cond);
   g_mutex_clear (&pad->priv->stream_lock);
 
@@ -1956,27 +1954,27 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
       GstAggregatorPadPrivate);
 
   pad->buffer = NULL;
-  g_mutex_init (&pad->priv->event_lock);
   g_cond_init (&pad->priv->event_cond);
 
   g_mutex_init (&pad->priv->stream_lock);
 }
 
 /**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_steal_buffer_unlocked:
  * @pad: the pad to get buffer from
  *
  * Steal the ref to the buffer currently queued in @pad.
  *
+ * MUST be called with the pad's object lock held.
+ *
  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
  *   queued. You should unref the buffer after usage.
  */
 GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad * pad)
 {
   GstBuffer *buffer = NULL;
 
-  PAD_LOCK_EVENT (pad);
   if (pad->buffer) {
     GST_TRACE_OBJECT (pad, "Consuming buffer");
     buffer = pad->buffer;
@@ -1988,7 +1986,27 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
     PAD_BROADCAST_EVENT (pad);
     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
   }
-  PAD_UNLOCK_EVENT (pad);
+
+  return buffer;
+}
+
+/**
+ * gst_aggregator_pad_steal_buffer:
+ * @pad: the pad to get buffer from
+ *
+ * Steal the ref to the buffer currently queued in @pad.
+ *
+ * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
+ *   queued. You should unref the buffer after usage.
+ */
+GstBuffer *
+gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+{
+  GstBuffer *buffer = NULL;
+
+  PAD_LOCK (pad);
+  buffer = gst_aggregator_pad_steal_buffer_unlocked (pad);
+  PAD_UNLOCK (pad);
 
   return buffer;
 }
@@ -2006,10 +2024,10 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer = NULL;
 
-  PAD_LOCK_EVENT (pad);
+  PAD_LOCK (pad);
   if (pad->buffer)
     buffer = gst_buffer_ref (pad->buffer);
-  PAD_UNLOCK_EVENT (pad);
+  PAD_UNLOCK (pad);
 
   return buffer;
 }
index 638030a..33c88cc 100644 (file)
@@ -104,6 +104,7 @@ GType gst_aggregator_pad_get_type           (void);
  ***************************/
 
 GstBuffer * gst_aggregator_pad_steal_buffer (GstAggregatorPad *  pad);
+GstBuffer * gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad *  pad);
 GstBuffer * gst_aggregator_pad_get_buffer   (GstAggregatorPad *  pad);
 
 /*********************