}
static void
-gst_rtsp_stream_init (GstRTSPStream * media)
+gst_rtsp_stream_init (GstRTSPStream * stream)
{
+ g_mutex_init (&stream->lock);
}
static void
gst_object_unref (stream->payloader);
gst_object_unref (stream->srcpad);
+ g_mutex_clear (&stream->lock);
G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
}
return mtu;
}
+/* must be called with lock */
static gboolean
alloc_ports (GstRTSPStream * stream)
{
newcaps = gst_pad_get_current_caps (pad);
+ GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
+ newcaps);
+
+ g_mutex_lock (&stream->lock);
oldcaps = stream->caps;
stream->caps = newcaps;
+ g_mutex_unlock (&stream->lock);
if (oldcaps)
gst_caps_unref (oldcaps);
-
- GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
- newcaps);
}
static void
port = atoi (tmp + 1);
dest = g_strndup (rtcp_from, tmp - rtcp_from);
+ g_mutex_lock (&stream->lock);
GST_INFO ("finding %s:%d in %d transports", dest, port,
g_list_length (stream->transports));
break;
}
}
+ g_mutex_unlock (&stream->lock);
+
g_free (dest);
return result;
stream = (GstRTSPStream *) user_data;
buffer = gst_sample_get_buffer (sample);
+ g_mutex_lock (&stream->lock);
for (walk = stream->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
gst_rtsp_stream_transport_send_rtcp (tr, buffer);
}
}
+ g_mutex_unlock (&stream->lock);
+
gst_sample_unref (sample);
return GST_FLOW_OK;
g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
+ g_mutex_lock (&stream->lock);
if (stream->is_joined)
- return TRUE;
+ goto was_joined;
/* create a session with the same index as the stream */
idx = stream->idx;
(GCallback) caps_notify, stream);
stream->is_joined = TRUE;
+ g_mutex_unlock (&stream->lock);
return TRUE;
/* ERRORS */
+was_joined:
+ {
+ g_mutex_unlock (&stream->lock);
+ return TRUE;
+ }
no_ports:
{
+ g_mutex_unlock (&stream->lock);
GST_WARNING ("failed to allocate ports %d", idx);
return FALSE;
}
GST_WARNING ("failed to link stream %d", idx);
gst_object_unref (stream->send_rtp_sink);
stream->send_rtp_sink = NULL;
+ g_mutex_unlock (&stream->lock);
return FALSE;
}
}
g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
+ g_mutex_lock (&stream->lock);
if (!stream->is_joined)
- return TRUE;
+ goto was_not_joined;
/* all transports must be removed by now */
g_return_val_if_fail (stream->transports == NULL, FALSE);
gst_caps_unref (stream->caps);
stream->is_joined = FALSE;
+ g_mutex_unlock (&stream->lock);
return TRUE;
+
+was_not_joined:
+ {
+ return TRUE;
+ }
}
/**
gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
{
GstFlowReturn ret;
+ GstElement *element;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_return_val_if_fail (stream->is_joined, FALSE);
- ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
+ g_mutex_lock (&stream->lock);
+ element = gst_object_ref (stream->appsrc[0]);
+ g_mutex_unlock (&stream->lock);
+
+ ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
+
+ gst_object_unref (element);
return ret;
}
gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
{
GstFlowReturn ret;
+ GstElement *element;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_return_val_if_fail (stream->is_joined, FALSE);
- ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
+ g_mutex_lock (&stream->lock);
+ element = gst_object_ref (stream->appsrc[1]);
+ g_mutex_unlock (&stream->lock);
+
+ ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
+
+ gst_object_unref (element);
return ret;
}
+/* must be called with lock */
static gboolean
update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
gboolean add)
gst_rtsp_stream_add_transport (GstRTSPStream * stream,
GstRTSPStreamTransport * trans)
{
+ gboolean res;
+
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
g_return_val_if_fail (stream->is_joined, FALSE);
g_return_val_if_fail (trans->transport != NULL, FALSE);
- return update_transport (stream, trans, TRUE);
+ g_mutex_lock (&stream->lock);
+ res = update_transport (stream, trans, TRUE);
+ g_mutex_unlock (&stream->lock);
+
+ return res;
}
/**
gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
GstRTSPStreamTransport * trans)
{
+ gboolean res;
+
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
g_return_val_if_fail (stream->is_joined, FALSE);
g_return_val_if_fail (trans->transport != NULL, FALSE);
- return update_transport (stream, trans, FALSE);
+ g_mutex_lock (&stream->lock);
+ res = update_transport (stream, trans, FALSE);
+ g_mutex_unlock (&stream->lock);
+
+ return res;
}