stream: add locking
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 13 Nov 2012 10:14:49 +0000 (11:14 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 13 Nov 2012 10:14:49 +0000 (11:14 +0100)
gst/rtsp-server/rtsp-stream.c
gst/rtsp-server/rtsp-stream.h

index de35bc7..6a207a9 100644 (file)
@@ -57,8 +57,9 @@ gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
 }
 
 static void
-gst_rtsp_stream_init (GstRTSPStream * media)
+gst_rtsp_stream_init (GstRTSPStream * stream)
 {
+  g_mutex_init (&stream->lock);
 }
 
 static void
@@ -73,6 +74,7 @@ gst_rtsp_stream_finalize (GObject * obj)
 
   gst_object_unref (stream->payloader);
   gst_object_unref (stream->srcpad);
+  g_mutex_clear (&stream->lock);
 
   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
 }
@@ -140,6 +142,7 @@ gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
   return mtu;
 }
 
+/* must be called with lock */
 static gboolean
 alloc_ports (GstRTSPStream * stream)
 {
@@ -336,14 +339,16 @@ caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
 
   newcaps = gst_pad_get_current_caps (pad);
 
+  GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
+      newcaps);
+
+  g_mutex_lock (&stream->lock);
   oldcaps = stream->caps;
   stream->caps = newcaps;
+  g_mutex_unlock (&stream->lock);
 
   if (oldcaps)
     gst_caps_unref (oldcaps);
-
-  GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
-      newcaps);
 }
 
 static void
@@ -375,6 +380,7 @@ find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
   port = atoi (tmp + 1);
   dest = g_strndup (rtcp_from, tmp - rtcp_from);
 
+  g_mutex_lock (&stream->lock);
   GST_INFO ("finding %s:%d in %d transports", dest, port,
       g_list_length (stream->transports));
 
@@ -391,6 +397,8 @@ find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
       break;
     }
   }
+  g_mutex_unlock (&stream->lock);
+
   g_free (dest);
 
   return result;
@@ -518,6 +526,7 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
   stream = (GstRTSPStream *) user_data;
   buffer = gst_sample_get_buffer (sample);
 
+  g_mutex_lock (&stream->lock);
   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
 
@@ -527,6 +536,8 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
     }
   }
+  g_mutex_unlock (&stream->lock);
+
   gst_sample_unref (sample);
 
   return GST_FLOW_OK;
@@ -565,8 +576,9 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
 
+  g_mutex_lock (&stream->lock);
   if (stream->is_joined)
-    return TRUE;
+    goto was_joined;
 
   /* create a session with the same index as the stream */
   idx = stream->idx;
@@ -736,12 +748,19 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
       (GCallback) caps_notify, stream);
 
   stream->is_joined = TRUE;
+  g_mutex_unlock (&stream->lock);
 
   return TRUE;
 
   /* ERRORS */
+was_joined:
+  {
+    g_mutex_unlock (&stream->lock);
+    return TRUE;
+  }
 no_ports:
   {
+    g_mutex_unlock (&stream->lock);
     GST_WARNING ("failed to allocate ports %d", idx);
     return FALSE;
   }
@@ -750,6 +769,7 @@ link_failed:
     GST_WARNING ("failed to link stream %d", idx);
     gst_object_unref (stream->send_rtp_sink);
     stream->send_rtp_sink = NULL;
+    g_mutex_unlock (&stream->lock);
     return FALSE;
   }
 }
@@ -775,8 +795,9 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
 
+  g_mutex_lock (&stream->lock);
   if (!stream->is_joined)
-    return TRUE;
+    goto was_not_joined;
 
   /* all transports must be removed by now */
   g_return_val_if_fail (stream->transports == NULL, FALSE);
@@ -828,8 +849,14 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
     gst_caps_unref (stream->caps);
 
   stream->is_joined = FALSE;
+  g_mutex_unlock (&stream->lock);
 
   return TRUE;
+
+was_not_joined:
+  {
+    return TRUE;
+  }
 }
 
 /**
@@ -876,12 +903,19 @@ GstFlowReturn
 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
 {
   GstFlowReturn ret;
+  GstElement *element;
 
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
   g_return_val_if_fail (stream->is_joined, FALSE);
 
-  ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
+  g_mutex_lock (&stream->lock);
+  element = gst_object_ref (stream->appsrc[0]);
+  g_mutex_unlock (&stream->lock);
+
+  ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
+
+  gst_object_unref (element);
 
   return ret;
 }
@@ -902,16 +936,24 @@ GstFlowReturn
 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
 {
   GstFlowReturn ret;
+  GstElement *element;
 
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
   g_return_val_if_fail (stream->is_joined, FALSE);
 
-  ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
+  g_mutex_lock (&stream->lock);
+  element = gst_object_ref (stream->appsrc[1]);
+  g_mutex_unlock (&stream->lock);
+
+  ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
+
+  gst_object_unref (element);
 
   return ret;
 }
 
+/* must be called with lock */
 static gboolean
 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
     gboolean add)
@@ -1002,12 +1044,18 @@ gboolean
 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
     GstRTSPStreamTransport * trans)
 {
+  gboolean res;
+
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
   g_return_val_if_fail (stream->is_joined, FALSE);
   g_return_val_if_fail (trans->transport != NULL, FALSE);
 
-  return update_transport (stream, trans, TRUE);
+  g_mutex_lock (&stream->lock);
+  res = update_transport (stream, trans, TRUE);
+  g_mutex_unlock (&stream->lock);
+
+  return res;
 }
 
 /**
@@ -1028,10 +1076,16 @@ gboolean
 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
     GstRTSPStreamTransport * trans)
 {
+  gboolean res;
+
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
   g_return_val_if_fail (stream->is_joined, FALSE);
   g_return_val_if_fail (trans->transport != NULL, FALSE);
 
-  return update_transport (stream, trans, FALSE);
+  g_mutex_lock (&stream->lock);
+  res = update_transport (stream, trans, FALSE);
+  g_mutex_unlock (&stream->lock);
+
+  return res;
 }
index 36d4767..166baff 100644 (file)
@@ -44,6 +44,7 @@ typedef struct _GstRTSPStreamClass GstRTSPStreamClass;
 /**
  * GstRTSPStream:
  * @parent: the parent instance
+ * @lock: mutex protecting the stream
  * @idx: the stream index
  * @srcpad: the srcpad of the stream
  * @payloader: the payloader of the format
@@ -72,6 +73,7 @@ typedef struct _GstRTSPStreamClass GstRTSPStreamClass;
 struct _GstRTSPStream {
   GObject       parent;
 
+  GMutex        lock;
   guint         idx;
   GstPad       *srcpad;
   GstElement   *payloader;