/* queued message for transmission */
guint id;
- GAsyncQueue *messages;
+ GMutex *mutex;
+ GQueue *messages;
guint8 *write_data;
guint write_off;
guint write_size;
}
if (watch->writefd.revents & WRITE_COND) {
+ g_mutex_lock (watch->mutex);
do {
if (watch->write_data == NULL) {
GstRTSPRec *rec;
/* get a new message from the queue */
- rec = g_async_queue_try_pop (watch->messages);
+ rec = g_queue_pop_tail (watch->messages);
if (rec == NULL)
goto done;
res = write_bytes (watch->writefd.fd, watch->write_data,
&watch->write_off, watch->write_size);
+ g_mutex_unlock (watch->mutex);
if (res == GST_RTSP_EINTR)
goto write_blocked;
else if (G_LIKELY (res == GST_RTSP_OK)) {
else
goto error;
}
+ g_mutex_lock (watch->mutex);
g_free (watch->write_data);
watch->write_data = NULL;
watch->write_added = FALSE;
watch->writefd.revents = 0;
}
+
+ g_mutex_unlock (watch->mutex);
}
write_blocked:
build_reset (&watch->builder);
gst_rtsp_message_unset (&watch->message);
- g_async_queue_unref (watch->messages);
+ g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
+ g_queue_free (watch->messages);
watch->messages = NULL;
+ g_mutex_free (watch->mutex);
+
g_free (watch->write_data);
if (watch->notify)
result->conn = conn;
result->builder.state = STATE_START;
- result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
+ result->mutex = g_mutex_new ();
+ result->messages = g_queue_new ();
result->readfd.fd = -1;
result->writefd.fd = -1;
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
+ g_mutex_lock (watch->mutex);
+
/* make a record with the data and id */
rec = g_slice_new (GstRTSPRec);
rec->data = (guint8 *) data;
} while (G_UNLIKELY (rec->id == 0));
/* add the record to a queue. FIXME we would like to have an upper limit here */
- g_async_queue_push (watch->messages, rec);
+ g_queue_push_head (watch->messages, rec);
- /* FIXME: does the following need to be made thread-safe? (this might be
- * called from a streaming thread, like appsink's render function) */
/* make sure the main context will now also check for writability on the
* socket */
if (!watch->write_added) {
watch->write_added = TRUE;
}
+ g_mutex_unlock (watch->mutex);
return rec->id;
}