From 4753588b09f0b16908be5186e7b7d3ab212d0cdc Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 13 Nov 2012 11:14:49 +0100 Subject: [PATCH] stream: add locking --- gst/rtsp-server/rtsp-stream.c | 74 +++++++++++++++++++++++++++++++++++++------ gst/rtsp-server/rtsp-stream.h | 2 ++ 2 files changed, 66 insertions(+), 10 deletions(-) diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index de35bc7..6a207a9 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -57,8 +57,9 @@ gst_rtsp_stream_class_init (GstRTSPStreamClass * klass) } static void -gst_rtsp_stream_init (GstRTSPStream * media) +gst_rtsp_stream_init (GstRTSPStream * stream) { + g_mutex_init (&stream->lock); } static void @@ -73,6 +74,7 @@ gst_rtsp_stream_finalize (GObject * obj) gst_object_unref (stream->payloader); gst_object_unref (stream->srcpad); + g_mutex_clear (&stream->lock); G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); } @@ -140,6 +142,7 @@ gst_rtsp_stream_get_mtu (GstRTSPStream * stream) return mtu; } +/* must be called with lock */ static gboolean alloc_ports (GstRTSPStream * stream) { @@ -336,14 +339,16 @@ caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream) newcaps = gst_pad_get_current_caps (pad); + GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps, + newcaps); + + g_mutex_lock (&stream->lock); oldcaps = stream->caps; stream->caps = newcaps; + g_mutex_unlock (&stream->lock); if (oldcaps) gst_caps_unref (oldcaps); - - GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps, - newcaps); } static void @@ -375,6 +380,7 @@ find_transport (GstRTSPStream * stream, const gchar * rtcp_from) port = atoi (tmp + 1); dest = g_strndup (rtcp_from, tmp - rtcp_from); + g_mutex_lock (&stream->lock); GST_INFO ("finding %s:%d in %d transports", dest, port, g_list_length (stream->transports)); @@ -391,6 +397,8 @@ find_transport (GstRTSPStream * stream, const gchar * rtcp_from) break; } } + g_mutex_unlock (&stream->lock); + g_free (dest); return result; @@ -518,6 +526,7 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) stream = (GstRTSPStream *) user_data; buffer = gst_sample_get_buffer (sample); + g_mutex_lock (&stream->lock); for (walk = stream->transports; walk; walk = g_list_next (walk)) { GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; @@ -527,6 +536,8 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) gst_rtsp_stream_transport_send_rtcp (tr, buffer); } } + g_mutex_unlock (&stream->lock); + gst_sample_unref (sample); return GST_FLOW_OK; @@ -565,8 +576,9 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, g_return_val_if_fail (GST_IS_BIN (bin), FALSE); g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE); + g_mutex_lock (&stream->lock); if (stream->is_joined) - return TRUE; + goto was_joined; /* create a session with the same index as the stream */ idx = stream->idx; @@ -736,12 +748,19 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, (GCallback) caps_notify, stream); stream->is_joined = TRUE; + g_mutex_unlock (&stream->lock); return TRUE; /* ERRORS */ +was_joined: + { + g_mutex_unlock (&stream->lock); + return TRUE; + } no_ports: { + g_mutex_unlock (&stream->lock); GST_WARNING ("failed to allocate ports %d", idx); return FALSE; } @@ -750,6 +769,7 @@ link_failed: GST_WARNING ("failed to link stream %d", idx); gst_object_unref (stream->send_rtp_sink); stream->send_rtp_sink = NULL; + g_mutex_unlock (&stream->lock); return FALSE; } } @@ -775,8 +795,9 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, g_return_val_if_fail (GST_IS_BIN (bin), FALSE); g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE); + g_mutex_lock (&stream->lock); if (!stream->is_joined) - return TRUE; + goto was_not_joined; /* all transports must be removed by now */ g_return_val_if_fail (stream->transports == NULL, FALSE); @@ -828,8 +849,14 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, gst_caps_unref (stream->caps); stream->is_joined = FALSE; + g_mutex_unlock (&stream->lock); return TRUE; + +was_not_joined: + { + return TRUE; + } } /** @@ -876,12 +903,19 @@ GstFlowReturn gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer) { GstFlowReturn ret; + GstElement *element; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); g_return_val_if_fail (stream->is_joined, FALSE); - ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer); + g_mutex_lock (&stream->lock); + element = gst_object_ref (stream->appsrc[0]); + g_mutex_unlock (&stream->lock); + + ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer); + + gst_object_unref (element); return ret; } @@ -902,16 +936,24 @@ GstFlowReturn gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer) { GstFlowReturn ret; + GstElement *element; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); g_return_val_if_fail (stream->is_joined, FALSE); - ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer); + g_mutex_lock (&stream->lock); + element = gst_object_ref (stream->appsrc[1]); + g_mutex_unlock (&stream->lock); + + ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer); + + gst_object_unref (element); return ret; } +/* must be called with lock */ static gboolean update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, gboolean add) @@ -1002,12 +1044,18 @@ gboolean gst_rtsp_stream_add_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans) { + gboolean res; + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); g_return_val_if_fail (stream->is_joined, FALSE); g_return_val_if_fail (trans->transport != NULL, FALSE); - return update_transport (stream, trans, TRUE); + g_mutex_lock (&stream->lock); + res = update_transport (stream, trans, TRUE); + g_mutex_unlock (&stream->lock); + + return res; } /** @@ -1028,10 +1076,16 @@ gboolean gst_rtsp_stream_remove_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans) { + gboolean res; + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); g_return_val_if_fail (stream->is_joined, FALSE); g_return_val_if_fail (trans->transport != NULL, FALSE); - return update_transport (stream, trans, FALSE); + g_mutex_lock (&stream->lock); + res = update_transport (stream, trans, FALSE); + g_mutex_unlock (&stream->lock); + + return res; } diff --git a/gst/rtsp-server/rtsp-stream.h b/gst/rtsp-server/rtsp-stream.h index 36d4767..166baff 100644 --- a/gst/rtsp-server/rtsp-stream.h +++ b/gst/rtsp-server/rtsp-stream.h @@ -44,6 +44,7 @@ typedef struct _GstRTSPStreamClass GstRTSPStreamClass; /** * GstRTSPStream: * @parent: the parent instance + * @lock: mutex protecting the stream * @idx: the stream index * @srcpad: the srcpad of the stream * @payloader: the payloader of the format @@ -72,6 +73,7 @@ typedef struct _GstRTSPStreamClass GstRTSPStreamClass; struct _GstRTSPStream { GObject parent; + GMutex lock; guint idx; GstPad *srcpad; GstElement *payloader; -- 2.7.4