GSocketConnection *connection;
GCancellable *cancellable;
GSocketClient *socket_client;
- GAsyncQueue *output_queue, *stats_queue;
+ GAsyncQueue *output_queue;
GMainContext *main_context;
GSource *input_source;
gboolean writing;
+ /* Protects the values below during concurrent access.
+ * - Taken by the loop thread when writing, but not reading.
+ * - Taken by other threads when reading (calling get_stats).
+ */
+ GMutex stats_lock;
+
/* RTMP configuration */
guint32 in_chunk_size;
guint32 out_chunk_size, out_chunk_size_pending;
rtmpconnection->cancellable = g_cancellable_new ();
rtmpconnection->output_queue =
g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
- rtmpconnection->stats_queue =
- g_async_queue_new_full ((GDestroyNotify) gst_structure_free);
rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
rtmpconnection->input_needed_bytes = 1;
+
+ g_mutex_init (&rtmpconnection->stats_lock);
}
void
/* clean up object here */
+ g_mutex_clear (&rtmpconnection->stats_lock);
g_clear_object (&rtmpconnection->cancellable);
g_clear_object (&rtmpconnection->connection);
g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
- g_clear_pointer (&rtmpconnection->stats_queue, g_async_queue_unref);
g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
g_clear_pointer (&rtmpconnection->output_streams,
gst_rtmp_chunk_streams_free);
GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
+ g_mutex_lock (&sc->stats_lock);
sc->in_bytes_total += ret;
+ g_mutex_unlock (&sc->stats_lock);
+
bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
gst_rtmp_connection_send_ack (sc);
res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
&bytes_written, &error);
+ g_mutex_lock (&self->stats_lock);
self->out_bytes_total += bytes_written;
+ g_mutex_unlock (&self->stats_lock);
if (!res) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
"peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
}
+ g_mutex_lock (&self->stats_lock);
self->in_chunk_size = chunk_size;
+ g_mutex_unlock (&self->stats_lock);
}
static void
GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
new_ack - last_ack);
+ g_mutex_lock (&self->stats_lock);
self->out_bytes_acked = new_ack;
+ g_mutex_unlock (&self->stats_lock);
}
static void
window_ack_size);
}
+ g_mutex_lock (&self->stats_lock);
self->in_window_ack_size = window_ack_size;
+ g_mutex_unlock (&self->stats_lock);
}
static gboolean
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_protocol_control (&pc));
+ g_mutex_lock (&connection->stats_lock);
connection->in_bytes_acked = in_bytes_total;
+ g_mutex_unlock (&connection->stats_lock);
}
static void
chunk_size = self->out_chunk_size_pending;
if (chunk_size) {
- self->out_chunk_size = chunk_size;
self->out_chunk_size_pending = 0;
+
+ g_mutex_lock (&self->stats_lock);
+ self->out_chunk_size = chunk_size;
+ g_mutex_unlock (&self->stats_lock);
+
GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
}
window_ack_size = self->out_window_ack_size_pending;
if (window_ack_size) {
- self->out_window_ack_size = window_ack_size;
self->out_window_ack_size_pending = 0;
+
+ g_mutex_lock (&self->stats_lock);
+ self->out_window_ack_size = window_ack_size;
+ g_mutex_unlock (&self->stats_lock);
+
GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
window_ack_size);
}
return get_stats (NULL);
}
-static gboolean
-get_stats_invoker (gpointer ptr)
-{
- GstRtmpConnection *self = ptr;
- g_async_queue_push (self->stats_queue, get_stats (self));
- return G_SOURCE_REMOVE;
-}
-
GstStructure *
gst_rtmp_connection_get_stats (GstRtmpConnection * self)
{
g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
- if (g_main_context_acquire (self->main_context)) {
- s = get_stats (self);
- g_main_context_release (self->main_context);
- } else {
- g_main_context_invoke_full (self->main_context, G_PRIORITY_HIGH,
- get_stats_invoker, g_object_ref (self), g_object_unref);
- s = g_async_queue_pop (self->stats_queue);
- }
+ g_mutex_lock (&self->stats_lock);
+ s = get_stats (self);
+ g_mutex_unlock (&self->stats_lock);
return s;
}