agent: Delay signal emission after the lock has been released
authorOlivier Crête <olivier.crete@collabora.com>
Mon, 24 Feb 2014 23:50:59 +0000 (18:50 -0500)
committerOlivier Crête <olivier.crete@collabora.com>
Mon, 24 Feb 2014 23:56:42 +0000 (18:56 -0500)
This way, there can be no annoying re-entrancy in our code.

agent/agent-priv.h
agent/agent.c
agent/component.c
agent/conncheck.c
agent/discovery.c

index 1745c12..37e908c 100644 (file)
@@ -158,6 +158,8 @@ struct _NiceAgent
 #endif
   gchar *software_attribute;       /* SOFTWARE attribute */
   gboolean reliable;               /* property: reliable */
+
+  GQueue pending_signals;
   /* XXX: add pointer to internal data struct for ABI-safe extensions */
 };
 
@@ -176,6 +178,7 @@ void agent_signal_gathering_done (NiceAgent *agent);
 
 void agent_lock (void);
 void agent_unlock (void);
+void agent_unlock_and_emit (NiceAgent *agent);
 
 void agent_signal_new_selected_pair (
   NiceAgent *agent,
index 2a0e429..021f0e7 100644 (file)
@@ -46,6 +46,7 @@
 #endif
 
 #include <glib.h>
+#include <gobject/gvaluecollector.h>
 
 #include <string.h>
 #include <errno.h>
@@ -157,6 +158,83 @@ void agent_unlock(void)
 
 #endif
 
+typedef struct {
+  guint signal_id;
+  GSignalQuery query;
+  GValue *params;
+} QueuedSignal;
+
+
+static void
+free_queued_signal (QueuedSignal *sig)
+{
+  guint i;
+
+  for (i = 0; i < sig->query.n_params; i++) {
+    if (G_VALUE_HOLDS_POINTER (&sig->params[i]))
+      g_free (g_value_get_pointer (&sig->params[i]));
+    g_value_unset (&sig->params[i]);
+  }
+
+  g_slice_free1 (sizeof(GValue) * (sig->query.n_params + 1), sig->params);
+  g_slice_free (QueuedSignal, sig);
+}
+
+void
+agent_unlock_and_emit (NiceAgent *agent)
+{
+  GQueue queue = G_QUEUE_INIT;
+  QueuedSignal *sig;
+
+  queue = agent->pending_signals;
+  g_queue_init (&agent->pending_signals);
+
+  agent_unlock ();
+
+  while ((sig = g_queue_pop_head (&queue))) {
+    g_signal_emitv (sig->params, sig->signal_id, 0, NULL);
+
+    free_queued_signal (sig);
+  }
+}
+
+static void
+agent_queue_signal (NiceAgent *agent, guint signal_id, ...)
+{
+  QueuedSignal *sig;
+  guint i;
+  gchar *error = NULL;
+  va_list var_args;
+
+  sig = g_slice_new (QueuedSignal);
+  g_signal_query (signal_id, &sig->query);
+
+  sig->signal_id = signal_id;
+  sig->params = g_slice_alloc0 (sizeof(GValue) * (sig->query.n_params + 1));
+
+  g_value_init (&sig->params[0], G_TYPE_OBJECT);
+  g_value_set_object (&sig->params[0], agent);
+
+  va_start (var_args, signal_id);
+  for (i = 0; i < sig->query.n_params; i++) {
+    G_VALUE_COLLECT_INIT (&sig->params[i + 1], sig->query.param_types[i],
+        var_args, 0, &error);
+    if (error)
+      break;
+  }
+  va_end (var_args);
+
+  if (error) {
+    free_queued_signal (sig);
+    g_critical ("Error collecting values for signal: %s", error);
+    g_free (error);
+    return;
+  }
+
+  g_queue_push_tail (&agent->pending_signals, sig);
+}
+
+
 StunUsageIceCompatibility
 agent_to_ice_compatibility (NiceAgent *agent)
 {
@@ -735,6 +813,8 @@ nice_agent_init (NiceAgent *agent)
 
   agent->rng = nice_rng_new ();
   priv_generate_tie_breaker (agent);
+
+  g_queue_init (&agent->pending_signals);
 }
 
 
@@ -854,7 +934,7 @@ nice_agent_get_property (
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
     }
 
-  agent_unlock();
+  agent_unlock_and_emit(agent);
 }
 
 
@@ -984,7 +1064,7 @@ nice_agent_set_property (
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
     }
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
 }
 
@@ -1025,7 +1105,8 @@ pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data)
   nice_debug ("Agent %p: s%d:%d pseudo Tcp socket Opened", agent,
       stream->id, component->id);
   g_cancellable_cancel (component->tcp_writable_cancellable);
-  g_signal_emit (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE], 0,
+
+  agent_queue_signal (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE],
       stream->id, component->id);
 }
 
@@ -1283,7 +1364,7 @@ pseudo_tcp_socket_writable (PseudoTcpSocket *sock, gpointer user_data)
   nice_debug ("Agent %p: s%d:%d pseudo Tcp socket writable", agent,
       stream->id, component->id);
   g_cancellable_cancel (component->tcp_writable_cancellable);
-  g_signal_emit (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE], 0,
+  agent_queue_signal (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE],
       stream->id, component->id);
 }
 
@@ -1356,7 +1437,7 @@ notify_pseudo_tcp_socket_clock (gpointer user_data)
   pseudo_tcp_socket_notify_clock (component->tcp);
   adjust_tcp_clock (agent, stream, component);
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return G_SOURCE_CONTINUE;
 }
@@ -1456,7 +1537,8 @@ void agent_signal_gathering_done (NiceAgent *agent)
     Stream *stream = i->data;
     if (stream->gathering) {
       stream->gathering = FALSE;
-      g_signal_emit (agent, signals[SIGNAL_CANDIDATE_GATHERING_DONE], 0, stream->id);
+      agent_queue_signal (agent, signals[SIGNAL_CANDIDATE_GATHERING_DONE],
+          stream->id);
     }
   }
 }
@@ -1465,7 +1547,8 @@ void agent_signal_initial_binding_request_received (NiceAgent *agent, Stream *st
 {
   if (stream->initial_binding_request_received != TRUE) {
     stream->initial_binding_request_received = TRUE;
-    g_signal_emit (agent, signals[SIGNAL_INITIAL_BINDING_REQUEST_RECEIVED], 0, stream->id);
+    agent_queue_signal (agent, signals[SIGNAL_INITIAL_BINDING_REQUEST_RECEIVED],
+        stream->id);
   }
 }
 
@@ -1551,7 +1634,7 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint co
   lf_copy = g_strdup (local_foundation);
   rf_copy = g_strdup (remote_foundation);
 
-  g_signal_emit (agent, signals[SIGNAL_NEW_SELECTED_PAIR], 0,
+  agent_queue_signal (agent, signals[SIGNAL_NEW_SELECTED_PAIR],
       stream_id, component_id, lf_copy, rf_copy);
 
   g_free (lf_copy);
@@ -1560,18 +1643,14 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint co
 
 void agent_signal_new_candidate (NiceAgent *agent, NiceCandidate *candidate)
 {
-  g_signal_emit (agent, signals[SIGNAL_NEW_CANDIDATE], 0,
-                candidate->stream_id,
-                candidate->component_id,
-                candidate->foundation);
+  agent_queue_signal (agent, signals[SIGNAL_NEW_CANDIDATE],
+      candidate->stream_id, candidate->component_id, candidate->foundation);
 }
 
 void agent_signal_new_remote_candidate (NiceAgent *agent, NiceCandidate *candidate)
 {
-  g_signal_emit (agent, signals[SIGNAL_NEW_REMOTE_CANDIDATE], 0, 
-                candidate->stream_id, 
-                candidate->component_id, 
-                candidate->foundation);
+  agent_queue_signal (agent, signals[SIGNAL_NEW_REMOTE_CANDIDATE],
+      candidate->stream_id, candidate->component_id, candidate->foundation);
 }
 
 static const gchar *
@@ -1623,8 +1702,8 @@ void agent_signal_component_state_change (NiceAgent *agent, guint stream_id, gui
 
     process_queued_tcp_packets (agent, stream, component);
 
-    g_signal_emit (agent, signals[SIGNAL_COMPONENT_STATE_CHANGED], 0,
-                  stream_id, component_id, state);
+    agent_queue_signal (agent, signals[SIGNAL_COMPONENT_STATE_CHANGED],
+        stream_id, component_id, state);
   }
 }
 
@@ -1820,7 +1899,7 @@ nice_agent_add_stream (
 
   ret = stream->id;
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -1851,7 +1930,7 @@ nice_agent_set_relay_info(NiceAgent *agent,
       nice_address_set_port (&turn->server, server_port);
     } else {
       g_slice_free (TurnServer, turn);
-      agent_unlock();
+      agent_unlock_and_emit (agent);
       return FALSE;
     }
 
@@ -1866,7 +1945,7 @@ nice_agent_set_relay_info(NiceAgent *agent,
     component->turn_servers = g_list_append (component->turn_servers, turn);
   }
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return TRUE;
 }
 
@@ -1901,7 +1980,7 @@ static gboolean priv_upnp_timeout_cb (gpointer user_data)
 
   agent_gathering_done (agent);
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return FALSE;
 }
 
@@ -1967,7 +2046,7 @@ static void _upnp_mapped_external_port (GUPnPSimpleIgd *self, gchar *proto,
     agent_gathering_done (agent);
   }
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 }
 
 static void _upnp_error_mapping_port (GUPnPSimpleIgd *self, GError *error,
@@ -2004,7 +2083,7 @@ static void _upnp_error_mapping_port (GUPnPSimpleIgd *self, GError *error,
     }
   }
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 }
 
 #endif
@@ -2024,7 +2103,7 @@ nice_agent_gather_candidates (
 
   stream = agent_find_stream (agent, stream_id);
   if (stream == NULL) {
-    agent_unlock();
+    agent_unlock_and_emit (agent);
     return FALSE;
   }
 
@@ -2229,7 +2308,7 @@ nice_agent_gather_candidates (
     discovery_prune_stream (agent, stream_id);
   }
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return ret;
 }
@@ -2283,7 +2362,7 @@ nice_agent_remove_stream (
   stream = agent_find_stream (agent, stream_id);
 
   if (!stream) {
-    agent_unlock ();
+    agent_unlock_and_emit (agent);
     return;
   }
 
@@ -2299,10 +2378,9 @@ nice_agent_remove_stream (
   if (!agent->streams)
     priv_remove_keepalive_timer (agent);
 
-  agent_unlock ();
-
-  g_signal_emit (agent, signals[SIGNAL_STREAMS_REMOVED], 0, stream_ids);
+  agent_queue_signal (agent, signals[SIGNAL_STREAMS_REMOVED], stream_ids);
 
+  agent_unlock_and_emit (agent);
   return;
 }
 
@@ -2319,7 +2397,7 @@ nice_agent_set_port_range (NiceAgent *agent, guint stream_id, guint component_id
     component->max_port = max_port;
   }
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 }
 
 NICEAPI_EXPORT gboolean
@@ -2333,7 +2411,7 @@ nice_agent_add_local_address (NiceAgent *agent, NiceAddress *addr)
   nice_address_set_port (dup, 0);
   agent->local_addresses = g_slist_append (agent->local_addresses, dup);
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return TRUE;
 }
 
@@ -2466,7 +2544,7 @@ nice_agent_set_remote_credentials (
   }
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -2497,7 +2575,7 @@ nice_agent_get_local_credentials (
 
  done:
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -2571,7 +2649,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
   added = _set_remote_candidates_locked (agent, stream, component, candidates);
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return added;
 }
@@ -3204,10 +3282,18 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
     memcpy (&prev_recv_messages_iter, &component->recv_messages_iter,
         sizeof (NiceInputMessageIter));
 
-    agent_unlock ();
+
+    agent_unlock_and_emit (agent);
     g_main_context_iteration (context, blocking);
     agent_lock ();
 
+    if (!agent_find_component (agent, stream_id, component_id,
+            &stream, &component)) {
+      g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
+          "Component removed during call.");
+      goto done;
+    }
+
     received_enough =
         nice_input_message_iter_is_at_end (&component->recv_messages_iter,
             component->recv_messages, component->n_recv_messages);
@@ -3249,7 +3335,7 @@ done:
   if (child_error != NULL)
     g_propagate_error (error, child_error);
 
-  agent_unlock ();
+  agent_unlock_and_emit (agent);
 
   if (messages_orig) {
     for (i = 0; i < n_messages; i++) {
@@ -3429,7 +3515,7 @@ done:
   if (child_error != NULL)
     g_propagate_error (error, child_error);
 
-  agent_unlock ();
+  agent_unlock_and_emit (agent);
 
   return n_sent;
 }
@@ -3502,7 +3588,7 @@ nice_agent_get_local_candidates (
     ret = g_slist_append (ret, nice_candidate_copy (item->data));
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -3526,7 +3612,7 @@ nice_agent_get_remote_candidates (
     ret = g_slist_append (ret, nice_candidate_copy (item->data));
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -3554,7 +3640,7 @@ nice_agent_restart (
     res = stream_restart (stream, agent->rng);
   }
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return res;
 }
 
@@ -3791,7 +3877,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
 done:
   g_object_unref (agent);
 
-  agent_unlock ();
+  agent_unlock_and_emit (agent);
 
   return !remove_source;
 }
@@ -3840,7 +3926,7 @@ nice_agent_attach_recv (
   }
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -3888,7 +3974,7 @@ nice_agent_set_selected_pair (
   ret = TRUE;
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -3914,7 +4000,7 @@ nice_agent_get_selected_pair (NiceAgent *agent, guint stream_id,
   }
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return ret;
 }
@@ -3946,7 +4032,7 @@ nice_agent_get_selected_socket (NiceAgent *agent, guint stream_id,
     g_socket = g_object_ref (nice_socket->fileno);
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return g_socket;
 }
@@ -4013,7 +4099,7 @@ nice_agent_set_selected_remote_candidate (
   ret = TRUE;
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -4060,7 +4146,7 @@ nice_agent_set_stream_tos (NiceAgent *agent,
   }
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 }
 
 NICEAPI_EXPORT void
@@ -4075,7 +4161,7 @@ nice_agent_set_software (NiceAgent *agent, const gchar *software)
 
   stun_agent_set_software (&agent->stun_agent, agent->software_attribute);
 
-  agent_unlock ();
+  agent_unlock_and_emit (agent);
 }
 
 NICEAPI_EXPORT gboolean
@@ -4109,7 +4195,7 @@ nice_agent_set_stream_name (NiceAgent *agent, guint stream_id,
   ret = TRUE;
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return ret;
 }
@@ -4129,7 +4215,7 @@ nice_agent_get_stream_name (NiceAgent *agent, guint stream_id)
   name = stream->name;
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return name;
 }
 
@@ -4199,7 +4285,7 @@ nice_agent_get_default_local_candidate (NiceAgent *agent,
     default_candidate = nice_candidate_copy (default_candidate);
 
  done:
-  agent_unlock ();
+  agent_unlock_and_emit (agent);
 
   return default_candidate;
 }
@@ -4311,7 +4397,7 @@ nice_agent_generate_local_sdp (NiceAgent *agent)
     _generate_stream_sdp (agent, stream, sdp, TRUE);
   }
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return g_string_free (sdp, FALSE);
 }
@@ -4335,7 +4421,7 @@ nice_agent_generate_local_stream_sdp (NiceAgent *agent, guint stream_id,
   ret = g_string_free (sdp, FALSE);
 
  done:
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return ret;
 }
@@ -4353,7 +4439,7 @@ nice_agent_generate_local_candidate_sdp (NiceAgent *agent,
   sdp = g_string_new (NULL);
   _generate_candidate_sdp (agent, candidate, sdp);
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return g_string_free (sdp, FALSE);
 }
@@ -4447,7 +4533,7 @@ nice_agent_parse_remote_sdp (NiceAgent *agent, const gchar *sdp)
   if (sdp_lines)
     g_strfreev(sdp_lines);
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return ret;
 }
@@ -4492,7 +4578,7 @@ nice_agent_parse_remote_stream_sdp (NiceAgent *agent, guint stream_id,
   if (sdp_lines)
     g_strfreev(sdp_lines);
 
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return candidates;
 }
@@ -4625,7 +4711,7 @@ nice_agent_get_io_stream (NiceAgent *agent, guint stream_id,
   iostream = g_object_ref (component->iostream);
 
  done:
-  agent_unlock ();
+  agent_unlock_and_emit (agent);
 
   return iostream;
 }
index 29c134d..1ddd586 100644 (file)
@@ -752,7 +752,7 @@ component_emit_io_callback (Component *component,
    * handler. */
   if (g_main_context_is_owner (component->ctx)) {
     /* Thread owns the main context, so invoke the callback directly. */
-    agent_unlock ();
+    agent_unlock_and_emit (agent);
     io_callback (agent, stream_id,
         component_id, buf_len, (gchar *) buf, io_user_data);
     agent_lock ();
@@ -907,7 +907,7 @@ component_source_prepare (GSource *source, gint *timeout_)
 
  done:
 
-  agent_unlock ();
+  agent_unlock_and_emit (agent);
 
   /* We can’t be sure if the ComponentSource itself needs to be dispatched until
    * poll() is called on all the child sources. */
index 1ec0976..8577941 100644 (file)
@@ -388,10 +388,9 @@ static gboolean priv_conn_check_tick_stream (Stream *stream, NiceAgent *agent, G
  *
  * @return will return FALSE when no more pending timers.
  */
-static gboolean priv_conn_check_tick_unlocked (gpointer pointer)
+static gboolean priv_conn_check_tick_unlocked (NiceAgent *agent)
 {
   CandidateCheckPair *pair = NULL;
-  NiceAgent *agent = pointer;
   gboolean keep_timer_going = FALSE;
   GSList *i, *j;
   GTimeVal now;
@@ -454,6 +453,7 @@ static gboolean priv_conn_check_tick_unlocked (gpointer pointer)
 static gboolean priv_conn_check_tick (gpointer pointer)
 {
   gboolean ret;
+  NiceAgent *agent = pointer;
 
   agent_lock();
   if (g_source_is_destroyed (g_main_current_source ())) {
@@ -462,8 +462,9 @@ static gboolean priv_conn_check_tick (gpointer pointer)
     agent_unlock ();
     return FALSE;
   }
-  ret = priv_conn_check_tick_unlocked (pointer);
-  agent_unlock();
+
+  ret = priv_conn_check_tick_unlocked (agent);
+  agent_unlock_and_emit (agent);
 
   return ret;
 }
@@ -539,7 +540,7 @@ static gboolean priv_conn_keepalive_retransmissions_tick (gpointer pointer)
   }
 
 
-  agent_unlock ();
+  agent_unlock_and_emit (pair->keepalive.agent);
   return FALSE;
 }
 
@@ -723,7 +724,7 @@ static gboolean priv_conn_keepalive_tick (gpointer pointer)
       agent->keepalive_timer_source = NULL;
     }
   }
-  agent_unlock();
+  agent_unlock_and_emit (agent);
   return ret;
 }
 
@@ -782,7 +783,7 @@ static gboolean priv_turn_allocate_refresh_retransmissions_tick (gpointer pointe
   }
 
 
-  agent_unlock ();
+  agent_unlock_and_emit (cand->agent);
   return FALSE;
 }
 
@@ -867,7 +868,7 @@ static gboolean priv_turn_allocate_refresh_tick (gpointer pointer)
   }
 
   priv_turn_allocate_refresh_tick_unlocked (cand);
-  agent_unlock ();
+  agent_unlock_and_emit (cand->agent);
 
   return FALSE;
 }
@@ -887,7 +888,7 @@ gboolean conn_check_schedule_next (NiceAgent *agent)
     nice_debug ("Agent %p : WARN: starting conn checks before local candidate gathering is finished.", agent);
 
   /* step: call once imediately */
-  res = priv_conn_check_tick_unlocked ((gpointer) agent);
+  res = priv_conn_check_tick_unlocked (agent);
   nice_debug ("Agent %p : priv_conn_check_tick_unlocked returned %d", agent, res);
 
   /* step: schedule timer if not running yet */
index d4c2ab2..049aa56 100644 (file)
@@ -1029,7 +1029,7 @@ static gboolean priv_discovery_tick (gpointer pointer)
       agent->discovery_timer_source = NULL;
     }
   }
-  agent_unlock();
+  agent_unlock_and_emit (agent);
 
   return ret;
 }
@@ -1045,7 +1045,7 @@ void discovery_schedule (NiceAgent *agent)
   g_assert (agent->discovery_list != NULL);
 
   if (agent->discovery_unsched_items > 0) {
-    
+
     if (agent->discovery_timer_source == NULL) {
       /* step: run first iteration immediately */
       gboolean res = priv_discovery_tick_unlocked (agent);