From: Peter Kjellerstedt Date: Mon, 17 Aug 2009 09:46:32 +0000 (+0200) Subject: rtsp: Made gst_rtsp_watch_queue_data() thread safe. X-Git-Tag: 1.19.3~511^2~9307 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=0af04aa4a8bdb8837f9290fde7c729a17fdb5947;p=platform%2Fupstream%2Fgstreamer.git rtsp: Made gst_rtsp_watch_queue_data() thread safe. --- diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 43ab6d3..26972c1 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -2963,7 +2963,8 @@ struct _GstRTSPWatch /* queued message for transmission */ guint id; - GAsyncQueue *messages; + GMutex *mutex; + GQueue *messages; guint8 *write_data; guint write_off; guint write_size; @@ -3087,12 +3088,13 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, } if (watch->writefd.revents & WRITE_COND) { + g_mutex_lock (watch->mutex); do { if (watch->write_data == NULL) { GstRTSPRec *rec; /* get a new message from the queue */ - rec = g_async_queue_try_pop (watch->messages); + rec = g_queue_pop_tail (watch->messages); if (rec == NULL) goto done; @@ -3106,6 +3108,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, res = write_bytes (watch->writefd.fd, watch->write_data, &watch->write_off, watch->write_size); + g_mutex_unlock (watch->mutex); if (res == GST_RTSP_EINTR) goto write_blocked; else if (G_LIKELY (res == GST_RTSP_OK)) { @@ -3118,6 +3121,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, else goto error; } + g_mutex_lock (watch->mutex); g_free (watch->write_data); watch->write_data = NULL; @@ -3129,6 +3133,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, watch->write_added = FALSE; watch->writefd.revents = 0; } + + g_mutex_unlock (watch->mutex); } write_blocked: @@ -3166,9 +3172,12 @@ gst_rtsp_source_finalize (GSource * source) build_reset (&watch->builder); gst_rtsp_message_unset (&watch->message); - g_async_queue_unref (watch->messages); + g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); + g_queue_free (watch->messages); watch->messages = NULL; + g_mutex_free (watch->mutex); + g_free (watch->write_data); if (watch->notify) @@ -3221,7 +3230,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->conn = conn; result->builder.state = STATE_START; - result->messages = g_async_queue_new_full (gst_rtsp_rec_free); + result->mutex = g_mutex_new (); + result->messages = g_queue_new (); result->readfd.fd = -1; result->writefd.fd = -1; @@ -3332,6 +3342,8 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data, g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (size != 0, GST_RTSP_EINVAL); + g_mutex_lock (watch->mutex); + /* make a record with the data and id */ rec = g_slice_new (GstRTSPRec); rec->data = (guint8 *) data; @@ -3342,10 +3354,8 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data, } while (G_UNLIKELY (rec->id == 0)); /* add the record to a queue. FIXME we would like to have an upper limit here */ - g_async_queue_push (watch->messages, rec); + g_queue_push_head (watch->messages, rec); - /* FIXME: does the following need to be made thread-safe? (this might be - * called from a streaming thread, like appsink's render function) */ /* make sure the main context will now also check for writability on the * socket */ if (!watch->write_added) { @@ -3353,6 +3363,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data, watch->write_added = TRUE; } + g_mutex_unlock (watch->mutex); return rec->id; }