static void
+component_schedule_io_callback (Component *component);
+static void
+component_deschedule_io_callback (Component *component);
+
+
+/* Must *not* take the agent lock, since it’s called from within
+ * component_set_io_callback(), which holds the Component’s I/O lock. */
+static void
socket_source_detach (SocketSource *source)
{
if (source->source != NULL) {
g_slice_free (SocketSource, source);
}
-
Component *
component_new (guint id, NiceAgent *agent, Stream *stream)
{
component->agent = agent;
component->stream = stream;
+ g_mutex_init (&component->io_mutex);
+ g_queue_init (&component->pending_io_messages);
+ component->io_callback_id = 0;
+
+ /* Start off with a fresh main context and all I/O paused. This
+ * will be updated when nice_agent_attach_recv() or nice_agent_recv() are
+ * called. */
+ component_set_io_context (component, NULL);
+ component_set_io_callback (component, NULL, NULL);
+
return component;
}
{
GSList *i;
GList *item;
+ IOCallbackData *data;
for (i = cmp->local_candidates; i; i = i->next) {
NiceCandidate *candidate = i->data;
cmp->tcp = NULL;
}
+ while ((data = g_queue_pop_head (&cmp->pending_io_messages)) != NULL)
+ io_callback_data_free (data);
+
+ component_deschedule_io_callback (cmp);
+
if (cmp->ctx != NULL) {
g_main_context_unref (cmp->ctx);
cmp->ctx = NULL;
}
+ g_mutex_clear (&cmp->io_mutex);
+
g_slice_free (Component, cmp);
}
/*
* Detaches socket handles of @component from the main context. Leaves the
* sockets themselves untouched.
+ *
+ * Must *not* take the agent lock, since it’s called from within
+ * component_set_io_callback(), which holds the Component’s I/O lock.
*/
void
component_detach_socket_sources (Component *component)
component_set_io_callback (Component *component, NiceAgentRecvFunc func,
gpointer user_data, GMainContext *context)
{
+ g_mutex_lock (&component->io_mutex);
+
/* Reference the context early so we don’t accidentally free it below. */
if (context != NULL && func != NULL)
g_main_context_ref (context);
component->io_callback = func;
component->io_user_data = user_data;
component->ctx = context; /* referenced above */
+
+ component_schedule_io_callback (component);
+ } else {
+ component_deschedule_io_callback (component);
}
+
+ g_mutex_unlock (&component->io_mutex);
+}
+
+gboolean
+component_has_io_callback (Component *component)
+{
+ gboolean has_io_callback;
+
+ g_mutex_lock (&component->io_mutex);
+ has_io_callback = (component->io_callback != NULL);
+ g_mutex_unlock (&component->io_mutex);
+
+ return has_io_callback;
+}
+
+IOCallbackData *
+io_callback_data_new (const guint8 *buf, gsize buf_len)
+{
+ IOCallbackData *data;
+
+ data = g_slice_new0 (IOCallbackData);
+ data->buf = g_memdup (buf, buf_len);
+ data->buf_len = buf_len;
+ data->offset = 0;
+
+ return data;
+}
+
+void
+io_callback_data_free (IOCallbackData *data)
+{
+ g_free (data->buf);
+ g_slice_free (IOCallbackData, data);
+}
+
+/* This is called with the global agent lock released. It does not take that
+ * lock, but does take the io_mutex. */
+static gboolean
+emit_io_callback_cb (gpointer user_data)
+{
+ Component *component = user_data;
+ IOCallbackData *data;
+ NiceAgentRecvFunc io_callback;
+ gpointer io_user_data;
+ guint stream_id, component_id;
+ NiceAgent *agent;
+
+ agent = component->agent;
+ g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
+
+ stream_id = component->stream->id;
+ component_id = component->id;
+
+ g_mutex_lock (&component->io_mutex);
+
+ /* The members of Component are guaranteed not to have changed since this
+ * GSource was attached in component_emit_io_callback(). The Component’s agent
+ * and stream are immutable after construction, as are the stream and
+ * component IDs. The callback and its user data may have changed, but are
+ * guaranteed to be non-%NULL at the start as the idle source is removed when
+ * the callback is set to %NULL. They may become %NULL during the io_callback,
+ * so must be re-checked every loop iteration. The data buffer is copied into
+ * the #IOCallbackData closure.
+ *
+ * If the component is destroyed (which happens if the agent or stream are
+ * destroyed) between attaching the GSource and firing it, the GSource is
+ * detached in component_free() and this callback is never invoked. If the
+ * agent is destroyed during an io_callback, its weak pointer will be
+ * nullified. Similarly, the Component needs to be re-queried for after every
+ * iteration, just in case the client has removed the stream in the
+ * callback. */
+ while (TRUE) {
+ io_callback = component->io_callback;
+ io_user_data = component->io_user_data;
+ data = g_queue_peek_head (&component->pending_io_messages);
+
+ if (data == NULL || io_callback == NULL)
+ break;
+
+ g_mutex_unlock (&component->io_mutex);
+
+ io_callback (agent, stream_id, component_id,
+ data->buf_len - data->offset, (gchar *) data->buf + data->offset,
+ io_user_data);
+
+ /* Check for the user destroying things underneath our feet. */
+ if (agent == NULL ||
+ !agent_find_component (agent, stream_id, component_id,
+ NULL, &component)) {
+ nice_debug ("%s: Agent or component destroyed.", G_STRFUNC);
+ return G_SOURCE_REMOVE;
+ }
+
+ g_queue_pop_head (&component->pending_io_messages);
+ io_callback_data_free (data);
+
+ g_mutex_lock (&component->io_mutex);
+ }
+
+ component->io_callback_id = 0;
+ g_mutex_unlock (&component->io_mutex);
+
+ g_object_remove_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
+
+ return G_SOURCE_REMOVE;
}
/* This must be called with the agent lock *held*. */
agent = component->agent;
stream_id = component->stream->id;
component_id = component->id;
+
+ g_mutex_lock (&component->io_mutex);
io_callback = component->io_callback;
io_user_data = component->io_user_data;
+ g_mutex_unlock (&component->io_mutex);
+
+ /* Allow this to be called with a NULL io_callback, since the caller can’t
+ * lock io_mutex to check beforehand. */
+ if (io_callback == NULL)
+ return;
g_assert (NICE_IS_AGENT (agent));
g_assert (stream_id > 0);
g_assert (component_id > 0);
g_assert (io_callback != NULL);
- agent_unlock ();
+ /* Only allocate a closure if the callback is being deferred to an idle
+ * handler. */
+ if (g_main_context_is_owner (component->ctx)) {
+ /* Thread owns the main context, so invoke the callback directly. */
+ agent_unlock ();
+ io_callback (agent, stream_id,
+ component_id, buf_len, (gchar *) buf, io_user_data);
+ agent_lock ();
+ } else {
+ IOCallbackData *data;
+
+ g_mutex_lock (&component->io_mutex);
+
+ /* Slow path: Current thread doesn’t own the Component’s context at the
+ * moment, so schedule the callback in an idle handler. */
+ data = io_callback_data_new (buf, buf_len);
+ g_queue_push_tail (&component->pending_io_messages,
+ data); /* transfer ownership */
+
+ nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
+
+ component_schedule_io_callback (component);
+
+ g_mutex_unlock (&component->io_mutex);
+ }
+}
+
+/* Note: Must be called with the io_mutex held. */
+static void
+component_schedule_io_callback (Component *component)
+{
+ GSource *source;
- io_callback (agent, stream_id, component_id,
- buf_len, (gchar *) buf, io_user_data);
+ /* Already scheduled or nothing to schedule? */
+ if (component->io_callback_id != 0 ||
+ g_queue_is_empty (&component->pending_io_messages))
+ return;
+
+ /* Add the idle callback. If nice_agent_attach_recv() is called with a
+ * NULL callback before this source is dispatched, the source will be
+ * destroyed, but any pending data will remain in
+ * component->pending_io_messages, ready to be picked up when a callback
+ * is re-attached, or if nice_agent_recv() is called. */
+ source = g_idle_source_new ();
+ g_source_set_priority (source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (source, emit_io_callback_cb, component, NULL);
+ component->io_callback_id = g_source_attach (source, component->ctx);
+ g_source_unref (source);
+}
+
+/* Note: Must be called with the io_mutex held. */
+static void
+component_deschedule_io_callback (Component *component)
+{
+ /* Already descheduled? */
+ if (component->io_callback_id == 0)
+ return;
- agent_lock ();
+ g_source_remove (component->io_callback_id);
+ component->io_callback_id = 0;
}
GSource *source;
} SocketSource;
+
+/* A buffer of data which has been received and processed (so is guaranteed not
+ * to be a STUN packet, or to contain pseudo-TCP header bytes, for example), but
+ * which hasn’t yet been sent to the client in an I/O callback. This could be
+ * due to the main context not being run, or due to the I/O callback being
+ * detached.
+ *
+ * The @offset member gives the byte offset into @buf which has already been
+ * sent to the client. #IOCallbackData buffers remain in the
+ * #Component::pending_io_messages queue until all of their bytes have been sent
+ * to the client.
+ *
+ * @offset is guaranteed to be smaller than @buf_len. */
+typedef struct {
+ guint8 *buf; /* owned */
+ gsize buf_len;
+ gsize offset;
+} IOCallbackData;
+
+IOCallbackData *
+io_callback_data_new (const guint8 *buf, gsize buf_len);
+void
+io_callback_data_free (IOCallbackData *data);
+
+
struct _Component
{
NiceComponentType type;
see ICE 11.1. "Sending Media" (ID-19) */
NiceCandidate *restart_candidate; /**< for storing active remote candidate during a restart */
+ GMutex io_mutex; /**< protects io_callback, io_user_data,
+ pending_io_messages and io_callback_id.
+ immutable: can be accessed without
+ holding the agent lock; if the agent
+ lock is to be taken, it must always be
+ taken before this one */
NiceAgentRecvFunc io_callback; /**< function called on io cb */
gpointer io_user_data; /**< data passed to the io function */
GMainContext *ctx; /**< context for GSources for this
component */
-
- NiceAgent *agent; /* unowned */
- Stream *stream; /* unowned */
+ GQueue pending_io_messages; /**< queue of packets which have been
+ received but not passed to the client
+ in an I/O callback or recv() call yet.
+ each element is an owned
+ IOCallbackData */
+ guint io_callback_id; /* GSource ID of the I/O callback */
+
+ NiceAgent *agent; /* unowned, immutable: can be accessed without holding the
+ * agent lock */
+ Stream *stream; /* unowned, immutable: can be accessed without holding the
+ * agent lock */
PseudoTcpSocket *tcp;
GSource* tcp_clock;
component_emit_io_callback (Component *component,
const guint8 *buf, gsize buf_len);
+gboolean
+component_has_io_callback (Component *component);
+
G_END_DECLS
#endif /* _NICE_COMPONENT_H */