gnutls: handle simultaneous ops, do handshaking in a thread
authorDan Winship <danw@gnome.org>
Mon, 2 Jul 2012 16:58:02 +0000 (12:58 -0400)
committerDan Winship <danw@gnome.org>
Wed, 18 Jul 2012 21:46:11 +0000 (17:46 -0400)
Keep separate internal read/write states so that you can do
simultaneous reads/writes (either simultaneous async ops in the same
thread, or simultaneous sync ops in different threads). Add
locking/blocking so that this works correctly even in the presence of
rehandshakes (and add test cases for this).

As part of this, change handshaking so that the I/O part of it always
happens in a separate thread, which has three advantages:

  1. It simplifies GTlsConnectionGnutlsSource by making it not have to
     worry about flipping back and forth between read and write.

  2. (Future) It will let the caller asynchronously handle
     certificate-related functionality that is implemented via
     synchronous callbacks in gnutls.

  3. (Future) We can use g_task_set_return_on_cancel() to allow
     cancellation even during uncancellable PKCS#11 operations.

https://bugzilla.gnome.org/show_bug.cgi?id=656343
https://bugzilla.gnome.org/show_bug.cgi?id=660252

tls/gnutls/gtlsconnection-gnutls.c
tls/tests/connection.c

index 46db1c7..33cc9ac 100644 (file)
@@ -60,6 +60,14 @@ static P11KitPin*    on_pin_prompt_callback  (const char     *pinfile,
 
 static void g_tls_connection_gnutls_init_priorities (void);
 
+static gboolean do_implicit_handshake (GTlsConnectionGnutls  *gnutls,
+                                      gboolean               blocking,
+                                      GCancellable          *cancellable,
+                                      GError               **error);
+static gboolean finish_handshake (GTlsConnectionGnutls  *gnutls,
+                                 GSimpleAsyncResult    *thread_result,
+                                 GError               **error);
+
 G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GTlsConnectionGnutls, g_tls_connection_gnutls, G_TYPE_TLS_CONNECTION,
                                  G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
                                                         g_tls_connection_gnutls_initable_iface_init);
@@ -97,9 +105,34 @@ struct _GTlsConnectionGnutlsPrivate
   gboolean is_system_certdb;
   GTlsDatabase *database;
   gboolean database_is_unset;
-  gboolean need_handshake, handshaking, ever_handshaked;
+
+  /* need_handshake means the next claim_op() will get diverted into
+   * an implicit handshake (unless it's an OP_HANDSHAKE itself).
+   * need_finish_handshake means the next claim_op() will get
+   * diverted into finish_handshake().
+   *
+   * handshaking is TRUE as soon as a handshake thread is queued.
+   * Normally it becomes FALSE after finish_handshake() completes. For
+   * an implicit handshake, but in the case of an async implicit
+   * handshake, it becomes FALSE at the end of handshake_thread(),
+   * (and then the next read/write op will call finish_handshake()).
+   * This is because we don't want to call finish_handshake() (and
+   * possibly emit signals) if the caller is not actually in a TLS op
+   * at the time. (Eg, if they're waiting to try a nonblocking call
+   * again, we don't want to emit the signal until they do.)
+   *
+   * started_handshake indicates that the current handshake attempt
+   * got at least as far as calling gnutls_handshake() (and so any
+   * error should be copied to handshake_error and returned on all
+   * future operations). ever_handshaked indicates that TLS has
+   * been successfully negotiated at some point.
+   */
+  gboolean need_handshake, need_finish_handshake;
+  gboolean started_handshake, handshaking, ever_handshaked;
+  GSimpleAsyncResult *implicit_handshake;
   GError *handshake_error;
-  gboolean closing;
+
+  gboolean closing, closed;
 
   GInputStream *tls_istream;
   GOutputStream *tls_ostream;
@@ -107,13 +140,22 @@ struct _GTlsConnectionGnutlsPrivate
   GTlsInteraction *interaction;
   gchar *interaction_id;
 
-  GError *error;
-  GCancellable *cancellable;
-  gboolean blocking;
+  GMutex        op_mutex;
+  GCancellable *waiting_for_op;
+
+  gboolean      reading;
+  gboolean      read_blocking;
+  GError       *read_error;
+  GCancellable *read_cancellable;
+
+  gboolean      writing;
+  gboolean      write_blocking;
+  GError       *write_error;
+  GCancellable *write_cancellable;
+
 #ifndef GNUTLS_E_PREMATURE_TERMINATION
   gboolean eof;
 #endif
-  GIOCondition internal_direction;
 };
 
 static gint unique_interaction_id = 0;
@@ -141,6 +183,9 @@ g_tls_connection_gnutls_init (GTlsConnectionGnutls *gnutls)
   p11_kit_pin_register_callback (gnutls->priv->interaction_id,
                                  on_pin_prompt_callback, gnutls, NULL);
 #endif
+
+  gnutls->priv->waiting_for_op = g_cancellable_new ();
+  g_cancellable_cancel (gnutls->priv->waiting_for_op);
 }
 
 static gnutls_priority_t priorities[2][2];
@@ -248,8 +293,11 @@ g_tls_connection_gnutls_finalize (GObject *object)
   g_free (gnutls->priv->interaction_id);
   g_clear_object (&gnutls->priv->interaction);
 
-  g_clear_error (&gnutls->priv->error);
   g_clear_error (&gnutls->priv->handshake_error);
+  g_clear_error (&gnutls->priv->read_error);
+  g_clear_error (&gnutls->priv->write_error);
+
+  g_clear_object (&gnutls->priv->waiting_for_op);
 
   G_OBJECT_CLASS (g_tls_connection_gnutls_parent_class)->finalize (object);
 }
@@ -434,57 +482,226 @@ g_tls_connection_gnutls_get_certificate (GTlsConnectionGnutls *gnutls,
                                      gnutls->priv->interaction_id, st);
 }
 
+typedef enum {
+  G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE,
+  G_TLS_CONNECTION_GNUTLS_OP_READ,
+  G_TLS_CONNECTION_GNUTLS_OP_WRITE,
+  G_TLS_CONNECTION_GNUTLS_OP_CLOSE,
+} GTlsConnectionGnutlsOp;
+
+static gboolean
+claim_op (GTlsConnectionGnutls    *gnutls,
+         GTlsConnectionGnutlsOp   op,
+         gboolean                 blocking,
+         GCancellable            *cancellable,
+         GError                 **error)
+{
+ try_again:
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return FALSE;
+
+  g_mutex_lock (&gnutls->priv->op_mutex);
+
+  if (gnutls->priv->closing || gnutls->priv->closed)
+    {
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                          _("Connection is closed"));
+      g_mutex_unlock (&gnutls->priv->op_mutex);
+      return FALSE;
+    }
+
+  if (gnutls->priv->handshake_error && op != G_TLS_CONNECTION_GNUTLS_OP_CLOSE)
+    {
+      if (error)
+       *error = g_error_copy (gnutls->priv->handshake_error);
+      g_mutex_unlock (&gnutls->priv->op_mutex);
+      return FALSE;
+    }
+
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE)
+    {
+      if (gnutls->priv->need_handshake)
+       {
+         gnutls->priv->need_handshake = FALSE;
+         gnutls->priv->handshaking = TRUE;
+         if (!do_implicit_handshake (gnutls, blocking, cancellable, error))
+           {
+             g_mutex_unlock (&gnutls->priv->op_mutex);
+             return FALSE;
+           }
+       }
+
+      if (gnutls->priv->need_finish_handshake)
+       {
+         gboolean success;
+
+         gnutls->priv->need_finish_handshake = FALSE;
+
+         g_mutex_unlock (&gnutls->priv->op_mutex);
+         success = finish_handshake (gnutls, gnutls->priv->implicit_handshake, error);
+         g_clear_object (&gnutls->priv->implicit_handshake);
+         g_mutex_lock (&gnutls->priv->op_mutex);
+
+         gnutls->priv->handshaking = FALSE;
+         if (!success || g_cancellable_set_error_if_cancelled (cancellable, error))
+           {
+             g_mutex_unlock (&gnutls->priv->op_mutex);
+             return FALSE;
+           }
+       }
+    }
+
+  if ((op != G_TLS_CONNECTION_GNUTLS_OP_WRITE && gnutls->priv->reading) ||
+      (op != G_TLS_CONNECTION_GNUTLS_OP_READ && gnutls->priv->writing) ||
+      (op != G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE && gnutls->priv->handshaking))
+    {
+      GPollFD fds[2];
+      int nfds;
+
+      g_cancellable_reset (gnutls->priv->waiting_for_op);
+
+      g_mutex_unlock (&gnutls->priv->op_mutex);
+
+      if (!blocking)
+       {
+         g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                              _("Operation would block"));
+         return FALSE;
+       }
+
+      g_cancellable_make_pollfd (gnutls->priv->waiting_for_op, &fds[0]);
+      if (g_cancellable_make_pollfd (cancellable, &fds[0]))
+       nfds = 2;
+      else
+       nfds = 1;
+      g_poll (fds, nfds, -1);
+      g_cancellable_release_fd (cancellable);
+
+      goto try_again;
+    }
+
+  if (op == G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE)
+    {
+      gnutls->priv->handshaking = TRUE;
+      gnutls->priv->need_handshake = FALSE;
+    }
+  if (op == G_TLS_CONNECTION_GNUTLS_OP_CLOSE)
+    gnutls->priv->closing = TRUE;
+
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_WRITE)
+    gnutls->priv->reading = TRUE;
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_READ)
+    gnutls->priv->writing = TRUE;
+
+  g_mutex_unlock (&gnutls->priv->op_mutex);
+  return TRUE;
+}
+
+static void
+yield_op (GTlsConnectionGnutls   *gnutls,
+         GTlsConnectionGnutlsOp  op)
+{
+  g_mutex_lock (&gnutls->priv->op_mutex);
+
+  if (op == G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE)
+    gnutls->priv->handshaking = FALSE;
+  if (op == G_TLS_CONNECTION_GNUTLS_OP_CLOSE)
+    gnutls->priv->closing = FALSE;
+
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_WRITE)
+    gnutls->priv->reading = FALSE;
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_READ)
+    gnutls->priv->writing = FALSE;
+
+  g_cancellable_cancel (gnutls->priv->waiting_for_op);
+  g_mutex_unlock (&gnutls->priv->op_mutex);
+}
+
 static void
 begin_gnutls_io (GTlsConnectionGnutls  *gnutls,
+                GIOCondition           direction,
                 gboolean               blocking,
                 GCancellable          *cancellable)
 {
-  gnutls->priv->blocking = blocking;
-  gnutls->priv->cancellable = cancellable;
-  gnutls->priv->internal_direction = 0;
-  if (cancellable)
-    g_cancellable_push_current (cancellable);
-  g_clear_error (&gnutls->priv->error);
+  g_assert (direction & (G_IO_IN | G_IO_OUT));
+
+  if (direction & G_IO_IN)
+    {
+      gnutls->priv->read_blocking = blocking;
+      gnutls->priv->read_cancellable = cancellable;
+      g_clear_error (&gnutls->priv->read_error);
+    }
+
+  if (direction & G_IO_OUT)
+    {
+      gnutls->priv->write_blocking = blocking;
+      gnutls->priv->write_cancellable = cancellable;
+      g_clear_error (&gnutls->priv->write_error);
+    }
 }
 
 static int
 end_gnutls_io (GTlsConnectionGnutls  *gnutls,
+              GIOCondition           direction,
               int                    status,
+              const char            *errmsg,
               GError               **error)
 {
-  if (gnutls->priv->cancellable)
-    g_cancellable_pop_current (gnutls->priv->cancellable);
-  gnutls->priv->cancellable = NULL;
+  GError *my_error = NULL;
 
-  if (status >= 0)
+  g_assert (direction & (G_IO_IN | G_IO_OUT));
+  g_assert (!error || !*error);
+
+  if (status == GNUTLS_E_AGAIN ||
+      status == GNUTLS_E_WARNING_ALERT_RECEIVED)
+    return GNUTLS_E_AGAIN;
+
+  if (direction & G_IO_IN)
     {
-      g_clear_error (&gnutls->priv->error);
-      return status;
+      gnutls->priv->read_cancellable = NULL;
+      if (status < 0)
+       {
+         my_error = gnutls->priv->read_error;
+         gnutls->priv->read_error = NULL;
+       }
+      else
+       g_clear_error (&gnutls->priv->read_error);
+    }
+  if (direction & G_IO_OUT)
+    {
+      gnutls->priv->write_cancellable = NULL;
+      if (status < 0 && !my_error)
+       {
+         my_error = gnutls->priv->write_error;
+         gnutls->priv->write_error = NULL;
+       }
+      else
+       g_clear_error (&gnutls->priv->write_error);
     }
 
+  if (status >= 0)
+    return status;
+
   if (gnutls->priv->handshaking && !gnutls->priv->ever_handshaked)
     {
-      if (g_error_matches (gnutls->priv->error, G_IO_ERROR, G_IO_ERROR_FAILED) ||
+      if (g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_FAILED) ||
          status == GNUTLS_E_UNEXPECTED_PACKET_LENGTH ||
          status == GNUTLS_E_FATAL_ALERT_RECEIVED ||
          status == GNUTLS_E_DECRYPTION_FAILED ||
          status == GNUTLS_E_UNSUPPORTED_VERSION_PACKET)
        {
-         g_clear_error (&gnutls->priv->error);
+         g_clear_error (&my_error);
          g_set_error_literal (error, G_TLS_ERROR, G_TLS_ERROR_NOT_TLS,
                               _("Peer failed to perform TLS handshake"));
          return GNUTLS_E_PULL_ERROR;
        }
     }
 
-  if (gnutls->priv->error)
+  if (my_error)
     {
-      if (g_error_matches (gnutls->priv->error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-       status = GNUTLS_E_AGAIN;
-      else
+      if (!g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
        G_TLS_CONNECTION_GNUTLS_GET_CLASS (gnutls)->failed (gnutls);
-      g_propagate_error (error, gnutls->priv->error);
-      gnutls->priv->error = NULL;
+      g_propagate_error (error, my_error);
       return status;
     }
   else if (status == GNUTLS_E_REHANDSHAKE)
@@ -496,9 +713,17 @@ end_gnutls_io (GTlsConnectionGnutls  *gnutls,
          return GNUTLS_E_PULL_ERROR;
        }
 
-      gnutls->priv->need_handshake = TRUE;
+      g_mutex_lock (&gnutls->priv->op_mutex);
+      if (!gnutls->priv->handshaking)
+       gnutls->priv->need_handshake = TRUE;
+      g_mutex_unlock (&gnutls->priv->op_mutex);
       return status;
     }
+  else if (status == GNUTLS_E_GOT_APPLICATION_DATA)
+    {
+      if (gnutls->priv->handshaking && G_IS_TLS_SERVER_CONNECTION (gnutls))
+       return GNUTLS_E_AGAIN;
+    }
   else if (
 #ifdef GNUTLS_E_PREMATURE_TERMINATION
           status == GNUTLS_E_PREMATURE_TERMINATION
@@ -518,34 +743,34 @@ end_gnutls_io (GTlsConnectionGnutls  *gnutls,
        return 0;
     }
 
+  if (error)
+    {
+      g_set_error (error, G_TLS_ERROR, G_TLS_ERROR_MISC,
+                   errmsg, gnutls_strerror (status));
+    }
   return status;
 }
 
-#define BEGIN_GNUTLS_IO(gnutls, blocking, cancellable) \
-  begin_gnutls_io (gnutls, blocking, cancellable);     \
+#define BEGIN_GNUTLS_IO(gnutls, direction, blocking, cancellable)      \
+  begin_gnutls_io (gnutls, direction, blocking, cancellable);          \
   do {
 
-#define END_GNUTLS_IO(gnutls, ret, errmsg, err)        \
-  } while ((ret == GNUTLS_E_AGAIN ||                   \
-            ret == GNUTLS_E_WARNING_ALERT_RECEIVED) && \
-           !gnutls->priv->error);                      \
-  ret = end_gnutls_io (gnutls, ret, err);              \
-  if (ret < 0 && ret != GNUTLS_E_REHANDSHAKE && err && !*err) \
-    {                                                  \
-      g_set_error (err, G_TLS_ERROR, G_TLS_ERROR_MISC,\
-                   errmsg, gnutls_strerror (ret));     \
-    }                                                  \
-  ;
+#define END_GNUTLS_IO(gnutls, direction, ret, errmsg, err)             \
+  } while ((ret = end_gnutls_io (gnutls, direction, ret, errmsg, err)) == GNUTLS_E_AGAIN);
 
 gboolean
 g_tls_connection_gnutls_check (GTlsConnectionGnutls  *gnutls,
                               GIOCondition           condition)
 {
-  if (!gnutls->priv->internal_direction)
+  /* Racy, but worst case is that we just get WOULD_BLOCK back */
+  if (gnutls->priv->need_finish_handshake)
     return TRUE;
 
+  /* If a handshake or close is in progress, then tls_istream and
+   * tls_ostream are blocked, regardless of the base stream status.
+   */
   if (gnutls->priv->handshaking || gnutls->priv->closing)
-    condition = gnutls->priv->internal_direction;
+    return FALSE;
 
   if (condition & G_IO_IN)
     return g_pollable_input_stream_is_readable (gnutls->priv->base_istream);
@@ -554,13 +779,16 @@ g_tls_connection_gnutls_check (GTlsConnectionGnutls  *gnutls,
 }
 
 typedef struct {
-  GSource source;
+  GSource               source;
 
   GTlsConnectionGnutls *gnutls;
   GObject              *stream;
 
   GSource              *child_source;
-  GIOCondition          current_direction;
+  GIOCondition          condition;
+
+  gboolean              io_waiting;
+  gboolean              op_waiting;
 } GTlsConnectionGnutlsSource;
 
 static gboolean
@@ -577,40 +805,51 @@ gnutls_source_check (GSource *source)
   return FALSE;
 }
 
-static gboolean
-gnutls_source_sync_child_source (GTlsConnectionGnutlsSource *gnutls_source)
+static void
+gnutls_source_sync (GTlsConnectionGnutlsSource *gnutls_source)
 {
   GTlsConnectionGnutls *gnutls = gnutls_source->gnutls;
-  GSource *source = (GSource *)gnutls_source;
-  GIOCondition direction;
+  gboolean io_waiting, op_waiting;
 
-  if (gnutls->priv->handshaking || gnutls->priv->closing)
-    direction = gnutls->priv->internal_direction;
-  else if (!gnutls_source->stream)
-    return FALSE;
-  else if (G_IS_TLS_INPUT_STREAM_GNUTLS (gnutls_source->stream))
-    direction = G_IO_IN;
+  g_mutex_lock (&gnutls->priv->op_mutex);
+  if (((gnutls_source->condition & G_IO_IN) && gnutls->priv->reading) ||
+      ((gnutls_source->condition & G_IO_OUT) && gnutls->priv->writing) ||
+      (gnutls->priv->handshaking && !gnutls->priv->need_finish_handshake))
+    op_waiting = TRUE;
   else
-    direction = G_IO_OUT;
+    op_waiting = FALSE;
 
-  if (direction == gnutls_source->current_direction)
-    return TRUE;
+  if (!op_waiting && !gnutls->priv->need_handshake &&
+      !gnutls->priv->need_finish_handshake)
+    io_waiting = TRUE;
+  else
+    io_waiting = FALSE;
+  g_mutex_unlock (&gnutls->priv->op_mutex);
+
+  if (op_waiting == gnutls_source->op_waiting &&
+      io_waiting == gnutls_source->io_waiting)
+    return;
+  gnutls_source->op_waiting = op_waiting;
+  gnutls_source->io_waiting = io_waiting;
 
   if (gnutls_source->child_source)
     {
-      g_source_remove_child_source (source, gnutls_source->child_source);
+      g_source_remove_child_source ((GSource *)gnutls_source,
+                                   gnutls_source->child_source);
       g_source_unref (gnutls_source->child_source);
     }
 
-  if (direction & G_IO_IN)
+  if (op_waiting)
+    gnutls_source->child_source = g_cancellable_source_new (gnutls->priv->waiting_for_op);
+  else if (io_waiting && G_IS_POLLABLE_INPUT_STREAM (gnutls_source->stream))
     gnutls_source->child_source = g_pollable_input_stream_create_source (gnutls->priv->base_istream, NULL);
-  else
+  else if (io_waiting && G_IS_POLLABLE_OUTPUT_STREAM (gnutls_source->stream))
     gnutls_source->child_source = g_pollable_output_stream_create_source (gnutls->priv->base_ostream, NULL);
+  else
+    gnutls_source->child_source = g_timeout_source_new (0);
 
   g_source_set_dummy_callback (gnutls_source->child_source);
-  g_source_add_child_source (source, gnutls_source->child_source);
-  gnutls_source->current_direction = direction;
-  return TRUE;
+  g_source_add_child_source ((GSource *)gnutls_source, gnutls_source->child_source);
 }
 
 static gboolean
@@ -624,7 +863,7 @@ gnutls_source_dispatch (GSource     *source,
 
   ret = (*func) (gnutls_source->stream, user_data);
   if (ret)
-    ret = gnutls_source_sync_child_source (gnutls_source);
+    gnutls_source_sync (gnutls_source);
 
   return ret;
 }
@@ -635,9 +874,7 @@ gnutls_source_finalize (GSource *source)
   GTlsConnectionGnutlsSource *gnutls_source = (GTlsConnectionGnutlsSource *)source;
 
   g_object_unref (gnutls_source->gnutls);
-
-  if (gnutls_source->child_source)
-    g_source_unref (gnutls_source->child_source);
+  g_source_unref (gnutls_source->child_source);
 }
 
 static gboolean
@@ -686,11 +923,15 @@ g_tls_connection_gnutls_create_source (GTlsConnectionGnutls  *gnutls,
   g_source_set_name (source, "GTlsConnectionGnutlsSource");
   gnutls_source = (GTlsConnectionGnutlsSource *)source;
   gnutls_source->gnutls = g_object_ref (gnutls);
+  gnutls_source->condition = condition;
   if (condition & G_IO_IN)
     gnutls_source->stream = G_OBJECT (gnutls->priv->tls_istream);
   else if (condition & G_IO_OUT)
     gnutls_source->stream = G_OBJECT (gnutls->priv->tls_ostream);
-  gnutls_source_sync_child_source (gnutls_source);
+
+  gnutls_source->op_waiting = (gboolean) -1;
+  gnutls_source->io_waiting = (gboolean) -1;
+  gnutls_source_sync (gnutls_source);
 
   if (cancellable)
     {
@@ -704,15 +945,20 @@ g_tls_connection_gnutls_create_source (GTlsConnectionGnutls  *gnutls,
 }
 
 static void
-set_gnutls_error (GTlsConnectionGnutls *gnutls, GIOCondition direction)
+set_gnutls_error (GTlsConnectionGnutls *gnutls,
+                 GError               *error)
 {
-  if (g_error_matches (gnutls->priv->error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+  /* We set EINTR rather than EAGAIN for G_IO_ERROR_WOULD_BLOCK so
+   * that GNUTLS_E_AGAIN only gets returned for gnutls-internal
+   * reasons, not for actual socket EAGAINs (and we have access
+   * to @error at the higher levels, so we can distinguish them
+   * that way later).
+   */
+
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    gnutls_transport_set_errno (gnutls->priv->session, EINTR);
+  else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
     gnutls_transport_set_errno (gnutls->priv->session, EINTR);
-  else if (g_error_matches (gnutls->priv->error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      gnutls_transport_set_errno (gnutls->priv->session, EAGAIN);
-      gnutls->priv->internal_direction = direction;
-    }
   else
     gnutls_transport_set_errno (gnutls->priv->session, EIO);
 }
@@ -725,21 +971,22 @@ g_tls_connection_gnutls_pull_func (gnutls_transport_ptr_t  transport_data,
   GTlsConnectionGnutls *gnutls = transport_data;
   ssize_t ret;
 
-  /* If gnutls->priv->error is non-%NULL when we're called, it means
+  /* If gnutls->priv->read_error is non-%NULL when we're called, it means
    * that an error previously occurred, but gnutls decided not to
    * propagate it. So it's correct for us to just clear it. (Usually
    * this means it ignored an EAGAIN after a short read, and now
    * we'll return EAGAIN again, which it will obey this time.)
    */
-  g_clear_error (&gnutls->priv->error);
+  g_clear_error (&gnutls->priv->read_error);
 
   ret = g_pollable_stream_read (G_INPUT_STREAM (gnutls->priv->base_istream),
-                               buf, buflen, gnutls->priv->blocking,
-                               gnutls->priv->cancellable,
-                               &gnutls->priv->error);
+                               buf, buflen,
+                               gnutls->priv->read_blocking,
+                               gnutls->priv->read_cancellable,
+                               &gnutls->priv->read_error);
 
   if (ret < 0)
-    set_gnutls_error (gnutls, G_IO_IN);
+    set_gnutls_error (gnutls, gnutls->priv->read_error);
 #ifndef GNUTLS_E_PREMATURE_TERMINATION
   else if (ret == 0)
     gnutls->priv->eof = TRUE;
@@ -757,18 +1004,76 @@ g_tls_connection_gnutls_push_func (gnutls_transport_ptr_t  transport_data,
   ssize_t ret;
 
   /* See comment in pull_func. */
-  g_clear_error (&gnutls->priv->error);
+  g_clear_error (&gnutls->priv->write_error);
 
   ret = g_pollable_stream_write (G_OUTPUT_STREAM (gnutls->priv->base_ostream),
-                                buf, buflen, gnutls->priv->blocking,
-                                gnutls->priv->cancellable,
-                                &gnutls->priv->error);
+                                buf, buflen,
+                                gnutls->priv->write_blocking,
+                                gnutls->priv->write_cancellable,
+                                &gnutls->priv->write_error);
   if (ret < 0)
-    set_gnutls_error (gnutls, G_IO_OUT);
+    set_gnutls_error (gnutls, gnutls->priv->write_error);
 
   return ret;
 }
 
+static void
+handshake_thread (GSimpleAsyncResult *result,
+                 GObject            *object,
+                 GCancellable       *cancellable)
+{
+  GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (object);
+  gboolean is_client;
+  GError *error = NULL;
+  int ret;
+
+  gnutls->priv->started_handshake = FALSE;
+
+  if (!claim_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE,
+                TRUE, cancellable, &error))
+    {
+      g_simple_async_result_take_error (result, error);
+      return;
+    }
+
+  g_clear_error (&gnutls->priv->handshake_error);
+
+  is_client = G_IS_TLS_CLIENT_CONNECTION (gnutls);
+
+  if (!is_client && gnutls->priv->ever_handshaked &&
+      !gnutls->priv->implicit_handshake)
+    {
+      BEGIN_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, TRUE, cancellable);
+      ret = gnutls_rehandshake (gnutls->priv->session);
+      END_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, ret,
+                    _("Error performing TLS handshake: %s"), &error);
+
+      if (error)
+       {
+         g_simple_async_result_take_error (result, error);
+         return;
+       }
+    }
+
+  gnutls->priv->started_handshake = TRUE;
+
+  g_clear_object (&gnutls->priv->peer_certificate);
+  gnutls->priv->peer_certificate_errors = 0;
+
+  g_tls_connection_gnutls_set_handshake_priority (gnutls);
+
+  BEGIN_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, TRUE, cancellable);
+  ret = gnutls_handshake (gnutls->priv->session);
+  END_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, ret,
+                _("Error performing TLS handshake: %s"), &error);
+
+  gnutls->priv->ever_handshaked = TRUE;
+
+  if (error)
+    g_simple_async_result_take_error (result, error);
+  else
+    g_simple_async_result_set_op_res_gboolean (result, TRUE);
+}
 
 static GTlsCertificate *
 get_peer_certificate_from_session (GTlsConnectionGnutls *gnutls)
@@ -874,57 +1179,16 @@ accept_peer_certificate (GTlsConnectionGnutls *gnutls,
 }
 
 static gboolean
-handshake_internal (GTlsConnectionGnutls  *gnutls,
-                   gboolean               blocking,
-                   GCancellable          *cancellable,
-                   GError               **error)
+finish_handshake (GTlsConnectionGnutls  *gnutls,
+                 GSimpleAsyncResult    *result,
+                 GError               **error)
 {
-  GTlsCertificate *peer_certificate = NULL;
-  GTlsCertificateFlags peer_certificate_errors = 0;
-  int ret;
-
-  g_clear_error (&gnutls->priv->handshake_error);
-
-  if (G_IS_TLS_SERVER_CONNECTION_GNUTLS (gnutls) &&
-      gnutls->priv->ever_handshaked && !gnutls->priv->handshaking &&
-      !gnutls->priv->need_handshake)
-    {
-      BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
-      ret = gnutls_rehandshake (gnutls->priv->session);
-      END_GNUTLS_IO (gnutls, ret, _("Error performing TLS handshake: %s"), error);
-
-      if (ret != 0)
-       return FALSE;
-    }
-
-  if (!gnutls->priv->handshaking)
-    {
-      gnutls->priv->handshaking = TRUE;
-
-      g_clear_object (&gnutls->priv->peer_certificate);
-      gnutls->priv->peer_certificate_errors = 0;
-
-      g_tls_connection_gnutls_set_handshake_priority (gnutls);
-      G_TLS_CONNECTION_GNUTLS_GET_CLASS (gnutls)->begin_handshake (gnutls);
-    }
-
-  BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
-  ret = gnutls_handshake (gnutls->priv->session);
-  END_GNUTLS_IO (gnutls, ret, _("Error performing TLS handshake: %s"),
-                &gnutls->priv->handshake_error);
-
-  if (ret == GNUTLS_E_AGAIN)
-    {
-      g_propagate_error (error, gnutls->priv->handshake_error);
-      gnutls->priv->handshake_error = NULL;
-      return FALSE;
-    }
+  GTlsCertificate *peer_certificate;
+  GTlsCertificateFlags peer_certificate_errors;
 
-  gnutls->priv->handshaking = FALSE;
-  gnutls->priv->need_handshake = FALSE;
-  gnutls->priv->ever_handshaked = TRUE;
+  g_assert (error != NULL);
 
-  if (ret == 0 &&
+  if (!g_simple_async_result_propagate_error (result, error) &&
       gnutls_certificate_type_get (gnutls->priv->session) == GNUTLS_CRT_X509)
     peer_certificate = get_peer_certificate_from_session (gnutls);
   else
@@ -936,8 +1200,7 @@ handshake_internal (GTlsConnectionGnutls  *gnutls,
       if (!accept_peer_certificate (gnutls, peer_certificate,
                                    peer_certificate_errors))
        {
-         g_set_error_literal (&gnutls->priv->handshake_error,
-                              G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE,
+         g_set_error_literal (error, G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE,
                               _("Unacceptable TLS certificate"));
        }
 
@@ -946,43 +1209,18 @@ handshake_internal (GTlsConnectionGnutls  *gnutls,
       g_object_notify (G_OBJECT (gnutls), "peer-certificate");
       g_object_notify (G_OBJECT (gnutls), "peer-certificate-errors");
     }
-  else if (G_IS_TLS_CLIENT_CONNECTION (gnutls))
+  else if (error && !*error && G_IS_TLS_CLIENT_CONNECTION (gnutls))
     {
-      g_set_error_literal (&gnutls->priv->handshake_error,
-                          G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE,
+      g_set_error_literal (error, G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE,
                           _("Server did not return a valid TLS certificate"));
     }
 
-  G_TLS_CONNECTION_GNUTLS_GET_CLASS (gnutls)->
-    finish_handshake (gnutls, &gnutls->priv->handshake_error);
+  G_TLS_CONNECTION_GNUTLS_GET_CLASS (gnutls)->finish_handshake (gnutls, error);
 
-  if (gnutls->priv->handshake_error)
-    {
-      if (error)
-       *error = g_error_copy (gnutls->priv->handshake_error);
-      return FALSE;
-    }
-  else
-    return TRUE;
-}
+  if (*error && gnutls->priv->started_handshake)
+    gnutls->priv->handshake_error = g_error_copy (*error);
 
-static gboolean
-handshake_in_progress_or_failed (GTlsConnectionGnutls  *gnutls,
-                                gboolean               blocking,
-                                GCancellable          *cancellable,
-                                GError               **error)
-{
-  if (gnutls->priv->handshake_error)
-    {
-      if (error)
-       *error = g_error_copy (gnutls->priv->handshake_error);
-      return TRUE;
-    }
-
-  if (!(gnutls->priv->need_handshake || gnutls->priv->handshaking))
-    return FALSE;
-
-  return !handshake_internal (gnutls, blocking, cancellable, error);
+  return (*error == NULL);
 }
 
 static gboolean
@@ -991,40 +1229,52 @@ g_tls_connection_gnutls_handshake (GTlsConnection   *conn,
                                   GError          **error)
 {
   GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (conn);
+  GSimpleAsyncResult *result;
+  gboolean success;
+  GError *my_error = NULL;
+
+  result = g_simple_async_result_new (G_OBJECT (conn), NULL, NULL,
+                                     g_tls_connection_gnutls_handshake);
+  handshake_thread (result, G_OBJECT (conn), cancellable);
+
+  success = finish_handshake (gnutls, result, &my_error);
+  g_object_unref (result);
 
-  return handshake_internal (gnutls, TRUE, cancellable, error);
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE);
+
+  if (my_error)
+    g_propagate_error (error, my_error);
+  return success;
 }
 
-static gboolean
-g_tls_connection_gnutls_handshake_ready (GObject  *pollable_stream,
-                                        gpointer  user_data)
+/* In the async version we use two GSimpleAsyncResults; one to run
+ * handshake_thread() and then call handshake_thread_completed(), and
+ * a second to call the caller's original callback after we call
+ * finish_handshake().
+ */
+
+static void
+handshake_thread_completed (GObject      *object,
+                           GAsyncResult *result,
+                           gpointer      user_data)
 {
   GTlsConnectionGnutls *gnutls;
-  GSimpleAsyncResult *simple = user_data;
-  gboolean success;
+  GSimpleAsyncResult *caller_result = user_data;
   GError *error = NULL;
+  gboolean success;
 
-  gnutls = G_TLS_CONNECTION_GNUTLS (g_async_result_get_source_object (G_ASYNC_RESULT (simple)));
+  gnutls = G_TLS_CONNECTION_GNUTLS (g_async_result_get_source_object (G_ASYNC_RESULT (caller_result)));
   g_object_unref (gnutls);
 
-  success = handshake_internal (gnutls, FALSE, NULL, &error);
-  if (!success && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      g_error_free (error);
-      return TRUE;
-    }
+  success = finish_handshake (gnutls, G_SIMPLE_ASYNC_RESULT (result), &error);
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE);
 
-  if (error)
-    {
-      g_simple_async_result_set_from_error (simple, error);
-      g_error_free (error);
-    }
+  if (success)
+    g_simple_async_result_set_op_res_gboolean (caller_result, TRUE);
   else
-    g_simple_async_result_set_op_res_gboolean (simple, success);
-  g_simple_async_result_complete (simple);
-  g_object_unref (simple);
-
-  return FALSE;
+    g_simple_async_result_take_error (caller_result, error);
+  g_simple_async_result_complete (caller_result);
+  g_object_unref (caller_result);
 }
 
 static void
@@ -1034,40 +1284,17 @@ g_tls_connection_gnutls_handshake_async (GTlsConnection       *conn,
                                         GAsyncReadyCallback   callback,
                                         gpointer              user_data)
 {
-  GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (conn);
-  GSimpleAsyncResult *simple;
-  gboolean success;
-  GError *error = NULL;
-  GSource *source;
+  GSimpleAsyncResult *thread_result, *caller_result;
 
-  simple = g_simple_async_result_new (G_OBJECT (conn), callback, user_data,
-                                     g_tls_connection_gnutls_handshake_async);
-  success = handshake_internal (gnutls, FALSE, cancellable, &error);
-  if (success)
-    {
-      g_simple_async_result_set_op_res_gboolean (simple, TRUE);
-      g_simple_async_result_complete_in_idle (simple);
-      g_object_unref (simple);
-      return;
-    }
-  else if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      g_simple_async_result_set_from_error (simple, error);
-      g_error_free (error);
-      g_simple_async_result_complete_in_idle (simple);
-      g_object_unref (simple);
-      return;
-    }
-  else if (error)
-    g_error_free (error);
-
-  source = g_tls_connection_gnutls_create_source (gnutls, 0, cancellable);
-  g_source_set_callback (source,
-                        (GSourceFunc) g_tls_connection_gnutls_handshake_ready,
-                        simple, NULL);
-  g_source_set_priority (source, io_priority);
-  g_source_attach (source, g_main_context_get_thread_default ());
-  g_source_unref (source);
+  caller_result = g_simple_async_result_new (G_OBJECT (conn), callback, user_data,
+                                            g_tls_connection_gnutls_handshake_async);
+
+  thread_result = g_simple_async_result_new (G_OBJECT (conn),
+                                            handshake_thread_completed, caller_result,
+                                            g_tls_connection_gnutls_handshake_async);
+  g_simple_async_result_run_in_thread (thread_result, handshake_thread,
+                                      io_priority, cancellable);
+  g_object_unref (thread_result);
 }
 
 static gboolean
@@ -1087,6 +1314,66 @@ g_tls_connection_gnutls_handshake_finish (GTlsConnection       *conn,
   return g_simple_async_result_get_op_res_gboolean (simple);
 }
 
+static void
+implicit_handshake_completed (GObject      *object,
+                             GAsyncResult *result,
+                             gpointer      user_data)
+{
+  GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (object);
+
+  g_mutex_lock (&gnutls->priv->op_mutex);
+  gnutls->priv->need_finish_handshake = TRUE;
+  g_mutex_unlock (&gnutls->priv->op_mutex);
+
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE);
+}
+
+static gboolean
+do_implicit_handshake (GTlsConnectionGnutls  *gnutls,
+                      gboolean               blocking,
+                      GCancellable          *cancellable,
+                      GError               **error)
+{
+  /* We have op_mutex */
+
+  gnutls->priv->implicit_handshake =
+    g_simple_async_result_new (G_OBJECT (gnutls),
+                              implicit_handshake_completed, NULL,
+                              do_implicit_handshake);
+
+  if (blocking)
+    {
+      GError *my_error = NULL;
+      gboolean success;
+
+      g_mutex_unlock (&gnutls->priv->op_mutex);
+      handshake_thread (gnutls->priv->implicit_handshake,
+                       G_OBJECT (gnutls),
+                       cancellable);
+      success = finish_handshake (gnutls,
+                                 gnutls->priv->implicit_handshake,
+                                 &my_error);
+      g_clear_object (&gnutls->priv->implicit_handshake);
+      yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE);
+      g_mutex_lock (&gnutls->priv->op_mutex);
+
+      if (my_error)
+       g_propagate_error (error, my_error);
+      return success;
+    }
+  else
+    {
+      g_simple_async_result_run_in_thread (gnutls->priv->implicit_handshake,
+                                          handshake_thread,
+                                          G_PRIORITY_DEFAULT, cancellable);
+
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                          _("Operation would block"));
+
+      return FALSE;
+    }
+}
+
 gssize
 g_tls_connection_gnutls_read (GTlsConnectionGnutls  *gnutls,
                              void                  *buffer,
@@ -1098,12 +1385,15 @@ g_tls_connection_gnutls_read (GTlsConnectionGnutls  *gnutls,
   gssize ret;
 
  again:
-  if (handshake_in_progress_or_failed (gnutls, blocking, cancellable, error))
+  if (!claim_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_READ,
+                blocking, cancellable, error))
     return -1;
 
-  BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
+  BEGIN_GNUTLS_IO (gnutls, G_IO_IN, blocking, cancellable);
   ret = gnutls_record_recv (gnutls->priv->session, buffer, count);
-  END_GNUTLS_IO (gnutls, ret, _("Error reading data from TLS socket: %s"), error);
+  END_GNUTLS_IO (gnutls, G_IO_IN, ret, _("Error reading data from TLS socket: %s"), error);
+
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_READ);
 
   if (ret >= 0)
     return ret;
@@ -1124,12 +1414,15 @@ g_tls_connection_gnutls_write (GTlsConnectionGnutls  *gnutls,
   gssize ret;
 
  again:
-  if (handshake_in_progress_or_failed (gnutls, blocking, cancellable, error))
+  if (!claim_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_WRITE,
+                blocking, cancellable, error))
     return -1;
 
-  BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
+  BEGIN_GNUTLS_IO (gnutls, G_IO_OUT, blocking, cancellable);
   ret = gnutls_record_send (gnutls->priv->session, buffer, count);
-  END_GNUTLS_IO (gnutls, ret, _("Error writing data to TLS socket: %s"), error);
+  END_GNUTLS_IO (gnutls, G_IO_OUT, ret, _("Error writing data to TLS socket: %s"), error);
+
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_WRITE);
 
   if (ret >= 0)
     return ret;
@@ -1156,114 +1449,64 @@ g_tls_connection_gnutls_get_output_stream (GIOStream *stream)
 }
 
 static gboolean
-close_internal (GTlsConnectionGnutls  *gnutls,
-               gboolean               blocking,
-               GCancellable          *cancellable,
-               GError               **error)
-{
-  int ret;
-
-  /* If we haven't finished the initial handshake yet, there's no
-   * reason to finish it just so we can close.
-   */
-  if (!gnutls->priv->ever_handshaked)
-    return TRUE;
-
-  if (handshake_in_progress_or_failed (gnutls, blocking, cancellable, error))
-    return FALSE;
-
-  gnutls->priv->closing = TRUE;
-  BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
-  ret = gnutls_bye (gnutls->priv->session, GNUTLS_SHUT_WR);
-  END_GNUTLS_IO (gnutls, ret, _("Error performing TLS close: %s"), error);
-  if (ret == 0 || !error || !g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    gnutls->priv->closing = FALSE;
-
-  return ret == 0;
-}
-
-static gboolean
 g_tls_connection_gnutls_close (GIOStream     *stream,
                               GCancellable  *cancellable,
                               GError       **error)
 {
   GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (stream);
+  gboolean success;
+  int ret;
 
-  if (!close_internal (gnutls, TRUE, cancellable, error))
+  if (!claim_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_CLOSE,
+                TRUE, cancellable, error))
     return FALSE;
-  return g_io_stream_close (gnutls->priv->base_io_stream,
-                           cancellable, error);
-}
 
-typedef struct {
-  GSimpleAsyncResult *simple;
-  GCancellable *cancellable;
-  int io_priority;
-} AsyncCloseData;
+  if (gnutls->priv->closed)
+    {
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                          _("Connection is already closed"));
+      yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_CLOSE);
+      return FALSE;
+    }
 
-static void
-close_base_stream_cb (GObject      *base_stream,
-                     GAsyncResult *result,
-                     gpointer      user_data)
-{
-  gboolean success;
-  GError *error = NULL;
-  AsyncCloseData *acd = user_data;
+  if (gnutls->priv->ever_handshaked)
+    {
+      BEGIN_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, TRUE, cancellable);
+      ret = gnutls_bye (gnutls->priv->session, GNUTLS_SHUT_WR);
+      END_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, ret,
+                    _("Error performing TLS close: %s"), error);
+    }
 
-  success = g_io_stream_close_finish (G_IO_STREAM (base_stream),
-                                     result, &error);
-  if (success)
-    g_simple_async_result_set_op_res_gboolean (acd->simple, TRUE);
-  else
+  gnutls->priv->closed = TRUE;
+
+  if (ret != 0)
     {
-      g_simple_async_result_set_from_error (acd->simple, error);
-      g_error_free (error);
+      yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_CLOSE);
+      return FALSE;
     }
 
-  g_simple_async_result_complete (acd->simple);
-  g_object_unref (acd->simple);
-  if (acd->cancellable)
-    g_object_unref (acd->cancellable);
-  g_slice_free (AsyncCloseData, acd);
+  success = g_io_stream_close (gnutls->priv->base_io_stream,
+                              cancellable, error);
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_CLOSE);
+  return success;
 }
 
-static gboolean
-g_tls_connection_gnutls_close_ready (GObject  *pollable_stream,
-                                    gpointer  user_data)
+/* We do async close as synchronous-in-a-thread so we don't need to
+ * implement G_IO_IN/G_IO_OUT flip-flopping just for this one case
+ * (since handshakes are also done synchronously now).
+ */
+static void
+close_thread (GSimpleAsyncResult *result,
+             GObject            *object,
+             GCancellable       *cancellable)
 {
-  GTlsConnectionGnutls *gnutls;
-  AsyncCloseData *acd = user_data;
-  gboolean success;
+  GIOStream *stream = G_IO_STREAM (object);
   GError *error = NULL;
 
-  gnutls = G_TLS_CONNECTION_GNUTLS (g_async_result_get_source_object (G_ASYNC_RESULT (acd->simple)));
-  g_object_unref (gnutls);
-
-  success = close_internal (gnutls, FALSE, NULL, &error);
-  if (!success && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      g_error_free (error);
-      return TRUE;
-    }
-
-  if (error)
-    {
-      g_simple_async_result_set_from_error (acd->simple, error);
-      g_simple_async_result_complete (acd->simple);
-      g_error_free (error);
-      g_object_unref (acd->simple);
-      if (acd->cancellable)
-       g_object_unref (acd->cancellable);
-      g_slice_free (AsyncCloseData, acd);
-    }
+  if (!g_tls_connection_gnutls_close (stream, cancellable, &error))
+    g_simple_async_result_take_error (result, error);
   else
-    {
-      g_io_stream_close_async (gnutls->priv->base_io_stream,
-                              acd->io_priority, acd->cancellable,
-                              close_base_stream_cb, acd);
-    }
-
-  return FALSE;
+    g_simple_async_result_set_op_res_gboolean (result, TRUE);
 }
 
 static void
@@ -1273,48 +1516,14 @@ g_tls_connection_gnutls_close_async (GIOStream           *stream,
                                     GAsyncReadyCallback  callback,
                                     gpointer             user_data)
 {
-  GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (stream);
-  GSimpleAsyncResult *simple;
-  gboolean success;
-  GError *error = NULL;
-  AsyncCloseData *acd;
-  GSource *source;
+  GSimpleAsyncResult *result;
 
-  simple = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
+  result = g_simple_async_result_new (G_OBJECT (stream),
+                                     callback, user_data,
                                      g_tls_connection_gnutls_close_async);
-
-  success = close_internal (gnutls, FALSE, cancellable, &error);
-  if (error && !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      g_simple_async_result_set_from_error (simple, error);
-      g_error_free (error);
-      g_simple_async_result_complete_in_idle (simple);
-      g_object_unref (simple);
-    }
-
-  if (error)
-    g_error_free (error);
-
-  acd = g_slice_new (AsyncCloseData);
-  acd->simple = simple;
-  acd->cancellable = cancellable ? g_object_ref (cancellable) : cancellable;
-  acd->io_priority = io_priority;
-
-  if (success)
-    {
-      g_io_stream_close_async (gnutls->priv->base_io_stream,
-                              io_priority, cancellable,
-                              close_base_stream_cb, acd);
-      return;
-    }
-
-  source = g_tls_connection_gnutls_create_source (gnutls, 0, acd->cancellable);
-  g_source_set_callback (source,
-                        (GSourceFunc) g_tls_connection_gnutls_close_ready,
-                        acd, NULL);
-  g_source_set_priority (source, acd->io_priority);
-  g_source_attach (source, g_main_context_get_thread_default ());
-  g_source_unref (source);
+  g_simple_async_result_run_in_thread (result, close_thread,
+                                      io_priority, cancellable);
+  g_object_unref (result);
 }
 
 static gboolean
index 30fdb4e..751490d 100644 (file)
@@ -41,6 +41,9 @@ typedef struct {
   gboolean rehandshake;
   GTlsCertificateFlags accept_flags;
   GError *read_error;
+
+  char buf[128];
+  gssize nread, nwrote;
 } TestConnection;
 
 static void
@@ -75,7 +78,7 @@ teardown_connection (TestConnection *test, gconstpointer data)
       g_object_add_weak_pointer (G_OBJECT (test->service), (gpointer *)&test->service);
       g_object_unref (test->service);
       while (test->service)
-       g_main_context_iteration (NULL, TRUE);
+       g_main_context_iteration (NULL, FALSE);
     }
 
   if (test->server_connection)
@@ -212,7 +215,7 @@ on_incoming_connection (GSocketService     *service,
 }
 
 static void
-start_server_service (TestConnection *test, GTlsAuthenticationMode auth_mode)
+start_async_server_service (TestConnection *test, GTlsAuthenticationMode auth_mode)
 {
   GError *error = NULL;
 
@@ -227,14 +230,101 @@ start_server_service (TestConnection *test, GTlsAuthenticationMode auth_mode)
   g_signal_connect (test->service, "incoming", G_CALLBACK (on_incoming_connection), test);
 }
 
-static GIOStream*
-start_server_and_connect_to_it (TestConnection *test, GTlsAuthenticationMode auth_mode)
+static GIOStream *
+start_async_server_and_connect_to_it (TestConnection *test, GTlsAuthenticationMode auth_mode)
+{
+  GSocketClient *client;
+  GError *error = NULL;
+  GSocketConnection *connection;
+
+  start_async_server_service (test, auth_mode);
+
+  client = g_socket_client_new ();
+  connection = g_socket_client_connect (client, G_SOCKET_CONNECTABLE (test->address),
+                                        NULL, &error);
+  g_assert_no_error (error);
+  g_object_unref (client);
+
+  return G_IO_STREAM (connection);
+}
+
+static void
+run_echo_server (GThreadedSocketService *service,
+                GSocketConnection      *connection,
+                GObject                *source_object,
+                gpointer                user_data)
+{
+  TestConnection *test = user_data;
+  GTlsConnection *tlsconn;
+  GTlsCertificate *cert;
+  GError *error = NULL;
+  GInputStream *istream;
+  GOutputStream *ostream;
+  gssize nread, nwrote, total;
+  gchar buf[128];
+
+  cert = g_tls_certificate_new_from_file (TEST_FILE ("server-and-key.pem"), &error);
+  g_assert_no_error (error);
+
+  test->server_connection = g_tls_server_connection_new (G_IO_STREAM (connection),
+                                                         cert, &error);
+  g_assert_no_error (error);
+  g_object_unref (cert);
+
+  tlsconn = G_TLS_CONNECTION (test->server_connection);
+  g_tls_connection_handshake (tlsconn, NULL, &error);
+  g_assert_no_error (error);
+
+  istream = g_io_stream_get_input_stream (test->server_connection);
+  ostream = g_io_stream_get_output_stream (test->server_connection);
+
+  while (TRUE)
+    {
+      nread = g_input_stream_read (istream, buf, sizeof (buf), NULL, &error);
+      g_assert_no_error (error);
+      g_assert_cmpint (nread, >=, 0);
+
+      if (nread == 0)
+       break;
+
+      for (total = 0; total < nread; total += nwrote)
+       {
+         nwrote = g_output_stream_write (ostream, buf + total, nread - total, NULL, &error);
+         g_assert_no_error (error);
+       }
+
+      if (test->rehandshake)
+       {
+         test->rehandshake = FALSE;
+         g_tls_connection_handshake (tlsconn, NULL, &error);
+         g_assert_no_error (error);
+       }
+    }
+}
+
+static void
+start_echo_server_service (TestConnection *test)
+{
+  GError *error = NULL;
+
+  test->service = g_threaded_socket_service_new (5);
+  g_socket_listener_add_address (G_SOCKET_LISTENER (test->service),
+                                 G_SOCKET_ADDRESS (test->address),
+                                 G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP,
+                                 NULL, NULL, &error);
+  g_assert_no_error (error);
+
+  g_signal_connect (test->service, "run", G_CALLBACK (run_echo_server), test);
+}
+
+static GIOStream *
+start_echo_server_and_connect_to_it (TestConnection *test)
 {
   GSocketClient *client;
   GError *error = NULL;
   GSocketConnection *connection;
 
-  start_server_service (test, auth_mode);
+  start_echo_server_service (test);
 
   client = g_socket_client_new ();
   connection = g_socket_client_connect (client, G_SOCKET_CONNECTABLE (test->address),
@@ -289,7 +379,7 @@ test_basic_connection (TestConnection *test,
   GIOStream *connection;
   GError *error = NULL;
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
   test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
   g_assert_no_error (error);
   g_object_unref (connection);
@@ -314,7 +404,7 @@ test_verified_connection (TestConnection *test,
   g_assert_no_error (error);
   g_assert (test->database);
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
   test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
   g_assert_no_error (error);
   g_assert (test->client_connection);
@@ -343,7 +433,7 @@ test_client_auth_connection (TestConnection *test,
   g_assert_no_error (error);
   g_assert (test->database);
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_REQUIRED);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_REQUIRED);
   test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
   g_assert_no_error (error);
   g_assert (test->client_connection);
@@ -381,7 +471,7 @@ test_connection_no_database (TestConnection *test,
   GIOStream *connection;
   GError *error = NULL;
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
   test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
   g_assert_no_error (error);
   g_assert (test->client_connection);
@@ -427,7 +517,7 @@ test_failed_connection (TestConnection *test,
   GError *error = NULL;
   GSocketConnectable *bad_addr;
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
 
   bad_addr = g_network_address_new ("wrong.example.com", 80);
   test->client_connection = g_tls_client_connection_new (connection, bad_addr, &error);
@@ -475,7 +565,7 @@ test_connection_socket_client (TestConnection *test,
   GIOStream *base;
   GError *error = NULL;
 
-  start_server_service (test, G_TLS_AUTHENTICATION_NONE);
+  start_async_server_service (test, G_TLS_AUTHENTICATION_NONE);
   client = g_socket_client_new ();
   g_socket_client_set_tls (client, TRUE);
   flags = G_TLS_CERTIFICATE_VALIDATE_ALL & ~G_TLS_CERTIFICATE_UNKNOWN_CA;
@@ -523,7 +613,7 @@ test_connection_socket_client_failed (TestConnection *test,
 {
   GSocketClient *client;
 
-  start_server_service (test, G_TLS_AUTHENTICATION_NONE);
+  start_async_server_service (test, G_TLS_AUTHENTICATION_NONE);
   client = g_socket_client_new ();
   g_socket_client_set_tls (client, TRUE);
   /* this time we don't adjust the validation flags */
@@ -535,6 +625,201 @@ test_connection_socket_client_failed (TestConnection *test,
   g_object_unref (client);
 }
 
+static void
+simul_async_read_complete (GObject      *object,
+                          GAsyncResult *result,
+                          gpointer      user_data)
+{
+  TestConnection *test = user_data;
+  gssize nread;
+  GError *error = NULL;
+
+  nread = g_input_stream_read_finish (G_INPUT_STREAM (object),
+                                     result, &error);
+  g_assert_no_error (error);
+
+  test->nread += nread;
+  g_assert_cmpint (test->nread, <=, TEST_DATA_LENGTH);
+
+  if (test->nread == TEST_DATA_LENGTH)
+    {
+      g_io_stream_close (test->client_connection, NULL, &error);
+      g_assert_no_error (error);
+      g_main_loop_quit (test->loop);
+    }
+  else
+    {
+      g_input_stream_read_async (G_INPUT_STREAM (object),
+                                test->buf + test->nread,
+                                TEST_DATA_LENGTH / 2,
+                                G_PRIORITY_DEFAULT, NULL,
+                                simul_async_read_complete, test);
+    }
+}
+
+static void
+simul_async_write_complete (GObject      *object,
+                           GAsyncResult *result,
+                           gpointer      user_data)
+{
+  TestConnection *test = user_data;
+  gssize nwrote;
+  GError *error = NULL;
+
+  nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (object),
+                                        result, &error);
+  g_assert_no_error (error);
+
+  test->nwrote += nwrote;
+  if (test->nwrote < TEST_DATA_LENGTH)
+    {
+      g_output_stream_write_async (G_OUTPUT_STREAM (object),
+                                  TEST_DATA + test->nwrote,
+                                  TEST_DATA_LENGTH - test->nwrote,
+                                  G_PRIORITY_DEFAULT, NULL,
+                                  simul_async_write_complete, test);
+    }
+}
+
+static void
+test_simultaneous_async (TestConnection *test,
+                        gconstpointer   data)
+{
+  GIOStream *connection;
+  GTlsCertificateFlags flags;
+  GError *error = NULL;
+
+  connection = start_echo_server_and_connect_to_it (test);
+  test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
+  g_assert_no_error (error);
+  g_object_unref (connection);
+
+  flags = G_TLS_CERTIFICATE_VALIDATE_ALL &
+    ~(G_TLS_CERTIFICATE_UNKNOWN_CA | G_TLS_CERTIFICATE_BAD_IDENTITY);
+  g_tls_client_connection_set_validation_flags (G_TLS_CLIENT_CONNECTION (test->client_connection),
+                                                flags);
+
+  memset (test->buf, 0, sizeof (test->buf));
+  test->nread = test->nwrote = 0;
+
+  g_input_stream_read_async (g_io_stream_get_input_stream (test->client_connection),
+                            test->buf, TEST_DATA_LENGTH / 2,
+                            G_PRIORITY_DEFAULT, NULL,
+                            simul_async_read_complete, test);
+  g_output_stream_write_async (g_io_stream_get_output_stream (test->client_connection),
+                              TEST_DATA, TEST_DATA_LENGTH / 2,
+                              G_PRIORITY_DEFAULT, NULL,
+                              simul_async_write_complete, test);
+
+  g_main_loop_run (test->loop);
+
+  g_assert_cmpint (test->nread, ==, TEST_DATA_LENGTH);
+  g_assert_cmpint (test->nwrote, ==, TEST_DATA_LENGTH);
+  g_assert_cmpstr (test->buf, ==, TEST_DATA);
+}
+
+static void
+test_simultaneous_async_rehandshake (TestConnection *test,
+                                    gconstpointer   data)
+{
+  test->rehandshake = TRUE;
+  test_simultaneous_async (test, data);
+}
+
+static gpointer
+simul_read_thread (gpointer user_data)
+{
+  TestConnection *test = user_data;
+  GInputStream *istream = g_io_stream_get_input_stream (test->client_connection);
+  GError *error = NULL;
+  gssize nread;
+
+  while (test->nread < TEST_DATA_LENGTH)
+    {
+      nread = g_input_stream_read (istream,
+                                  test->buf + test->nread,
+                                  MIN (TEST_DATA_LENGTH / 2, TEST_DATA_LENGTH - test->nread),
+                                  NULL, &error);
+      g_assert_no_error (error);
+
+      test->nread += nread;
+    }
+
+  return NULL;
+}
+
+static gpointer
+simul_write_thread (gpointer user_data)
+{
+  TestConnection *test = user_data;
+  GOutputStream *ostream = g_io_stream_get_output_stream (test->client_connection);
+  GError *error = NULL;
+  gssize nwrote;
+
+  while (test->nwrote < TEST_DATA_LENGTH)
+    {
+      nwrote = g_output_stream_write (ostream,
+                                     TEST_DATA + test->nwrote,
+                                     MIN (TEST_DATA_LENGTH / 2, TEST_DATA_LENGTH - test->nwrote),
+                                     NULL, &error);
+      g_assert_no_error (error);
+
+      test->nwrote += nwrote;
+    }
+
+  return NULL;
+}
+
+static void
+test_simultaneous_sync (TestConnection *test,
+                       gconstpointer   data)
+{
+  GIOStream *connection;
+  GTlsCertificateFlags flags;
+  GError *error = NULL;
+  GThread *read_thread, *write_thread;
+
+  connection = start_echo_server_and_connect_to_it (test);
+  test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
+  g_assert_no_error (error);
+  g_object_unref (connection);
+
+  flags = G_TLS_CERTIFICATE_VALIDATE_ALL &
+    ~(G_TLS_CERTIFICATE_UNKNOWN_CA | G_TLS_CERTIFICATE_BAD_IDENTITY);
+  g_tls_client_connection_set_validation_flags (G_TLS_CLIENT_CONNECTION (test->client_connection),
+                                                flags);
+
+  memset (test->buf, 0, sizeof (test->buf));
+  test->nread = test->nwrote = 0;
+
+  read_thread = g_thread_new ("reader", simul_read_thread, test);
+  write_thread = g_thread_new ("writer", simul_write_thread, test);
+
+  /* We need to run the main loop to get the GThreadedSocketService to
+   * receive the connection and spawn the server thread.
+   */
+  while (!test->server_connection)
+    g_main_context_iteration (NULL, FALSE);
+
+  g_thread_join (write_thread);
+  g_thread_join (read_thread);
+
+  g_assert_cmpint (test->nread, ==, TEST_DATA_LENGTH);
+  g_assert_cmpint (test->nwrote, ==, TEST_DATA_LENGTH);
+  g_assert_cmpstr (test->buf, ==, TEST_DATA);
+
+  g_io_stream_close (test->client_connection, NULL, &error);
+  g_assert_no_error (error);
+}
+
+static void
+test_simultaneous_sync_rehandshake (TestConnection *test,
+                                   gconstpointer   data)
+{
+  test->rehandshake = TRUE;
+  test_simultaneous_sync (test, data);
+}
+
 int
 main (int   argc,
       char *argv[])
@@ -564,6 +849,14 @@ main (int   argc,
               setup_connection, test_connection_socket_client, teardown_connection);
   g_test_add ("/tls/connection/socket-client-failed", TestConnection, NULL,
               setup_connection, test_connection_socket_client_failed, teardown_connection);
+  g_test_add ("/tls/connection/simultaneous-async", TestConnection, NULL,
+              setup_connection, test_simultaneous_async, teardown_connection);
+  g_test_add ("/tls/connection/simultaneous-sync", TestConnection, NULL,
+             setup_connection, test_simultaneous_sync, teardown_connection);
+  g_test_add ("/tls/connection/simultaneous-async-rehandshake", TestConnection, NULL,
+              setup_connection, test_simultaneous_async_rehandshake, teardown_connection);
+  g_test_add ("/tls/connection/simultaneous-sync-rehandshake", TestConnection, NULL,
+             setup_connection, test_simultaneous_sync_rehandshake, teardown_connection);
 
   ret = g_test_run();