media: add lock to protect state changes
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 13 Nov 2012 10:15:35 +0000 (11:15 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 13 Nov 2012 10:15:35 +0000 (11:15 +0100)
gst/rtsp-server/rtsp-media.c
gst/rtsp-server/rtsp-media.h

index 352a045..17bc52a 100644 (file)
@@ -156,6 +156,7 @@ gst_rtsp_media_init (GstRTSPMedia * media)
   media->streams = g_ptr_array_new_with_free_func (g_object_unref);
   g_mutex_init (&media->lock);
   g_cond_init (&media->cond);
+  g_rec_mutex_init (&media->state_lock);
 
   media->shared = DEFAULT_SHARED;
   media->reusable = DEFAULT_REUSABLE;
@@ -187,6 +188,7 @@ gst_rtsp_media_finalize (GObject * obj)
   g_free (media->multicast_group);
   g_mutex_clear (&media->lock);
   g_cond_clear (&media->cond);
+  g_rec_mutex_clear (&media->state_lock);
 
   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
 }
@@ -267,6 +269,7 @@ do_loop (GstRTSPMediaClass * klass)
   return NULL;
 }
 
+/* must be called with state lock */
 static void
 collect_media_stats (GstRTSPMedia * media)
 {
@@ -349,7 +352,9 @@ gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
 {
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
 
+  g_mutex_lock (&media->lock);
   media->shared = shared;
+  g_mutex_unlock (&media->lock);
 }
 
 /**
@@ -363,9 +368,15 @@ gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
 gboolean
 gst_rtsp_media_is_shared (GstRTSPMedia * media)
 {
+  gboolean res;
+
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
 
-  return media->shared;
+  g_mutex_lock (&media->lock);
+  res = media->shared;
+  g_mutex_unlock (&media->lock);
+
+  return res;
 }
 
 /**
@@ -381,7 +392,9 @@ gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
 {
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
 
+  g_mutex_lock (&media->lock);
   media->reusable = reusable;
+  g_mutex_unlock (&media->lock);
 }
 
 /**
@@ -395,9 +408,15 @@ gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
 gboolean
 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
 {
+  gboolean res;
+
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
 
-  return media->reusable;
+  g_mutex_lock (&media->lock);
+  res = media->reusable;
+  g_mutex_unlock (&media->lock);
+
+  return res;
 }
 
 /**
@@ -412,7 +431,9 @@ gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
 {
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
 
+  g_mutex_lock (&media->lock);
   media->protocols = protocols;
+  g_mutex_unlock (&media->lock);
 }
 
 /**
@@ -426,10 +447,16 @@ gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
 GstRTSPLowerTrans
 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
 {
+  GstRTSPLowerTrans res;
+
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
       GST_RTSP_LOWER_TRANS_UNKNOWN);
 
-  return media->protocols;
+  g_mutex_lock (&media->lock);
+  res = media->protocols;
+  g_mutex_unlock (&media->lock);
+
+  return res;
 }
 
 /**
@@ -445,7 +472,9 @@ gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
 {
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
 
+  g_mutex_lock (&media->lock);
   media->eos_shutdown = eos_shutdown;
+  g_mutex_unlock (&media->lock);
 }
 
 /**
@@ -460,9 +489,15 @@ gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
 gboolean
 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
 {
+  gboolean res;
+
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
 
-  return media->eos_shutdown;
+  g_mutex_lock (&media->lock);
+  res = media->eos_shutdown;
+  g_mutex_unlock (&media->lock);
+
+  return res;
 }
 
 /**
@@ -477,7 +512,9 @@ gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
 {
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
 
+  g_mutex_lock (&media->lock);
   media->buffer_size = size;
+  g_mutex_unlock (&media->lock);
 }
 
 /**
@@ -491,9 +528,15 @@ gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
 guint
 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
 {
+  guint res;
+
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
 
-  return media->buffer_size;
+  g_mutex_unlock (&media->lock);
+  res = media->buffer_size;
+  g_mutex_unlock (&media->lock);
+
+  return res;
 }
 
 /**
@@ -516,11 +559,11 @@ gst_rtsp_media_set_multicast_group (GstRTSPMedia * media, const gchar * mc)
 
 /**
  * gst_rtsp_media_get_multicast_group:
- * @media: a #GstRTSPMedia
+ * @media: (transfer full): a #GstRTSPMedia
  *
  * Get the multicast group that media from @media will be streamed to.
  *
- * Returns: the multicast group
+ * Returns: the multicast group, g_free after usage.
  */
 gchar *
 gst_rtsp_media_get_multicast_group (GstRTSPMedia * media)
@@ -550,15 +593,15 @@ gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
 
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
 
-  old = media->auth;
+  g_mutex_lock (&media->lock);
+  if ((old = media->auth) != auth)
+    media->auth = auth ? g_object_ref (auth) : NULL;
+  else
+    old = NULL;
+  g_mutex_unlock (&media->lock);
 
-  if (old != auth) {
-    if (auth)
-      g_object_ref (auth);
-    media->auth = auth;
-    if (old)
-      g_object_unref (old);
-  }
+  if (old)
+    g_object_unref (old);
 }
 
 /**
@@ -577,8 +620,10 @@ gst_rtsp_media_get_auth (GstRTSPMedia * media)
 
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
 
+  g_mutex_lock (&media->lock);
   if ((result = media->auth))
     g_object_ref (result);
+  g_mutex_unlock (&media->lock);
 
   return result;
 }
@@ -598,6 +643,7 @@ gst_rtsp_media_set_mtu (GstRTSPMedia * media, guint mtu)
 
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
 
+  g_mutex_lock (&media->lock);
   media->mtu = mtu;
   for (i = 0; i < media->streams->len; i++) {
     GstRTSPStream *stream;
@@ -607,6 +653,7 @@ gst_rtsp_media_set_mtu (GstRTSPMedia * media, guint mtu)
     stream = g_ptr_array_index (media->streams, i);
     gst_rtsp_stream_set_mtu (stream, mtu);
   }
+  g_mutex_unlock (&media->lock);
 }
 
 /**
@@ -618,9 +665,15 @@ gst_rtsp_media_set_mtu (GstRTSPMedia * media, guint mtu)
 guint
 gst_rtsp_media_get_mtu (GstRTSPMedia * media)
 {
+  guint res;
+
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
 
-  return media->mtu;
+  g_mutex_lock (&media->lock);
+  res = media->mtu;
+  g_mutex_unlock (&media->lock);
+
+  return res;
 }
 
 /**
@@ -673,7 +726,9 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
 
       GST_INFO ("found dynamic element %d, %p", i, elem);
 
+      g_mutex_lock (&media->lock);
       media->dynamic = g_list_prepend (media->dynamic, elem);
+      g_mutex_unlock (&media->lock);
 
       have_elem = TRUE;
     }
@@ -707,6 +762,7 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
   g_return_val_if_fail (GST_PAD_IS_SRC (pad), NULL);
 
+  g_mutex_lock (&media->lock);
   idx = media->streams->len;
 
   name = g_strdup_printf ("src_%u", idx);
@@ -720,6 +776,7 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
     gst_rtsp_stream_set_mtu (stream, media->mtu);
 
   g_ptr_array_add (media->streams, stream);
+  g_mutex_unlock (&media->lock);
 
   return stream;
 }
@@ -735,9 +792,15 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
 guint
 gst_rtsp_media_n_streams (GstRTSPMedia * media)
 {
+  guint res;
+
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
 
-  return media->streams->len;
+  g_mutex_lock (&media->lock);
+  res = media->streams->len;
+  g_mutex_unlock (&media->lock);
+
+  return res;
 }
 
 /**
@@ -757,10 +820,12 @@ gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
 
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
 
+  g_mutex_lock (&media->lock);
   if (idx < media->streams->len)
     res = g_ptr_array_index (media->streams, idx);
   else
     res = NULL;
+  g_mutex_unlock (&media->lock);
 
   return res;
 }
@@ -780,6 +845,7 @@ gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
   gchar *result;
   GstRTSPTimeRange range;
 
+  g_mutex_lock (&media->lock);
   /* make copy */
   range = media->range;
 
@@ -787,6 +853,7 @@ gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
     range.min.type = GST_RTSP_TIME_NOW;
     range.min.seconds = -1;
   }
+  g_mutex_unlock (&media->lock);
 
   result = gst_rtsp_range_to_string (&range);
 
@@ -813,10 +880,9 @@ gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
   g_return_val_if_fail (range != NULL, FALSE);
 
-  if (!media->seekable) {
-    GST_INFO ("pipeline is not seekable");
-    return TRUE;
-  }
+  g_rec_mutex_lock (&media->state_lock);
+  if (!media->seekable)
+    goto not_seekable;
 
   if (range->unit != GST_RTSP_RANGE_NPT)
     goto not_supported;
@@ -880,17 +946,26 @@ gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
     GST_INFO ("no seek needed");
     res = TRUE;
   }
+  g_rec_mutex_unlock (&media->state_lock);
 
   return res;
 
   /* ERRORS */
+not_seekable:
+  {
+    g_rec_mutex_unlock (&media->state_lock);
+    GST_INFO ("pipeline is not seekable");
+    return TRUE;
+  }
 not_supported:
   {
+    g_rec_mutex_unlock (&media->state_lock);
     GST_WARNING ("seek unit %d not supported", range->unit);
     return FALSE;
   }
 weird_type:
   {
+    g_rec_mutex_unlock (&media->state_lock);
     GST_WARNING ("weird range type %d not supported", range->min.type);
     return FALSE;
   }
@@ -952,6 +1027,7 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
       if (media->is_live)
         break;
 
+      g_rec_mutex_lock (&media->state_lock);
       if (percent == 100) {
         /* a 100% message means buffering is done */
         media->buffering = FALSE;
@@ -974,6 +1050,7 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
           }
         }
         media->buffering = TRUE;
+        g_rec_mutex_unlock (&media->state_lock);
       }
       break;
     }
@@ -1011,6 +1088,7 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
     case GST_MESSAGE_STREAM_STATUS:
       break;
     case GST_MESSAGE_ASYNC_DONE:
+      g_rec_mutex_lock (&media->state_lock);
       if (!media->adding) {
         /* when we are dynamically adding pads, the addition of the udpsrc will
          * temporarily produce ASYNC_DONE messages. We have to ignore them and
@@ -1022,14 +1100,18 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
       } else {
         GST_INFO ("%p: ignoring ASYNC_DONE", media);
       }
+      g_rec_mutex_unlock (&media->state_lock);
       break;
     case GST_MESSAGE_EOS:
       GST_INFO ("%p: got EOS", media);
+
+      g_rec_mutex_lock (&media->state_lock);
       if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARING) {
         GST_DEBUG ("shutting down after EOS");
         finish_unprepare (media);
         g_object_unref (media);
       }
+      g_rec_mutex_unlock (&media->state_lock);
       break;
     default:
       GST_INFO ("%p: got message type %s", media,
@@ -1067,6 +1149,7 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad),
       stream->idx);
 
+  g_rec_mutex_lock (&media->state_lock);
   /* we will be adding elements below that will cause ASYNC_DONE to be
    * posted in the bus. We want to ignore those messages until the
    * pipeline really prerolled. */
@@ -1078,18 +1161,24 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
       media->rtpbin, GST_STATE_PAUSED);
 
   media->adding = FALSE;
+  g_rec_mutex_unlock (&media->state_lock);
 }
 
 static void
 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
 {
+  GstElement *fakesink;
+
+  g_mutex_lock (&media->lock);
   GST_INFO ("no more pads");
-  if (media->fakesink) {
-    gst_object_ref (media->fakesink);
-    gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
-    gst_element_set_state (media->fakesink, GST_STATE_NULL);
-    gst_object_unref (media->fakesink);
+  if ((fakesink = media->fakesink)) {
+    gst_object_ref (fakesink);
     media->fakesink = NULL;
+    g_mutex_unlock (&media->lock);
+
+    gst_bin_remove (GST_BIN (media->pipeline), fakesink);
+    gst_element_set_state (fakesink, GST_STATE_NULL);
+    gst_object_unref (fakesink);
     GST_INFO ("removed fakesink");
   }
 }
@@ -1116,6 +1205,7 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
   GstBus *bus;
   GList *walk;
 
+  g_rec_mutex_lock (&media->state_lock);
   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
     goto was_prepared;
 
@@ -1202,6 +1292,7 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
     case GST_STATE_CHANGE_FAILURE:
       goto state_failed;
   }
+  g_rec_mutex_unlock (&media->state_lock);
 
   /* now wait for all pads to be prerolled, FIXME, we should somehow be
    * able to do this async so that we don't block the server thread. */
@@ -1218,16 +1309,19 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
   /* OK */
 was_prepared:
   {
+    g_rec_mutex_unlock (&media->state_lock);
     return TRUE;
   }
   /* ERRORS */
 is_reused:
   {
+    g_rec_mutex_unlock (&media->state_lock);
     GST_WARNING ("can not reuse media %p", media);
     return FALSE;
   }
 no_rtpbin:
   {
+    g_rec_mutex_unlock (&media->state_lock);
     GST_WARNING ("no rtpbin element");
     g_warning ("failed to create element 'rtpbin', check your installation");
     return FALSE;
@@ -1236,10 +1330,12 @@ state_failed:
   {
     GST_WARNING ("failed to preroll pipeline");
     gst_rtsp_media_unprepare (media);
+    g_rec_mutex_unlock (&media->state_lock);
     return FALSE;
   }
 }
 
+/* must be called with state-lock */
 static void
 finish_unprepare (GstRTSPMedia * media)
 {
@@ -1275,6 +1371,7 @@ finish_unprepare (GstRTSPMedia * media)
   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
 }
 
+/* called with state-lock */
 static gboolean
 default_unprepare (GstRTSPMedia * media)
 {
@@ -1308,8 +1405,9 @@ gst_rtsp_media_unprepare (GstRTSPMedia * media)
 {
   gboolean success;
 
+  g_rec_mutex_lock (&media->state_lock);
   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
-    return TRUE;
+    goto was_unprepared;
 
   GST_INFO ("unprepare media %p", media);
   media->target_state = GST_STATE_NULL;
@@ -1324,8 +1422,16 @@ gst_rtsp_media_unprepare (GstRTSPMedia * media)
   } else {
     finish_unprepare (media);
   }
+  g_rec_mutex_unlock (&media->state_lock);
 
   return success;
+
+was_unprepared:
+  {
+    g_rec_mutex_unlock (&media->state_lock);
+    GST_INFO ("media %p was already unprepared", media);
+    return TRUE;
+  }
 }
 
 /**
@@ -1349,6 +1455,8 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
   g_return_val_if_fail (transports != NULL, FALSE);
 
+  g_rec_mutex_lock (&media->state_lock);
+
   /* NULL and READY are the same */
   if (state == GST_STATE_READY)
     state = GST_STATE_NULL;
@@ -1427,5 +1535,7 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
           old_active != media->n_active))
     collect_media_stats (media);
 
+  g_rec_mutex_unlock (&media->state_lock);
+
   return TRUE;
 }
index 9a99f99..bf2d0e5 100644 (file)
@@ -116,6 +116,7 @@ struct _GstRTSPMedia {
   guint              mtu;
 
   GstElement        *element;
+  GRecMutex          state_lock;
   GPtrArray         *streams;
   GList             *dynamic;
   GstRTSPMediaStatus status;