Add support for graceful disconnect to GTcpConnection
authorAlexander Larsson <alexl@redhat.com>
Wed, 20 May 2009 09:19:47 +0000 (11:19 +0200)
committerAlexander Larsson <alexl@redhat.com>
Wed, 20 May 2009 09:19:47 +0000 (11:19 +0200)
docs/reference/gio/gio-sections.txt
gio/gio.symbols
gio/gtcpconnection.c
gio/gtcpconnection.h
gio/tests/send-data.c

index 581eda6..e2fce8f 100644 (file)
@@ -1722,6 +1722,8 @@ g_socket_connection_get_remote_address
 g_socket_connection_get_socket
 <SUBSECTION>
 GTcpConnection
+g_tcp_connection_set_graceful_disconnect
+g_tcp_connection_get_graceful_disconnect
 <SUBSECTION>
 GUnixConnection
 g_unix_connection_receive_fd
index 6cf2493..7db9b5c 100644 (file)
@@ -1182,6 +1182,8 @@ g_threaded_socket_service_new
 #if IN_HEADER(__G_TCP_CONNECTION_H__)
 #if IN_FILE(__G_TCP_CONNECTION_C__)
 g_tcp_connection_get_type G_GNUC_CONST
+g_tcp_connection_set_graceful_disconnect
+g_tcp_connection_get_graceful_disconnect
 #endif
 #endif
 
index bd1c411..6c4e320 100644 (file)
@@ -29,6 +29,9 @@
 
 #include "config.h"
 #include "gtcpconnection.h"
+#include "gasyncresult.h"
+#include "gsimpleasyncresult.h"
+#include "giostream.h"
 #include "glibintl.h"
 
 #include "gioalias.h"
@@ -53,15 +56,343 @@ G_DEFINE_TYPE_WITH_CODE (GTcpConnection, g_tcp_connection,
                                             g_socket_protocol_id_lookup_by_name ("tcp"));
                         );
 
+static gboolean g_tcp_connection_close       (GIOStream            *stream,
+                                             GCancellable         *cancellable,
+                                             GError              **error);
+static void     g_tcp_connection_close_async (GIOStream            *stream,
+                                             int                   io_priority,
+                                             GCancellable         *cancellable,
+                                             GAsyncReadyCallback   callback,
+                                             gpointer              user_data);
+
+struct _GTcpConnectionPrivate
+{
+  guint graceful_disconnect : 1;
+};
+
+
+enum
+{
+  PROP_0,
+  PROP_GRACEFUL_DISCONNECT
+};
+
 static void
 g_tcp_connection_init (GTcpConnection *connection)
 {
+  connection->priv = G_TYPE_INSTANCE_GET_PRIVATE (connection,
+                                                  G_TYPE_TCP_CONNECTION,
+                                                  GTcpConnectionPrivate);
+  connection->priv->graceful_disconnect = FALSE;
+}
+
+static void
+g_tcp_connection_get_property (GObject    *object,
+                              guint       prop_id,
+                              GValue     *value,
+                              GParamSpec *pspec)
+{
+  GTcpConnection *connection = G_TCP_CONNECTION (object);
+
+  switch (prop_id)
+    {
+      case PROP_GRACEFUL_DISCONNECT:
+       g_value_set_boolean (value, connection->priv->graceful_disconnect);
+       break;
+
+      default:
+       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+    }
+}
+
+static void
+g_tcp_connection_set_property (GObject      *object,
+                              guint         prop_id,
+                              const GValue *value,
+                              GParamSpec   *pspec)
+{
+  GTcpConnection *connection = G_TCP_CONNECTION (object);
+
+  switch (prop_id)
+    {
+      case PROP_GRACEFUL_DISCONNECT:
+       g_tcp_connection_set_graceful_disconnect (connection,
+                                                 g_value_get_boolean (value));
+       break;
+
+      default:
+       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+    }
 }
 
 static void
 g_tcp_connection_class_init (GTcpConnectionClass *class)
 {
+  GObjectClass *gobject_class = G_OBJECT_CLASS (class);
+  GIOStreamClass *stream_class = G_IO_STREAM_CLASS (class);
+
+  g_type_class_add_private (class, sizeof (GTcpConnectionPrivate));
+
+  gobject_class->set_property = g_tcp_connection_set_property;
+  gobject_class->get_property = g_tcp_connection_get_property;
+
+  stream_class->close_fn = g_tcp_connection_close;
+  stream_class->close_async = g_tcp_connection_close_async;
+
+  g_object_class_install_property (gobject_class, PROP_GRACEFUL_DISCONNECT,
+                                  g_param_spec_boolean ("graceful-disconnect",
+                                                        P_("Graceful Disconnect"),
+                                                        P_("Whether or not close does a graceful disconnect"),
+                                                        FALSE,
+                                                        G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+}
+
+static gboolean
+g_tcp_connection_close (GIOStream            *stream,
+                       GCancellable         *cancellable,
+                       GError              **error)
+{
+  GTcpConnection *connection = G_TCP_CONNECTION (stream);
+  GSocket *socket;
+  char buffer[1024];
+  gssize ret;
+  GError *my_error;
+  gboolean had_error;
+
+  socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (stream));
+  had_error = FALSE;
+
+  if (connection->priv->graceful_disconnect &&
+      !g_cancellable_is_cancelled (cancellable) /* Cancelled -> close fast */)
+    {
+      if (!g_socket_shutdown (socket, FALSE, TRUE, error))
+       {
+         error = NULL; /* Ignore further errors */
+         had_error = TRUE;
+       }
+      else
+       {
+         while (TRUE)
+           {
+             if (!g_socket_condition_wait (socket,
+                                           G_IO_IN, cancellable, error))
+               {
+                 had_error = TRUE;
+                 error = NULL;
+                 break;
+               }
+
+             my_error = NULL;
+             ret = g_socket_receive (socket,  buffer, sizeof (buffer),
+                                     &my_error);
+             if (ret < 0)
+               {
+                 if (g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+                   g_error_free (my_error);
+                 else
+                   {
+                     had_error = TRUE;
+                     g_propagate_error (error, my_error);
+                     error = NULL;
+                     break;
+                   }
+               }
+             if (ret == 0)
+               break;
+           }
+       }
+    }
+
+  return G_IO_STREAM_CLASS (g_tcp_connection_parent_class)
+    ->close_fn (stream, cancellable, error) && !had_error;
+}
+
+typedef struct {
+  GSimpleAsyncResult *res;
+  GCancellable *cancellable;
+} CloseAsyncData;
+
+static void
+close_async_data_free (CloseAsyncData *data)
+{
+  g_object_unref (data->res);
+  if (data->cancellable)
+    g_object_unref (data->cancellable);
+  g_free (data);
 }
 
+static void
+async_close_finish (CloseAsyncData *data, GError *error, gboolean in_mainloop)
+{
+  GIOStreamClass *parent = G_IO_STREAM_CLASS (g_tcp_connection_parent_class);
+  GIOStream *stream;
+  GError *my_error;
+
+  stream = G_IO_STREAM (g_async_result_get_source_object (G_ASYNC_RESULT (data->res)));
+
+  /* Doesn't block, ignore error */
+  if (error)
+    {
+      parent->close_fn (stream, data->cancellable, NULL);
+      g_simple_async_result_set_from_error (data->res, error);
+    }
+  else
+    {
+      my_error = NULL;
+      parent->close_fn (stream, data->cancellable, &my_error);
+      if (my_error)
+       {
+         g_simple_async_result_set_from_error (data->res, my_error);
+         g_error_free (my_error);
+       }
+    }
+
+  if (in_mainloop)
+    g_simple_async_result_complete (data->res);
+  else
+    g_simple_async_result_complete_in_idle (data->res);
+}
+
+static gboolean
+close_read_ready (GSocket *socket,
+                 GIOCondition condition,
+                 CloseAsyncData *data)
+{
+  GError *error = NULL;
+  char buffer[1024];
+  gssize ret;
+
+  if (g_cancellable_set_error_if_cancelled (data->cancellable,
+                                           &error))
+    {
+      async_close_finish (data, error, TRUE);
+      g_error_free (error);
+      return FALSE;
+    }
+
+  ret = g_socket_receive (socket,  buffer, sizeof (buffer), &error);
+  if (ret < 0)
+    {
+      if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+       g_error_free (error);
+      else
+       {
+         async_close_finish (data, error, TRUE);
+         g_error_free (error);
+         return FALSE;
+       }
+    }
+
+  if (ret == 0)
+    {
+      async_close_finish (data, NULL, TRUE);
+      return FALSE;
+    }
+
+  return TRUE;
+}
+
+
+static void
+g_tcp_connection_close_async (GIOStream        *stream,
+                             int               io_priority,
+                             GCancellable     *cancellable,
+                             GAsyncReadyCallback callback,
+                             gpointer          user_data)
+{
+  GTcpConnection *connection = G_TCP_CONNECTION (stream);
+  CloseAsyncData *data;
+  GSocket *socket;
+  GSource *source;
+  GError *error;
+
+  if (connection->priv->graceful_disconnect &&
+      !g_cancellable_is_cancelled (cancellable) /* Cancelled -> close fast */)
+    {
+      data = g_new (CloseAsyncData, 1);
+      data->res =
+       g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
+                                  g_tcp_connection_close_async);
+      if (cancellable)
+       data->cancellable = g_object_ref (cancellable);
+      else
+       data->cancellable = NULL;
+
+      socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (stream));
+
+      error = NULL;
+      if (!g_socket_shutdown (socket, FALSE, TRUE, &error))
+       {
+         async_close_finish (data, error, FALSE);
+         g_error_free (error);
+         close_async_data_free (data);
+         return;
+       }
+
+      source = g_socket_create_source (socket, G_IO_IN, cancellable);
+      g_source_set_callback (source,
+                            (GSourceFunc) close_read_ready,
+                            data, (GDestroyNotify)close_async_data_free);
+      g_source_attach (source, NULL);
+      g_source_unref (source);
+
+      return;
+    }
+
+ out:
+  return G_IO_STREAM_CLASS (g_tcp_connection_parent_class)
+    ->close_async (stream, io_priority, cancellable, callback, user_data);
+
+
+}
+
+/**
+ * g_tcp_connection_set_graceful_disconnect:
+ * @connection: a #GTcpConnection
+ * @graceful_disconnect: Whether to do graceful disconnects or not
+ *
+ * This enabled graceful disconnects on close. A graceful disconnect
+ * means that we signal the recieving end that the connection is terminated
+ * and wait for it to close the connection before closing the connection.
+ *
+ * A graceful disconnect means that we can be sure that we successfully sent
+ * all the outstanding data to the other end, or get an error reported.
+ * However, it also means we have to wait for all the data to reach the
+ * other side and for it to acknowledge this by closing the socket, which may
+ * take a while. For this reason it is disabled by default.
+ *
+ * Since: 2.22
+ **/
+void
+g_tcp_connection_set_graceful_disconnect (GTcpConnection *connection,
+                                         gboolean        graceful_disconnect)
+{
+  graceful_disconnect = !!graceful_disconnect;
+  if (graceful_disconnect != connection->priv->graceful_disconnect)
+    {
+      connection->priv->graceful_disconnect = graceful_disconnect;
+      g_object_notify (G_OBJECT (connection), "graceful-disconnect");
+    }
+}
+
+/**
+ * g_tcp_connection_get_graceful_disconnect:
+ * @connection: a #GTcpConnection
+ *
+ * Checks if graceful disconnects are used. See
+ * g_tcp_connection_set_graceful_disconnect().
+ *
+ * Returns: %TRUE if graceful disconnect is used on close, %FALSE otherwise
+ *
+ * Since: 2.22
+ **/
+gboolean
+g_tcp_connection_get_graceful_disconnect (GTcpConnection *connection)
+{
+  return connection->priv->graceful_disconnect;
+}
+
+
 #define __G_TCP_CONNECTION_C__
 #include "gioaliasdef.c"
index a9eed5d..3928825 100644 (file)
@@ -57,7 +57,11 @@ struct _GTcpConnection
   GTcpConnectionPrivate *priv;
 };
 
-GType                   g_tcp_connection_get_type                       (void);
+GType    g_tcp_connection_get_type                (void) G_GNUC_CONST;
+
+void     g_tcp_connection_set_graceful_disconnect (GTcpConnection *connection,
+                                                  gboolean        graceful_disconnect);
+gboolean g_tcp_connection_get_graceful_disconnect (GTcpConnection *connection);
 
 G_END_DECLS
 
index 41eed5f..d2a702b 100644 (file)
@@ -2,10 +2,18 @@
 #include <string.h>
 #include <stdio.h>
 
+GMainLoop *loop;
+
 int cancel_timeout = 0;
+gboolean async = FALSE;
+gboolean graceful = FALSE;
 static GOptionEntry cmd_entries[] = {
   {"cancel", 'c', 0, G_OPTION_ARG_INT, &cancel_timeout,
    "Cancel any op after the specified amount of seconds", NULL},
+  {"async", 'a', 0, G_OPTION_ARG_NONE, &async,
+   "Use async ops", NULL},
+  {"graceful-disconnect", 'g', 0, G_OPTION_ARG_NONE, &graceful,
+   "Use graceful disconnect", NULL},
   {NULL}
 };
 
@@ -35,6 +43,17 @@ socket_address_to_string (GSocketAddress *address)
   return res;
 }
 
+static void
+async_cb (GObject *source_object,
+         GAsyncResult *res,
+         gpointer user_data)
+{
+  GAsyncResult **resp = user_data;
+  *resp = g_object_ref (res);
+  g_main_loop_quit (loop);
+}
+
+
 int
 main (int argc, char *argv[])
 {
@@ -64,6 +83,9 @@ main (int argc, char *argv[])
       return 1;
     }
 
+  if (async)
+    loop = g_main_loop_new (NULL, FALSE);
+
   if (cancel_timeout)
     {
       cancellable = g_cancellable_new ();
@@ -96,6 +118,9 @@ main (int argc, char *argv[])
           socket_address_to_string (address));
   g_object_unref (address);
 
+  if (graceful)
+    g_tcp_connection_set_graceful_disconnect (G_TCP_CONNECTION (connection), TRUE);
+
   out = g_io_stream_get_output_stream (G_IO_STREAM (connection));
 
   while (fgets(buffer, sizeof (buffer), stdin) != NULL)
@@ -110,10 +135,28 @@ main (int argc, char *argv[])
     }
 
   g_print ("closing stream\n");
-  if (!g_io_stream_close (G_IO_STREAM (connection), cancellable, &error))
+  if (async)
     {
-      g_warning ("close error: %s\n",  error->message);
-      return 1;
+      GAsyncResult *res;
+      g_io_stream_close_async (G_IO_STREAM (connection),
+                              0, cancellable, async_cb, &res);
+      g_main_loop_run (loop);
+      if (!g_io_stream_close_finish (G_IO_STREAM (connection),
+                                    res, &error))
+       {
+         g_object_unref (res);
+         g_warning ("close error: %s\n",  error->message);
+         return 1;
+       }
+      g_object_unref (res);
+    }
+  else
+    {
+      if (!g_io_stream_close (G_IO_STREAM (connection), cancellable, &error))
+       {
+         g_warning ("close error: %s\n",  error->message);
+         return 1;
+       }
     }
 
   return 0;