basesrc: Protect segment values from concurrent access from different threads
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Tue, 9 Feb 2010 16:52:13 +0000 (17:52 +0100)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Fri, 12 Feb 2010 10:02:08 +0000 (11:02 +0100)
This could happen easily in the query functions or when the size is set
on appsrc from some non-streaming thread.

libs/gst/base/gstbasesrc.c

index 39f748340c1434c41aa2dfde4ca3a50b687b90fc..da724fa484b9955fed1b8ed8156bcfe7f641c045 100644 (file)
@@ -560,7 +560,9 @@ gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
 {
   g_return_if_fail (GST_IS_BASE_SRC (src));
 
+  GST_OBJECT_LOCK (src);
   gst_segment_init (&src->segment, format);
+  GST_OBJECT_UNLOCK (src);
 }
 
 /**
@@ -733,6 +735,7 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
 
+  GST_OBJECT_LOCK (src);
   if (src->data.ABI.running) {
     if (src->priv->close_segment)
       gst_event_unref (src->priv->close_segment);
@@ -760,6 +763,7 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
         src->segment.rate, src->segment.applied_rate, src->segment.format,
         src->segment.start, src->segment.last_stop, src->segment.time);
   }
+  GST_OBJECT_UNLOCK (src);
 
   src->priv->discont = TRUE;
   src->data.ABI.running = TRUE;
@@ -840,8 +844,10 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
           gint64 position;
           gint64 duration;
 
+          GST_OBJECT_LOCK (src);
           position = src->segment.last_stop;
           duration = src->segment.duration;
+          GST_OBJECT_UNLOCK (src);
 
           if (position != -1 && duration != -1) {
             if (position < duration)
@@ -859,13 +865,17 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
         default:
         {
           gint64 position;
+          GstFormat seg_format;
 
+          GST_OBJECT_LOCK (src);
           position = src->segment.last_stop;
+          seg_format = src->segment.format;
+          GST_OBJECT_UNLOCK (src);
 
           if (position != -1) {
             /* convert to requested format */
             res =
-                gst_pad_query_convert (src->srcpad, src->segment.format,
+                gst_pad_query_convert (src->srcpad, seg_format,
                 position, &format, &position);
           } else
             res = TRUE;
@@ -894,15 +904,19 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
         default:
         {
           gint64 duration;
+          GstFormat seg_format;
 
+          GST_OBJECT_LOCK (src);
           /* this is the duration as configured by the subclass. */
           duration = src->segment.duration;
+          seg_format = src->segment.format;
+          GST_OBJECT_UNLOCK (src);
 
           if (duration != -1) {
             /* convert to requested format, if this fails, we have a duration
              * but we cannot answer the query, we must return FALSE. */
             res =
-                gst_pad_query_convert (src->srcpad, src->segment.format,
+                gst_pad_query_convert (src->srcpad, seg_format,
                 duration, &format, &duration);
           } else {
             /* The subclass did not configure a duration, we assume that the
@@ -920,12 +934,18 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
 
     case GST_QUERY_SEEKING:
     {
-      GstFormat format;
+      GstFormat format, seg_format;
+      gint64 duration;
+
+      GST_OBJECT_LOCK (src);
+      duration = src->segment.duration;
+      seg_format = src->segment.format;
+      GST_OBJECT_UNLOCK (src);
 
       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
-      if (format == src->segment.format) {
-        gst_query_set_seeking (query, src->segment.format,
-            gst_base_src_seekable (src), 0, src->segment.duration);
+      if (format == seg_format) {
+        gst_query_set_seeking (query, seg_format,
+            gst_base_src_seekable (src), 0, duration);
         res = TRUE;
       } else {
         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
@@ -940,6 +960,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
     {
       gint64 start, stop;
 
+      GST_OBJECT_LOCK (src);
       /* no end segment configured, current duration then */
       if ((stop = src->segment.stop) == -1)
         stop = src->segment.duration;
@@ -953,6 +974,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
       }
       gst_query_set_segment (query, src->segment.rate, src->segment.format,
           start, stop);
+      GST_OBJECT_UNLOCK (src);
       res = TRUE;
       break;
     }
@@ -1002,7 +1024,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
       break;
     case GST_QUERY_BUFFERING:
     {
-      GstFormat format;
+      GstFormat format, seg_format;
       gint64 start, stop, estimated;
 
       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
@@ -1010,6 +1032,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
       GST_DEBUG_OBJECT (src, "buffering query in format %s",
           gst_format_get_name (format));
 
+      GST_OBJECT_LOCK (src);
       if (src->random_access) {
         estimated = 0;
         start = 0;
@@ -1022,17 +1045,20 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
         start = -1;
         stop = -1;
       }
+      seg_format = src->segment.format;
+      GST_OBJECT_UNLOCK (src);
+
       /* convert to required format. When the conversion fails, we can't answer
        * the query. When the value is unknown, we can don't perform conversion
        * but report TRUE. */
       if (format != GST_FORMAT_PERCENT && stop != -1) {
-        res = gst_pad_query_convert (src->srcpad, src->segment.format,
+        res = gst_pad_query_convert (src->srcpad, seg_format,
             stop, &format, &stop);
       } else {
         res = TRUE;
       }
       if (res && format != GST_FORMAT_PERCENT && start != -1)
-        res = gst_pad_query_convert (src->srcpad, src->segment.format,
+        res = gst_pad_query_convert (src->srcpad, seg_format,
             start, &format, &start);
 
       gst_query_set_buffering_range (query, format, start, stop, estimated);
@@ -1252,7 +1278,9 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
 
   GST_DEBUG_OBJECT (src, "doing seek");
 
+  GST_OBJECT_LOCK (src);
   dest_format = src->segment.format;
+  GST_OBJECT_UNLOCK (src);
 
   if (event) {
     gst_event_parse_seek (event, &rate, &seek_format, &flags,
@@ -1312,11 +1340,13 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
    * copy the current segment info into the temp segment that we can actually
    * attempt the seek with. We only update the real segment if the seek suceeds. */
   if (!seekseg_configured) {
+    GST_OBJECT_LOCK (src);
     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
+    GST_OBJECT_UNLOCK (src);
 
     /* now configure the final seek segment */
     if (event) {
-      if (src->segment.format != seek_format) {
+      if (seeksegment.format != seek_format) {
         /* OK, here's where we give the subclass a chance to convert the relative
          * seek into an absolute one in the processing format. We set up any
          * absolute seek above, before taking the stream lock. */
@@ -1353,6 +1383,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
      * are not yet providing data as we still have the STREAM_LOCK. */
     gst_pad_push_event (src->srcpad, tevent);
   } else if (res && src->data.ABI.running) {
+    GST_OBJECT_LOCK (src);
     /* we are running the current segment and doing a non-flushing seek,
      * close the segment first based on the last_stop. */
     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
@@ -1366,6 +1397,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
         src->segment.rate, src->segment.applied_rate, src->segment.format,
         src->segment.start, src->segment.last_stop, src->segment.time);
     gst_event_set_seqnum (src->priv->close_segment, seqnum);
+    GST_OBJECT_UNLOCK (src);
   }
 
   /* The subclass must have converted the segment to the processing format
@@ -1379,13 +1411,15 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
   /* if the seek was successful, we update our real segment and push
    * out the new segment. */
   if (res) {
+    GST_OBJECT_LOCK (src);
     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
+    GST_OBJECT_UNLOCK (src);
 
-    if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+    if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
       GstMessage *message;
 
       message = gst_message_new_segment_start (GST_OBJECT (src),
-          src->segment.format, src->segment.last_stop);
+          seeksegment.format, seeksegment.last_stop);
       gst_message_set_seqnum (message, seqnum);
 
       gst_element_post_message (GST_ELEMENT (src), message);
@@ -1393,28 +1427,28 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
 
     /* for deriving a stop position for the playback segment from the seek
      * segment, we must take the duration when the stop is not set */
-    if ((stop = src->segment.stop) == -1)
-      stop = src->segment.duration;
+    if ((stop = seeksegment.stop) == -1)
+      stop = seeksegment.duration;
 
     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
-        " to %" G_GINT64_FORMAT, src->segment.start, stop);
+        " to %" G_GINT64_FORMAT, seeksegment.start, stop);
 
     /* now replace the old segment so that we send it in the stream thread the
      * next time it is scheduled. */
     if (src->priv->start_segment)
       gst_event_unref (src->priv->start_segment);
-    if (src->segment.rate >= 0.0) {
+    if (seeksegment.rate >= 0.0) {
       /* forward, we send data from last_stop to stop */
       src->priv->start_segment =
           gst_event_new_new_segment_full (FALSE,
-          src->segment.rate, src->segment.applied_rate, src->segment.format,
-          src->segment.last_stop, stop, src->segment.time);
+          seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
+          seeksegment.last_stop, stop, seeksegment.time);
     } else {
       /* reverse, we send data from last_stop to start */
       src->priv->start_segment =
           gst_event_new_new_segment_full (FALSE,
-          src->segment.rate, src->segment.applied_rate, src->segment.format,
-          src->segment.start, src->segment.last_stop, src->segment.time);
+          seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
+          seeksegment.start, seeksegment.last_stop, seeksegment.time);
     }
     gst_event_set_seqnum (src->priv->start_segment, seqnum);
   }
@@ -1928,27 +1962,33 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
 {
   guint64 size, maxsize;
   GstBaseSrcClass *bclass;
+  GstFormat format;
+  gint64 stop;
 
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
-  /* only operate if we are working with bytes */
-  if (src->segment.format != GST_FORMAT_BYTES)
-    return TRUE;
-
+  GST_OBJECT_LOCK (src);
+  format = src->segment.format;
+  stop = src->segment.stop;
   /* get total file size */
   size = (guint64) src->segment.duration;
+  GST_OBJECT_UNLOCK (src);
+
+  /* only operate if we are working with bytes */
+  if (format != GST_FORMAT_BYTES)
+    return TRUE;
 
   /* the max amount of bytes to read is the total size or
    * up to the segment.stop if present. */
-  if (src->segment.stop != -1)
-    maxsize = MIN (size, src->segment.stop);
+  if (stop != -1)
+    maxsize = MIN (size, stop);
   else
     maxsize = size;
 
   GST_DEBUG_OBJECT (src,
       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
-      *length, size, src->segment.stop, maxsize);
+      *length, size, stop, maxsize);
 
   /* check size if we have one */
   if (maxsize != -1) {
@@ -1960,12 +2000,14 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
         if (!bclass->get_size (src, &size))
           size = -1;
 
+      GST_OBJECT_LOCK (src);
       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
+      GST_OBJECT_UNLOCK (src);
 
       /* make sure we don't exceed the configured segment stop
        * if it was set */
-      if (src->segment.stop != -1)
-        maxsize = MIN (size, src->segment.stop);
+      if (stop != -1)
+        maxsize = MIN (size, stop);
       else
         maxsize = size;
 
@@ -1982,7 +2024,9 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
 
   /* keep track of current position. segment is in bytes, we checked
    * that above. */
+  GST_OBJECT_LOCK (src);
   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
+  GST_OBJECT_UNLOCK (src);
 
   return TRUE;
 
@@ -2272,6 +2316,7 @@ gst_base_src_loop (GstPad * pad)
 
   blocksize = src->blocksize;
 
+  GST_OBJECT_LOCK (src);
   /* if we operate in bytes, we can calculate an offset */
   if (src->segment.format == GST_FORMAT_BYTES) {
     position = src->segment.last_stop;
@@ -2288,6 +2333,7 @@ gst_base_src_loop (GstPad * pad)
     }
   } else
     position = -1;
+  GST_OBJECT_UNLOCK (src);
 
   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
       GST_TIME_ARGS (position), blocksize);
@@ -2328,6 +2374,7 @@ gst_base_src_loop (GstPad * pad)
     g_list_free (tags);
   }
 
+  GST_OBJECT_LOCK (src);
   /* figure out the new position */
   switch (src->segment.format) {
     case GST_FORMAT_BYTES:
@@ -2392,6 +2439,7 @@ gst_base_src_loop (GstPad * pad)
     }
     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
   }
+  GST_OBJECT_UNLOCK (src);
 
   if (G_UNLIKELY (src->priv->discont)) {
     buf = gst_buffer_make_metadata_writable (buf);
@@ -2434,12 +2482,22 @@ pause:
     gst_pad_pause_task (pad);
     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
       if (ret == GST_FLOW_UNEXPECTED) {
+        gboolean flag_segment;
+        GstFormat format;
+        gint64 last_stop;
+
         /* perform EOS logic */
-        if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+        GST_OBJECT_LOCK (src);
+        flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
+        format = src->segment.format;
+        last_stop = src->segment.last_stop;
+        GST_OBJECT_UNLOCK (src);
+
+        if (flag_segment) {
           GstMessage *message;
 
           message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
-              src->segment.format, src->segment.last_stop);
+              format, last_stop);
           gst_message_set_seqnum (message, src->priv->seqnum);
           gst_element_post_message (GST_ELEMENT_CAST (src), message);
         } else {
@@ -2580,6 +2638,7 @@ gst_base_src_start (GstBaseSrc * basesrc)
   gboolean result;
   guint64 size;
   gboolean seekable;
+  GstFormat format;
 
   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
     return TRUE;
@@ -2588,7 +2647,10 @@ gst_base_src_start (GstBaseSrc * basesrc)
 
   basesrc->num_buffers_left = basesrc->num_buffers;
 
+  GST_OBJECT_LOCK (basesrc);
   gst_segment_init (&basesrc->segment, basesrc->segment.format);
+  GST_OBJECT_UNLOCK (basesrc);
+
   basesrc->data.ABI.running = FALSE;
 
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
@@ -2602,8 +2664,12 @@ gst_base_src_start (GstBaseSrc * basesrc)
 
   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
 
+  GST_OBJECT_LOCK (basesrc);
+  format = basesrc->segment.format;
+  GST_OBJECT_UNLOCK (basesrc);
+
   /* figure out the size */
-  if (basesrc->segment.format == GST_FORMAT_BYTES) {
+  if (format == GST_FORMAT_BYTES) {
     if (bclass->get_size) {
       if (!(result = bclass->get_size (basesrc, &size)))
         size = -1;
@@ -2614,22 +2680,22 @@ gst_base_src_start (GstBaseSrc * basesrc)
     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
     /* only update the size when operating in bytes, subclass is supposed
      * to set duration in the start method for other formats */
+    GST_OBJECT_LOCK (basesrc);
     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
+    GST_OBJECT_UNLOCK (basesrc);
   } else {
     size = -1;
   }
 
   GST_DEBUG_OBJECT (basesrc,
       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
-      G_GINT64_FORMAT, basesrc->segment.format, result, size,
-      basesrc->segment.duration);
+      G_GINT64_FORMAT, format, result, size, basesrc->segment.duration);
 
   seekable = gst_base_src_seekable (basesrc);
   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
 
   /* update for random access flag */
-  basesrc->random_access = seekable &&
-      basesrc->segment.format == GST_FORMAT_BYTES;
+  basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
 
   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);