rtsp: Made gst_rtsp_watch_queue_data() thread safe.
authorPeter Kjellerstedt <pkj@axis.com>
Mon, 17 Aug 2009 09:46:32 +0000 (11:46 +0200)
committerPeter Kjellerstedt <pkj@axis.com>
Mon, 24 Aug 2009 11:19:46 +0000 (13:19 +0200)
gst-libs/gst/rtsp/gstrtspconnection.c

index 43ab6d3..26972c1 100644 (file)
@@ -2963,7 +2963,8 @@ struct _GstRTSPWatch
 
   /* queued message for transmission */
   guint id;
-  GAsyncQueue *messages;
+  GMutex *mutex;
+  GQueue *messages;
   guint8 *write_data;
   guint write_off;
   guint write_size;
@@ -3087,12 +3088,13 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
   }
 
   if (watch->writefd.revents & WRITE_COND) {
+    g_mutex_lock (watch->mutex);
     do {
       if (watch->write_data == NULL) {
         GstRTSPRec *rec;
 
         /* get a new message from the queue */
-        rec = g_async_queue_try_pop (watch->messages);
+        rec = g_queue_pop_tail (watch->messages);
         if (rec == NULL)
           goto done;
 
@@ -3106,6 +3108,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
 
       res = write_bytes (watch->writefd.fd, watch->write_data,
           &watch->write_off, watch->write_size);
+      g_mutex_unlock (watch->mutex);
       if (res == GST_RTSP_EINTR)
         goto write_blocked;
       else if (G_LIKELY (res == GST_RTSP_OK)) {
@@ -3118,6 +3121,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
         else
           goto error;
       }
+      g_mutex_lock (watch->mutex);
 
       g_free (watch->write_data);
       watch->write_data = NULL;
@@ -3129,6 +3133,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
       watch->write_added = FALSE;
       watch->writefd.revents = 0;
     }
+
+    g_mutex_unlock (watch->mutex);
   }
 
 write_blocked:
@@ -3166,9 +3172,12 @@ gst_rtsp_source_finalize (GSource * source)
   build_reset (&watch->builder);
   gst_rtsp_message_unset (&watch->message);
 
-  g_async_queue_unref (watch->messages);
+  g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
+  g_queue_free (watch->messages);
   watch->messages = NULL;
 
+  g_mutex_free (watch->mutex);
+
   g_free (watch->write_data);
 
   if (watch->notify)
@@ -3221,7 +3230,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
   result->conn = conn;
   result->builder.state = STATE_START;
 
-  result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
+  result->mutex = g_mutex_new ();
+  result->messages = g_queue_new ();
 
   result->readfd.fd = -1;
   result->writefd.fd = -1;
@@ -3332,6 +3342,8 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
 
+  g_mutex_lock (watch->mutex);
+
   /* make a record with the data and id */
   rec = g_slice_new (GstRTSPRec);
   rec->data = (guint8 *) data;
@@ -3342,10 +3354,8 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
   } while (G_UNLIKELY (rec->id == 0));
 
   /* add the record to a queue. FIXME we would like to have an upper limit here */
-  g_async_queue_push (watch->messages, rec);
+  g_queue_push_head (watch->messages, rec);
 
-  /* FIXME: does the following need to be made thread-safe? (this might be
-   * called from a streaming thread, like appsink's render function) */
   /* make sure the main context will now also check for writability on the
    * socket */
   if (!watch->write_added) {
@@ -3353,6 +3363,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
     watch->write_added = TRUE;
   }
 
+  g_mutex_unlock (watch->mutex);
   return rec->id;
 }