gcr: Implement input for GcrGnupgProcess
authorStef Walter <stefw@collabora.co.uk>
Wed, 14 Sep 2011 09:52:24 +0000 (11:52 +0200)
committerStef Walter <stefw@collabora.co.uk>
Tue, 27 Sep 2011 07:38:46 +0000 (09:38 +0200)
 * Use io streams for input and output

docs/reference/gcr/gcr-sections.txt
gcr/Makefile.am
gcr/gcr-callback-output-stream.c [new file with mode: 0644]
gcr/gcr-callback-output-stream.h [new file with mode: 0644]
gcr/gcr-gnupg-collection.c
gcr/gcr-gnupg-process.c
gcr/gcr-gnupg-process.h
gcr/tests/files/gnupg-mock/mock-arguments-environ
gcr/tests/files/gnupg-mock/mock-echo [new file with mode: 0755]
gcr/tests/test-gnupg-process.c

index fe3419a..84ab8c0 100644 (file)
@@ -592,4 +592,13 @@ GCR_ERROR
 gcr_error_get_domain
 GcrOpensshPubCallback
 GcrOpenpgpCallback
+GCR_CALLBACK_OUTPUT_STREAM
+GCR_CALLBACK_OUTPUT_STREAM_CLASS
+GCR_CALLBACK_OUTPUT_STREAM_GET_CLASS
+GCR_IS_CALLBACK_OUTPUT_STREAM
+GCR_IS_CALLBACK_OUTPUT_STREAM_CLASS
+GCR_TYPE_CALLBACK_OUTPUT_STREAM
+GcrCallbackOutputFunc
+GcrCallbackOutputStream
+GcrCallbackOutputStreamClass
 </SECTION>
index 8dfeef2..27c170d 100644 (file)
@@ -90,6 +90,7 @@ BUILT_SOURCES = \
        $(BUILT_UI_FILES)
 
 libgcr_base_@GCR_MAJOR@_la_SOURCES = \
+       gcr-callback-output-stream.c gcr-callback-output-stream.h \
        gcr-certificate.c gcr-certificate.h \
        gcr-certificate-chain.c gcr-certificate-chain.h \
        gcr-collection.c gcr-collection.h \
diff --git a/gcr/gcr-callback-output-stream.c b/gcr/gcr-callback-output-stream.c
new file mode 100644 (file)
index 0000000..2d4e0e5
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * gnome-keyring
+ *
+ * Copyright (C) 2011 Collabora Ltd.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ *
+ * Author: Stef Walter <stefw@collabora.co.uk>
+ */
+
+#include "config.h"
+
+#include "gcr-callback-output-stream.h"
+#define DEBUG_FLAG GCR_DEBUG_GNUPG
+#include "gcr-debug.h"
+
+#include <glib/gi18n.h>
+
+struct _GcrCallbackOutputStream {
+       GOutputStream parent;
+       GcrCallbackOutputFunc callback;
+       gpointer user_data;
+       GDestroyNotify destroy_func;
+};
+
+struct _GcrCallbackOutputStreamClass {
+       GOutputStreamClass parent_class;
+};
+
+G_DEFINE_TYPE (GcrCallbackOutputStream, _gcr_callback_output_stream, G_TYPE_OUTPUT_STREAM);
+
+static void
+_gcr_callback_output_stream_init (GcrCallbackOutputStream *self)
+{
+
+}
+
+static gssize
+_gcr_callback_output_stream_write (GOutputStream *stream,
+                                   const void *buffer,
+                                   gsize count,
+                                   GCancellable *cancellable,
+                                   GError **error)
+{
+       GcrCallbackOutputStream *self = GCR_CALLBACK_OUTPUT_STREAM (stream);
+
+       if (g_cancellable_set_error_if_cancelled (cancellable, error)) {
+               return -1;
+       } else if (self->callback == NULL) {
+               g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                            _("The stream was closed"));
+               return -1;
+       }
+
+       return (self->callback) (buffer, count, cancellable, self->user_data, error);
+}
+
+static gboolean
+_gcr_callback_output_stream_close (GOutputStream *stream,
+                                   GCancellable *cancellable,
+                                   GError **error)
+{
+       GcrCallbackOutputStream *self = GCR_CALLBACK_OUTPUT_STREAM (stream);
+       if (g_cancellable_set_error_if_cancelled (cancellable, error)) {
+               return FALSE;
+       } else if (self->callback == NULL) {
+               g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                            _("The stream was closed"));
+               return FALSE;
+       }
+
+       if (self->destroy_func != NULL)
+               (self->destroy_func) (self->user_data);
+       self->destroy_func = NULL;
+       self->user_data = NULL;
+       self->callback = NULL;
+
+       return TRUE;
+}
+
+static void
+_gcr_callback_output_stream_dispose (GObject *obj)
+{
+       _gcr_callback_output_stream_close (G_OUTPUT_STREAM (obj), NULL, NULL);
+       G_OBJECT_CLASS (_gcr_callback_output_stream_parent_class)->dispose (obj);
+}
+
+static void
+_gcr_callback_output_stream_class_init (GcrCallbackOutputStreamClass *klass)
+{
+       GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+       GOutputStreamClass *output_class = G_OUTPUT_STREAM_CLASS (klass);
+
+       gobject_class->dispose = _gcr_callback_output_stream_dispose;
+       output_class->write_fn = _gcr_callback_output_stream_write;
+       output_class->close_fn = _gcr_callback_output_stream_close;
+}
+
+GOutputStream *
+_gcr_callback_output_stream_new (GcrCallbackOutputFunc callback,
+                                 gpointer user_data,
+                                 GDestroyNotify destroy_func)
+{
+       GcrCallbackOutputStream *self;
+
+       g_return_val_if_fail (callback, NULL);
+
+       self = g_object_new (GCR_TYPE_CALLBACK_OUTPUT_STREAM, NULL);
+       self->callback = callback;
+       self->user_data = user_data;
+       self->destroy_func = destroy_func;
+
+       return G_OUTPUT_STREAM (self);
+}
diff --git a/gcr/gcr-callback-output-stream.h b/gcr/gcr-callback-output-stream.h
new file mode 100644 (file)
index 0000000..51d618e
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * gnome-keyring
+ *
+ * Copyright (C) 2011 Collabora Ltd.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ *
+ * Author: Stef Walter <stefw@collabora.co.uk>
+ */
+
+#ifndef GCR_CALLBACK_OUTPUT_STREAM_H
+#define GCR_CALLBACK_OUTPUT_STREAM_H
+
+#include "gcr-base.h"
+#include "gcr-collection.h"
+
+#include <glib-object.h>
+
+G_BEGIN_DECLS
+
+#define GCR_TYPE_CALLBACK_OUTPUT_STREAM               (_gcr_callback_output_stream_get_type ())
+#define GCR_CALLBACK_OUTPUT_STREAM(obj)               (G_TYPE_CHECK_INSTANCE_CAST ((obj), GCR_TYPE_CALLBACK_OUTPUT_STREAM, GcrCallbackOutputStream))
+#define GCR_CALLBACK_OUTPUT_STREAM_CLASS(klass)       (G_TYPE_CHECK_CLASS_CAST ((klass), GCR_TYPE_CALLBACK_OUTPUT_STREAM, GcrCallbackOutputStreamClass))
+#define GCR_IS_CALLBACK_OUTPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GCR_TYPE_CALLBACK_OUTPUT_STREAM))
+#define GCR_IS_CALLBACK_OUTPUT_STREAM_CLASS(klass)    (G_TYPE_CHECK_CLASS_TYPE ((klass), GCR_TYPE_CALLBACK_OUTPUT_STREAM))
+#define GCR_CALLBACK_OUTPUT_STREAM_GET_CLASS(obj)     (G_TYPE_INSTANCE_GET_CLASS ((obj), GCR_TYPE_CALLBACK_OUTPUT_STREAM, GcrCallbackOutputStreamClass))
+
+typedef struct _GcrCallbackOutputStream GcrCallbackOutputStream;
+typedef struct _GcrCallbackOutputStreamClass GcrCallbackOutputStreamClass;
+
+GType               _gcr_callback_output_stream_get_type   (void);
+
+typedef gssize      (*GcrCallbackOutputFunc)               (gconstpointer buffer,
+                                                            gsize count,
+                                                            GCancellable *cancellable,
+                                                            gpointer user_data,
+                                                            GError **error);
+
+GOutputStream *     _gcr_callback_output_stream_new        (GcrCallbackOutputFunc callback,
+                                                            gpointer user_data,
+                                                            GDestroyNotify destroy_func);
+
+G_END_DECLS
+
+#endif /* GCR_CALLBACK_OUTPUT_STREAM_H */
index 26405a4..3d65ba8 100644 (file)
@@ -23,7 +23,7 @@
 
 #include "config.h"
 
-#include "gcr-record.h"
+#include "gcr-callback-output-stream.h"
 #include "gcr-collection.h"
 #define DEBUG_FLAG GCR_DEBUG_GNUPG
 #include "gcr-debug.h"
@@ -32,6 +32,7 @@
 #include "gcr-gnupg-process.h"
 #include "gcr-gnupg-util.h"
 #include "gcr-internal.h"
+#include "gcr-record.h"
 #include "gcr-util.h"
 
 #include <sys/wait.h>
@@ -54,9 +55,6 @@ G_DEFINE_TYPE_WITH_CODE (GcrGnupgCollection, _gcr_gnupg_collection, G_TYPE_OBJEC
        G_IMPLEMENT_INTERFACE (GCR_TYPE_COLLECTION, _gcr_collection_iface)
 );
 
-/* -----------------------------------------------------------------------------
- * OBJECT
- */
 
 static void
 _gcr_gnupg_collection_init (GcrGnupgCollection *self)
@@ -243,10 +241,10 @@ typedef struct {
        GString *out_data;                    /* Pending output not yet parsed into colons */
        GHashTable *difference;               /* Hashset gchar *keyid -> gchar *keyid */
 
-       guint output_sig;
        guint error_sig;
        guint status_sig;
-       guint attribute_sig;
+       GOutputStream *output;
+       GOutputStream *outattr;
 
        GQueue *attribute_queue;              /* Queue of unprocessed GcrRecord* status records */
        GByteArray *attribute_buf;            /* Buffer of unprocessed attribute data received */
@@ -268,17 +266,18 @@ _gcr_gnupg_collection_load_free (gpointer data)
        g_object_unref (load->collection);
 
        if (load->process) {
-               if (load->output_sig)
-                       g_signal_handler_disconnect (load->process, load->output_sig);
                if (load->error_sig)
                        g_signal_handler_disconnect (load->process, load->error_sig);
                if (load->status_sig)
                        g_signal_handler_disconnect (load->process, load->status_sig);
-               if (load->attribute_sig)
-                       g_signal_handler_disconnect (load->process, load->attribute_sig);
                g_object_unref (load->process);
        }
 
+       g_output_stream_close (load->output, NULL, NULL);
+       g_object_unref (load->output);
+       g_output_stream_close (load->outattr, NULL, NULL);
+       g_object_unref (load->outattr);
+
        if (load->cancel)
                g_object_unref (load->cancel);
 
@@ -515,15 +514,19 @@ on_line_parse_output (const gchar *line, gpointer user_data)
 }
 
 
-static void
-on_gnupg_process_output_data (GcrGnupgProcess *process, GByteArray *buffer,
-                              gpointer user_data)
+static gssize
+on_gnupg_process_output_data (gconstpointer buffer,
+                              gsize count,
+                              GCancellable *cancellable,
+                              gpointer user_data,
+                              GError **error)
 {
        GSimpleAsyncResult *res = G_SIMPLE_ASYNC_RESULT (user_data);
        GcrGnupgCollectionLoad *load = g_simple_async_result_get_op_res_gpointer (res);
 
-       g_string_append_len (load->out_data, (gchar*)buffer->data, buffer->len);
+       g_string_append_len (load->out_data, buffer, count);
        _gcr_util_parse_lines (load->out_data, FALSE, on_line_parse_output, load);
+       return count;
 }
 
 static void
@@ -550,22 +553,24 @@ on_gnupg_process_status_record (GcrGnupgProcess *process, GcrRecord *record,
        process_outstanding_attributes (load);
 }
 
-static void
-on_gnupg_process_attribute_data (GcrGnupgProcess *process, GByteArray *buffer,
-                                 gpointer user_data)
+static gssize
+on_gnupg_process_attribute_data (gconstpointer buffer,
+                                 gsize count,
+                                 GCancellable *cancellable,
+                                 gpointer user_data,
+                                 GError **error)
 {
        GSimpleAsyncResult *res = G_SIMPLE_ASYNC_RESULT (user_data);
        GcrGnupgCollectionLoad *load = g_simple_async_result_get_op_res_gpointer (res);
 
        /* If we don't have a buffer, just claim this one */
        if (!load->attribute_buf)
-               load->attribute_buf = g_byte_array_ref (buffer);
+               load->attribute_buf = g_byte_array_new ();
 
-       /* If we have data remaining over, add it to our buffer */
-       else
-               g_byte_array_append (load->attribute_buf, buffer->data, buffer->len);
+       g_byte_array_append (load->attribute_buf, buffer, count);
 
        process_outstanding_attributes (load);
+       return count;
 }
 
 static void
@@ -695,11 +700,14 @@ _gcr_gnupg_collection_load_async (GcrGnupgCollection *self, GCancellable *cancel
        load->collection = g_object_ref (self);
        load->cancel = cancellable ? g_object_ref (cancellable) : cancellable;
 
+       load->output = _gcr_callback_output_stream_new (on_gnupg_process_output_data, res, NULL);
+       load->outattr = _gcr_callback_output_stream_new (on_gnupg_process_attribute_data, res, NULL);
+
        load->process = _gcr_gnupg_process_new (self->pv->directory, NULL);
-       load->output_sig = g_signal_connect (load->process, "output-data", G_CALLBACK (on_gnupg_process_output_data), res);
+       _gcr_gnupg_process_set_output_stream (load->process, load->output);
+       _gcr_gnupg_process_set_attribute_stream (load->process, load->outattr);
        load->error_sig = g_signal_connect (load->process, "error-line", G_CALLBACK (on_gnupg_process_error_line), res);
        load->status_sig = g_signal_connect (load->process, "status-record", G_CALLBACK (on_gnupg_process_status_record), res);
-       load->attribute_sig = g_signal_connect (load->process, "attribute-data", G_CALLBACK (on_gnupg_process_attribute_data), res);
 
        /*
         * Track all the keys we currently have, at end remove those that
index b4a4637..ed4592a 100644 (file)
 enum {
        PROP_0,
        PROP_DIRECTORY,
-       PROP_EXECUTABLE
+       PROP_EXECUTABLE,
+       PROP_INPUT_STREAM,
+       PROP_OUTPUT_STREAM,
+       PROP_ATTRIBUTE_STREAM
 };
 
 enum {
@@ -62,10 +65,8 @@ enum {
 };
 
 enum {
-       OUTPUT_DATA,
        ERROR_LINE,
        STATUS_RECORD,
-       ATTRIBUTE_DATA,
        NUM_SIGNALS
 };
 
@@ -77,6 +78,7 @@ typedef struct _GnupgSource {
 
        GcrGnupgProcess *process;       /* Pointer back to the process object */
 
+       GByteArray *input_buf;
        GString *error_buf;
        GString *status_buf;
 
@@ -91,6 +93,10 @@ struct _GcrGnupgProcessPrivate {
        gchar *directory;
        gchar *executable;
 
+       GInputStream *input;
+       GOutputStream *output;
+       GOutputStream *attributes;
+
        gboolean running;
        gboolean complete;
        GError *error;
@@ -139,6 +145,15 @@ _gcr_gnupg_process_get_property (GObject *obj, guint prop_id, GValue *value,
        case PROP_EXECUTABLE:
                g_value_set_string (value, self->pv->executable);
                break;
+       case PROP_INPUT_STREAM:
+               g_value_set_object (value, _gcr_gnupg_process_get_input_stream (self));
+               break;
+       case PROP_OUTPUT_STREAM:
+               g_value_set_object (value, _gcr_gnupg_process_get_output_stream (self));
+               break;
+       case PROP_ATTRIBUTE_STREAM:
+               g_value_set_object (value, _gcr_gnupg_process_get_attribute_stream (self));
+               break;
        default:
                G_OBJECT_WARN_INVALID_PROPERTY_ID (obj, prop_id, pspec);
                break;
@@ -160,6 +175,15 @@ _gcr_gnupg_process_set_property (GObject *obj, guint prop_id, const GValue *valu
                g_return_if_fail (!self->pv->executable);
                self->pv->executable = g_value_dup_string (value);
                break;
+       case PROP_INPUT_STREAM:
+               _gcr_gnupg_process_set_input_stream (self, g_value_get_object (value));
+               break;
+       case PROP_OUTPUT_STREAM:
+               _gcr_gnupg_process_set_output_stream (self, g_value_get_object (value));
+               break;
+       case PROP_ATTRIBUTE_STREAM:
+               _gcr_gnupg_process_set_attribute_stream (self, g_value_get_object (value));
+               break;
        default:
                G_OBJECT_WARN_INVALID_PROPERTY_ID (obj, prop_id, pspec);
                break;
@@ -167,6 +191,18 @@ _gcr_gnupg_process_set_property (GObject *obj, guint prop_id, const GValue *valu
 }
 
 static void
+_gcr_gnupg_process_dispose (GObject *obj)
+{
+       GcrGnupgProcess *self = GCR_GNUPG_PROCESS (obj);
+
+       g_clear_object (&self->pv->input);
+       g_clear_object (&self->pv->output);
+       g_clear_object (&self->pv->attributes);
+
+       G_OBJECT_CLASS (_gcr_gnupg_process_parent_class)->dispose (obj);
+}
+
+static void
 _gcr_gnupg_process_finalize (GObject *obj)
 {
        GcrGnupgProcess *self = GCR_GNUPG_PROCESS (obj);
@@ -187,6 +223,7 @@ _gcr_gnupg_process_class_init (GcrGnupgProcessClass *klass)
        gobject_class->constructed = _gcr_gnupg_process_constructed;
        gobject_class->get_property = _gcr_gnupg_process_get_property;
        gobject_class->set_property = _gcr_gnupg_process_set_property;
+       gobject_class->dispose = _gcr_gnupg_process_dispose;
        gobject_class->finalize = _gcr_gnupg_process_finalize;
 
        /**
@@ -209,17 +246,31 @@ _gcr_gnupg_process_class_init (GcrGnupgProcessClass *klass)
                                        GPG_EXECUTABLE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
 
        /**
-        * GcrGnupgProcess::output-data:
-        * @data: a #GByteArray of output data.
+        * GcrGnupgProcess:input-stream:
         *
-        * Signal emitted when normal output data is available from the gnupg
-        * process. The data does not necessarily come on line boundaries, and
-        * won't be null-terminated.
+        * Input for gnupg, or %NULL for no input.
         */
-       signals[OUTPUT_DATA] = g_signal_new ("output-data", GCR_TYPE_GNUPG_PROCESS,
-                  G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GcrGnupgProcessClass, output_data),
-                  NULL, NULL, _gcr_marshal_VOID__BOXED,
-                  G_TYPE_NONE, 1, G_TYPE_BYTE_ARRAY);
+       g_object_class_install_property (gobject_class, PROP_INPUT_STREAM,
+                  g_param_spec_object ("input-stream", "Input Stream", "Input Stream",
+                                       G_TYPE_INPUT_STREAM, G_PARAM_READWRITE));
+
+       /**
+        * GcrGnupgProcess:output-stream:
+        *
+        * Output from gnupg, or %NULL for ignored output.
+        */
+       g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
+                  g_param_spec_object ("output-stream", "Output Stream", "Output Stream",
+                                       G_TYPE_OUTPUT_STREAM, G_PARAM_READWRITE));
+
+       /**
+        * GcrGnupgProcess:attribute-stream:
+        *
+        * Output of attribute data from gnupg, or %NULL for ignored attributes.
+        */
+       g_object_class_install_property (gobject_class, PROP_ATTRIBUTE_STREAM,
+                  g_param_spec_object ("attribute-stream", "Attribute Stream", "Attribute Stream",
+                                       G_TYPE_OUTPUT_STREAM, G_PARAM_READWRITE));
 
        /**
         * GcrGnupgProcess::error-line:
@@ -244,18 +295,6 @@ _gcr_gnupg_process_class_init (GcrGnupgProcessClass *klass)
                   NULL, NULL, _gcr_marshal_VOID__BOXED,
                   G_TYPE_NONE, 1, GCR_TYPE_RECORD);
 
-       /**
-        * GcrGnupgProcess::attribute-data:
-        * @data: a #GByteArray of attribute data.
-        *
-        * Signal emitted when attribute data is available from the gnupg
-        * process.
-        */
-       signals[ATTRIBUTE_DATA] = g_signal_new ("attribute-data", GCR_TYPE_GNUPG_PROCESS,
-                  G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GcrGnupgProcessClass, attribute_data),
-                  NULL, NULL, _gcr_marshal_VOID__BOXED,
-                  G_TYPE_NONE, 1, G_TYPE_BYTE_ARRAY);
-
        g_type_class_add_private (gobject_class, sizeof (GcrGnupgProcessPrivate));
 }
 
@@ -304,6 +343,72 @@ _gcr_gnupg_process_new (const gchar *directory, const gchar *executable)
                             NULL);
 }
 
+GInputStream *
+_gcr_gnupg_process_get_input_stream (GcrGnupgProcess *self)
+{
+       g_return_val_if_fail (GCR_GNUPG_PROCESS (self), NULL);
+       return self->pv->input;
+}
+
+void
+_gcr_gnupg_process_set_input_stream (GcrGnupgProcess *self,
+                                     GInputStream *input)
+{
+       g_return_if_fail (GCR_GNUPG_PROCESS (self));
+       g_return_if_fail (input == NULL || G_INPUT_STREAM (input));
+
+       if (input)
+               g_object_ref (input);
+       if (self->pv->input)
+               g_object_unref (self->pv->input);
+       self->pv->input = input;
+       g_object_notify (G_OBJECT (self), "input-stream");
+}
+
+GOutputStream *
+_gcr_gnupg_process_get_output_stream (GcrGnupgProcess *self)
+{
+       g_return_val_if_fail (GCR_GNUPG_PROCESS (self), NULL);
+       return self->pv->output;
+}
+
+void
+_gcr_gnupg_process_set_output_stream (GcrGnupgProcess *self,
+                                      GOutputStream *output)
+{
+       g_return_if_fail (GCR_GNUPG_PROCESS (self));
+       g_return_if_fail (output == NULL || G_OUTPUT_STREAM (output));
+
+       if (output)
+               g_object_ref (output);
+       if (self->pv->output)
+               g_object_unref (self->pv->output);
+       self->pv->output = output;
+       g_object_notify (G_OBJECT (self), "output-stream");
+}
+
+GOutputStream *
+_gcr_gnupg_process_get_attribute_stream (GcrGnupgProcess *self)
+{
+       g_return_val_if_fail (GCR_GNUPG_PROCESS (self), NULL);
+       return self->pv->attributes;
+}
+
+void
+_gcr_gnupg_process_set_attribute_stream (GcrGnupgProcess *self,
+                                         GOutputStream *output)
+{
+       g_return_if_fail (GCR_GNUPG_PROCESS (self));
+       g_return_if_fail (output == NULL || G_OUTPUT_STREAM (output));
+
+       if (output)
+               g_object_ref (output);
+       if (self->pv->attributes)
+               g_object_unref (self->pv->attributes);
+       self->pv->attributes = output;
+       g_object_notify (G_OBJECT (self), "attribute-stream");
+}
+
 static void
 run_async_ready_callback (GcrGnupgProcess *self)
 {
@@ -452,6 +557,8 @@ on_gnupg_source_finalize (GSource *source)
                close_fd (&gnupg_source->polls[i].fd);
 
        g_object_unref (gnupg_source->process);
+       if (gnupg_source->input_buf)
+               g_byte_array_free (gnupg_source->input_buf, TRUE);
        g_string_free (gnupg_source->error_buf, TRUE);
        g_string_free (gnupg_source->status_buf, TRUE);
 
@@ -481,6 +588,26 @@ read_output (int fd, GByteArray *buffer)
        return TRUE;
 }
 
+static gboolean
+write_input (int fd, GByteArray *buffer)
+{
+       gssize result;
+
+       g_return_val_if_fail (fd >= 0, FALSE);
+
+       for (;;) {
+               result = write (fd, buffer->data, buffer->len);
+               if (result < 0) {
+                       if (errno == EINTR || errno == EAGAIN)
+                               continue;
+                       return FALSE;
+               } else {
+                       g_byte_array_remove_range (buffer, 0, result);
+                       return TRUE;
+               }
+       }
+}
+
 static void
 emit_status_for_each_line (const gchar *line, gpointer user_data)
 {
@@ -512,32 +639,150 @@ emit_error_for_each_line (const gchar *line, gpointer user_data)
 }
 
 static gboolean
+on_gnupg_source_input (GcrGnupgProcess *self,
+                       GnupgSource *gnupg_source,
+                       gint fd)
+{
+       gssize read;
+
+       if (gnupg_source->input_buf == NULL ||
+           gnupg_source->input_buf->len == 0) {
+               if (self->pv->input == NULL)
+                       return FALSE;
+               if (!gnupg_source->input_buf)
+                       gnupg_source->input_buf = g_byte_array_new ();
+               g_byte_array_set_size (gnupg_source->input_buf, 4096);
+               read = g_input_stream_read (self->pv->input,
+                                           gnupg_source->input_buf->data,
+                                           gnupg_source->input_buf->len,
+                                           gnupg_source->cancellable, NULL);
+               g_byte_array_set_size (gnupg_source->input_buf, read < 0 ? 0 : read);
+               if (read < 0)
+                       return FALSE;
+               if (read == 0)
+                       return FALSE;
+       }
+
+       if (!write_input (fd, gnupg_source->input_buf)) {
+               g_warning ("couldn't write output data to gnupg process");
+               return FALSE;
+       }
+
+       return TRUE;
+}
+
+static gboolean
+on_gnupg_source_status (GcrGnupgProcess *self,
+                        GnupgSource *gnupg_source,
+                        gint fd)
+{
+       GByteArray *buffer = g_byte_array_new ();
+       gboolean result = TRUE;
+
+       if (!read_output (fd, buffer)) {
+               g_warning ("couldn't read status data from gnupg process");
+               result = FALSE;
+       } else {
+               g_string_append_len (gnupg_source->status_buf, (gchar*)buffer->data, buffer->len);
+               _gcr_util_parse_lines (gnupg_source->status_buf, buffer->len == 0,
+                                      emit_status_for_each_line, self);
+       }
+
+       g_byte_array_unref (buffer);
+       return result;
+}
+
+static gboolean
+on_gnupg_source_attribute (GcrGnupgProcess *self,
+                           GnupgSource *gnupg_source,
+                           gint fd)
+{
+       GByteArray *buffer = g_byte_array_new ();
+       gboolean result = TRUE;
+
+       if (!read_output (fd, buffer)) {
+               g_warning ("couldn't read attribute data from gnupg process");
+               result = FALSE;
+       } else if (buffer->len > 0) {
+               _gcr_debug ("received %d bytes of attribute data", (gint)buffer->len);
+               if (self->pv->attributes != NULL)
+                       g_output_stream_write_all (self->pv->attributes, buffer->data,
+                                                  buffer->len, NULL,
+                                                  gnupg_source->cancellable, NULL);
+       }
+
+       g_byte_array_unref (buffer);
+       return result;
+}
+
+static gboolean
+on_gnupg_source_output (GcrGnupgProcess *self,
+                        GnupgSource *gnupg_source,
+                        gint fd)
+{
+       GByteArray *buffer = g_byte_array_new ();
+       gboolean result = TRUE;
+
+       if (!read_output (fd, buffer)) {
+               g_warning ("couldn't read output data from gnupg process");
+               result = FALSE;
+       } else if (buffer->len > 0) {
+               _gcr_debug ("received %d bytes of output data", (gint)buffer->len);
+               if (self->pv->output != NULL)
+                       g_output_stream_write_all (self->pv->output, buffer->data, buffer->len,
+                                                  NULL, gnupg_source->cancellable, NULL);
+       }
+
+       g_byte_array_unref (buffer);
+       return result;
+}
+
+static gboolean
+on_gnupg_source_error (GcrGnupgProcess *self,
+                       GnupgSource *gnupg_source,
+                       gint fd,
+                       gboolean last)
+{
+       GByteArray *buffer = g_byte_array_new ();
+       gboolean result = TRUE;
+
+       if (!read_output (fd, buffer)) {
+               g_warning ("couldn't read error data from gnupg process");
+               result = FALSE;
+       } else {
+               g_string_append_len (gnupg_source->error_buf, (gchar*)buffer->data, buffer->len);
+               _gcr_util_parse_lines (gnupg_source->error_buf, last,
+                                      emit_error_for_each_line, gnupg_source->process);
+       }
+
+       g_byte_array_unref (buffer);
+       return result;
+}
+
+static gboolean
 on_gnupg_source_dispatch (GSource *source, GSourceFunc unused, gpointer user_data)
 {
        GnupgSource *gnupg_source = (GnupgSource*)source;
-       GByteArray *buffer;
+       GcrGnupgProcess *self = gnupg_source->process;
        GPollFD *poll;
 
        /* Standard input, no support yet */
        poll = &gnupg_source->polls[FD_INPUT];
-       if (poll->fd >= 0 && poll->revents != 0) {
-               close_poll (source, poll);
+       if (poll->fd >= 0) {
+               if (poll->revents & G_IO_OUT)
+                       if (!on_gnupg_source_input (self, gnupg_source, poll->fd))
+                               poll->revents |= G_IO_HUP;
+               if (poll->revents & G_IO_HUP)
+                       close_poll (source, poll);
+               poll->revents = 0;
        }
 
        /* Status output */
        poll = &gnupg_source->polls[FD_STATUS];
        if (poll->fd >= 0) {
-               if (poll->revents & G_IO_IN) {
-                       buffer = g_byte_array_new ();
-                       if (!read_output (poll->fd, buffer)) {
-                               g_warning ("couldn't read status data from gnupg process");
-                       } else {
-                               g_string_append_len (gnupg_source->status_buf, (gchar*)buffer->data, buffer->len);
-                               _gcr_util_parse_lines (gnupg_source->status_buf, buffer->len == 0,
-                                                      emit_status_for_each_line, gnupg_source->process);
-                       }
-                       g_byte_array_unref (buffer);
-               }
+               if (poll->revents & G_IO_IN)
+                       if (!on_gnupg_source_status (self, gnupg_source, poll->fd))
+                               poll->revents |= G_IO_HUP;
                if (poll->revents & G_IO_HUP)
                        close_poll (source, poll);
                poll->revents = 0;
@@ -546,16 +791,9 @@ on_gnupg_source_dispatch (GSource *source, GSourceFunc unused, gpointer user_dat
        /* Attribute output */
        poll = &gnupg_source->polls[FD_ATTRIBUTE];
        if (poll->fd >= 0) {
-               if (poll->revents & G_IO_IN) {
-                       buffer = g_byte_array_new ();
-                       if (!read_output (poll->fd, buffer)) {
-                               g_warning ("couldn't read attribute data from gnupg process");
-                       } else if (buffer->len > 0) {
-                               _gcr_debug ("received %d bytes of attribute data", (gint)buffer->len);
-                               g_signal_emit (gnupg_source->process, signals[ATTRIBUTE_DATA], 0, buffer);
-                       }
-                       g_byte_array_unref (buffer);
-               }
+               if (poll->revents & G_IO_IN)
+                       if (!on_gnupg_source_attribute (self, gnupg_source, poll->fd))
+                               poll->revents |= G_IO_HUP;
                if (poll->revents & G_IO_HUP)
                        close_poll (source, poll);
                poll->revents = 0;
@@ -564,16 +802,9 @@ on_gnupg_source_dispatch (GSource *source, GSourceFunc unused, gpointer user_dat
        /* Standard output */
        poll = &gnupg_source->polls[FD_OUTPUT];
        if (poll->fd >= 0) {
-               if (poll->revents & G_IO_IN) {
-                       buffer = g_byte_array_new ();
-                       if (!read_output (poll->fd, buffer)) {
-                               g_warning ("couldn't read output data from gnupg process");
-                       } else if (buffer->len > 0) {
-                               _gcr_debug ("received %d bytes of output data", (gint)buffer->len);
-                               g_signal_emit (gnupg_source->process, signals[OUTPUT_DATA], 0, buffer);
-                       }
-                       g_byte_array_unref (buffer);
-               }
+               if (poll->revents & G_IO_IN)
+                       if (!on_gnupg_source_output (self, gnupg_source, poll->fd))
+                               poll->revents |= G_IO_HUP;
                if (poll->revents & G_IO_HUP)
                        close_poll (source, poll);
                poll->revents = 0;
@@ -582,17 +813,10 @@ on_gnupg_source_dispatch (GSource *source, GSourceFunc unused, gpointer user_dat
        /* Standard error */
        poll = &gnupg_source->polls[FD_ERROR];
        if (poll->fd >= 0) {
-               if (poll->revents & G_IO_IN) {
-                       buffer = g_byte_array_new ();
-                       if (!read_output (poll->fd, buffer)) {
-                               g_warning ("couldn't read error data from gnupg process");
-                       } else {
-                               g_string_append_len (gnupg_source->error_buf, (gchar*)buffer->data, buffer->len);
-                               _gcr_util_parse_lines (gnupg_source->error_buf, (poll->revents & G_IO_HUP) ? TRUE : FALSE,
-                                                      emit_error_for_each_line, gnupg_source->process);
-                       }
-                       g_byte_array_unref (buffer);
-               }
+               if (poll->revents & G_IO_IN)
+                       if (!on_gnupg_source_error (self, gnupg_source, poll->fd,
+                                                   (poll->revents & G_IO_HUP) ? TRUE : FALSE))
+                               poll->revents |= G_IO_HUP;
                if (poll->revents & G_IO_HUP)
                        close_poll (source, poll);
                poll->revents = 0;
@@ -731,6 +955,7 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
        int attribute_fds[2] = { -1, -1 };
        int output_fd = -1;
        int error_fd = -1;
+       int input_fd = -1;
        GnupgSource *gnupg_source;
        GSource *source;
        GPid pid;
@@ -756,6 +981,7 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
        g_ptr_array_add (args, g_strdup (self->pv->executable));
 
        /* Spawn/child will close all other attributes, besides thesthose in child_fds */
+       child_fds[FD_INPUT] = 0;
        child_fds[FD_OUTPUT] = 1;
        child_fds[FD_ERROR] = 2;
 
@@ -805,8 +1031,8 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
 
        g_spawn_async_with_pipes (self->pv->directory, (gchar**)args->pdata,
                                  (gchar**)envs->pdata, G_SPAWN_DO_NOT_REAP_CHILD,
-                                 on_gnupg_process_child_setup,
-                                 child_fds, &pid, NULL, &output_fd, &error_fd, &error);
+                                 on_gnupg_process_child_setup, child_fds,
+                                 &pid, &input_fd, &output_fd, &error_fd, &error);
 
        g_ptr_array_free (args, TRUE);
        g_ptr_array_free (envs, TRUE);
@@ -841,6 +1067,11 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
        gnupg_source->process = g_object_ref (self);
        gnupg_source->child_pid = pid;
 
+       gnupg_source->polls[FD_INPUT].fd = input_fd;
+       if (input_fd >= 0) {
+               gnupg_source->polls[FD_INPUT].events = G_IO_HUP | G_IO_OUT;
+               g_source_add_poll (source, &gnupg_source->polls[FD_INPUT]);
+       }
        gnupg_source->polls[FD_OUTPUT].fd = output_fd;
        if (output_fd >= 0) {
                gnupg_source->polls[FD_OUTPUT].events = G_IO_HUP | G_IO_IN;
index 143f36d..8eabfeb 100644 (file)
@@ -51,13 +51,9 @@ struct _GcrGnupgProcessClass {
        GObjectClass parent_class;
 
        /* signals */
-       gboolean (*output_data) (GcrGnupgProcess *self, GByteArray *output);
-
        gboolean (*error_line) (GcrGnupgProcess *self, const gchar *line);
 
        gboolean (*status_record) (GcrGnupgProcess *self, GcrRecord *record);
-
-       gboolean (*attribute_data) (GcrGnupgProcess *self, GByteArray *output);
 };
 
 typedef enum {
@@ -72,6 +68,21 @@ GType               _gcr_gnupg_process_get_type                (void) G_GNUC_CON
 GcrGnupgProcess*    _gcr_gnupg_process_new                     (const gchar *directory,
                                                                 const gchar *executable);
 
+GInputStream *      _gcr_gnupg_process_get_input_stream        (GcrGnupgProcess *self);
+
+void                _gcr_gnupg_process_set_input_stream        (GcrGnupgProcess *self,
+                                                                GInputStream *input);
+
+GOutputStream *     _gcr_gnupg_process_get_output_stream       (GcrGnupgProcess *self);
+
+void                _gcr_gnupg_process_set_output_stream       (GcrGnupgProcess *self,
+                                                                GOutputStream *output);
+
+GOutputStream *     _gcr_gnupg_process_get_attribute_stream    (GcrGnupgProcess *self);
+
+void                _gcr_gnupg_process_set_attribute_stream    (GcrGnupgProcess *self,
+                                                                GOutputStream *output);
+
 void                _gcr_gnupg_process_run_async               (GcrGnupgProcess *self,
                                                                 const gchar **argv,
                                                                 const gchar **envp,
index 272c76e..5282a2f 100755 (executable)
@@ -12,7 +12,7 @@ while getopts '1:2:' arg; do
                echo $OPTARG
                ;;
        *)
-               invalid argument: $arg
+               echo "invalid argument: $arg"
                exit 2
                ;;
        esac
diff --git a/gcr/tests/files/gnupg-mock/mock-echo b/gcr/tests/files/gnupg-mock/mock-echo
new file mode 100755 (executable)
index 0000000..7c4e251
--- /dev/null
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+# This script is used with test-gnupg-process
+set -euf
+
+while read line; do
+       echo "$line"
+done
index 318b9a4..740b232 100644 (file)
@@ -23,6 +23,7 @@
 #include "config.h"
 
 #include "gcr/gcr-base.h"
+#include "gcr/gcr-callback-output-stream.h"
 #include "gcr/gcr-gnupg-process.h"
 
 #include "egg/egg-testing.h"
@@ -32,6 +33,8 @@
 #include <errno.h>
 #include <string.h>
 
+#define WAIT 50000
+
 typedef struct {
        GcrGnupgProcess *process;
        GAsyncResult *result;
@@ -104,26 +107,28 @@ build_script_path (const gchar *name)
        return path;
 }
 
-static void
-on_process_output_data (GcrGnupgProcess *process, GByteArray *buffer, gpointer user_data)
+static gssize
+on_process_output_data (gconstpointer buffer,
+                        gsize count,
+                        GCancellable *cancellable,
+                        gpointer user_data,
+                        GError **error)
 {
        Test *test = user_data;
-
-       g_assert (process == test->process);
-       g_assert (buffer);
-
-       g_string_append_len (test->output_buf, (gchar*)buffer->data, buffer->len);
+       g_string_append_len (test->output_buf, buffer, count);
+       return count;
 }
 
-static void
-on_process_attribute_data (GcrGnupgProcess *process, GByteArray *buffer, gpointer user_data)
+static gssize
+on_process_attribute_data (gconstpointer buffer,
+                           gsize count,
+                           GCancellable *cancellable,
+                           gpointer user_data,
+                           GError **error)
 {
        Test *test = user_data;
-
-       g_assert (process == test->process);
-       g_assert (buffer);
-
-       g_string_append_len (test->attribute_buf, (gchar*)buffer->data, buffer->len);
+       g_string_append_len (test->attribute_buf, buffer, count);
+       return count;
 }
 
 static void
@@ -154,6 +159,7 @@ static void
 test_run_simple_output (Test *test, gconstpointer unused)
 {
        const gchar *argv[] = { NULL };
+       GOutputStream *output;
        GError *error = NULL;
        gboolean ret;
        gchar *script;
@@ -162,10 +168,12 @@ test_run_simple_output (Test *test, gconstpointer unused)
        test->process = _gcr_gnupg_process_new (NULL, script);
        g_free (script);
 
-       g_signal_connect (test->process, "output-data", G_CALLBACK (on_process_output_data), test);
+       output = _gcr_callback_output_stream_new (on_process_output_data, test, NULL);
+       _gcr_gnupg_process_set_output_stream (test->process, output);
+       g_object_unref (output);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -193,7 +201,7 @@ test_run_simple_error (Test *test, gconstpointer unused)
        g_signal_connect (test->process, "error-line", G_CALLBACK (on_process_error_line), test);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -210,6 +218,7 @@ static void
 test_run_status_and_output (Test *test, gconstpointer unused)
 {
        const gchar *argv[] = { NULL };
+       GOutputStream *output;
        GError *error = NULL;
        gchar *script;
        gboolean ret;
@@ -218,12 +227,15 @@ test_run_status_and_output (Test *test, gconstpointer unused)
        test->process = _gcr_gnupg_process_new (NULL, script);
        g_free (script);
 
-       g_signal_connect (test->process, "output-data", G_CALLBACK (on_process_output_data), test);
+       output = _gcr_callback_output_stream_new (on_process_output_data, test, NULL);
+       _gcr_gnupg_process_set_output_stream (test->process, output);
+       g_object_unref (output);
+
        g_signal_connect (test->process, "status-record", G_CALLBACK (on_process_status_record), test);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, GCR_GNUPG_PROCESS_WITH_STATUS,
                                      NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -247,6 +259,7 @@ static void
 test_run_status_and_attribute (Test *test, gconstpointer unused)
 {
        const gchar *argv[] = { NULL };
+       GOutputStream *output;
        GError *error = NULL;
        gchar *script;
        gboolean ret;
@@ -255,13 +268,16 @@ test_run_status_and_attribute (Test *test, gconstpointer unused)
        test->process = _gcr_gnupg_process_new (NULL, script);
        g_free (script);
 
-       g_signal_connect (test->process, "attribute-data", G_CALLBACK (on_process_attribute_data), test);
+       output = _gcr_callback_output_stream_new (on_process_attribute_data, test, NULL);
+       _gcr_gnupg_process_set_attribute_stream (test->process, output);
+       g_object_unref (output);
+
        g_signal_connect (test->process, "status-record", G_CALLBACK (on_process_status_record), test);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL,
                                      GCR_GNUPG_PROCESS_WITH_STATUS | GCR_GNUPG_PROCESS_WITH_ATTRIBUTES,
                                      NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -286,6 +302,7 @@ static void
 test_run_arguments_and_environment (Test *test, gconstpointer unused)
 {
        GError *error = NULL;
+       GOutputStream *output;
        gchar *script;
        gboolean ret;
 
@@ -305,15 +322,21 @@ test_run_arguments_and_environment (Test *test, gconstpointer unused)
        test->process = _gcr_gnupg_process_new (NULL, script);
        g_free (script);
 
-       g_signal_connect (test->process, "output-data", G_CALLBACK (on_process_output_data), test);
+       output = _gcr_callback_output_stream_new (on_process_output_data, test, NULL);
+       _gcr_gnupg_process_set_output_stream (test->process, output);
+       g_object_unref (output);
+
        g_signal_connect (test->process, "error-line", G_CALLBACK (on_process_error_line), test);
 
        _gcr_gnupg_process_run_async (test->process, argv, envp, 0, NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
-       g_assert_no_error (error);
+       if (error) {
+               g_printerr ("%s\n", test->error_buf->str);
+               g_assert_no_error (error);
+       }
        g_assert (ret == TRUE);
 
        g_assert_cmpstr ("value1\nvalue2\n", ==, test->output_buf->str);
@@ -327,6 +350,7 @@ static void
 test_run_with_homedir (Test *test, gconstpointer unused)
 {
        const gchar *argv[] = { NULL };
+       GOutputStream *output;
        GError *error = NULL;
        gchar *script;
        gchar *check;
@@ -336,10 +360,12 @@ test_run_with_homedir (Test *test, gconstpointer unused)
        test->process = _gcr_gnupg_process_new (SRCDIR, script);
        g_free (script);
 
-       g_signal_connect (test->process, "output-data", G_CALLBACK (on_process_output_data), test);
+       output = _gcr_callback_output_stream_new (on_process_output_data, test, NULL);
+       _gcr_gnupg_process_set_output_stream (test->process, output);
+       g_object_unref (output);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -355,6 +381,49 @@ test_run_with_homedir (Test *test, gconstpointer unused)
 }
 
 static void
+test_run_with_input_and_output (Test *test,
+                                gconstpointer unused)
+{
+       const gchar *argv[] = { NULL };
+       const gchar *data = "one\ntwenty two\nthree\nfourty four\n";
+       GInputStream *input;
+       GOutputStream *output;
+       GError *error = NULL;
+       GString *string;
+       gchar *script;
+       gboolean ret;
+
+       script = build_script_path ("mock-echo");
+       test->process = _gcr_gnupg_process_new (SRCDIR, script);
+       g_free (script);
+
+       input = g_memory_input_stream_new_from_data ((gpointer)data, -1, NULL);
+       output = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+
+       _gcr_gnupg_process_set_input_stream (test->process, input);
+       _gcr_gnupg_process_set_output_stream (test->process, output);
+
+       _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, NULL, on_async_ready, test);
+       egg_test_wait_until (WAIT);
+
+       g_assert (test->result);
+       ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
+       g_assert_no_error (error);
+       g_assert (ret == TRUE);
+
+       string = g_string_new_len (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (output)),
+                                  g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (output)));
+       g_assert_cmpstr (data, ==, string->str);
+       g_string_free (string, TRUE);
+
+       g_clear_object (&input);
+       g_clear_object (&output);
+       g_clear_object (&test->result);
+       g_clear_object (&test->process);
+
+}
+
+static void
 test_run_bad_executable (Test *test, gconstpointer unused)
 {
        GError *error = NULL;
@@ -367,7 +436,7 @@ test_run_bad_executable (Test *test, gconstpointer unused)
        g_free (script);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -392,7 +461,7 @@ test_run_fail_exit (Test *test, gconstpointer unused)
        g_free (script);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -418,7 +487,7 @@ test_run_fail_signal (Test *test, gconstpointer unused)
        g_free (script);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, NULL, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -448,7 +517,7 @@ test_run_and_cancel (Test *test, gconstpointer unused)
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, cancellable, on_async_ready, test);
        g_cancellable_cancel (cancellable);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -461,17 +530,23 @@ test_run_and_cancel (Test *test, gconstpointer unused)
        g_clear_object (&test->process);
 }
 
-static void
-on_process_output_cancel (GcrGnupgProcess *process, GByteArray *buffer, gpointer user_data)
+static gssize
+on_process_output_cancel (gconstpointer buffer,
+                          gsize count,
+                          GCancellable *cancellable,
+                          gpointer user_data,
+                          GError **error)
 {
-       GCancellable *cancellable = G_CANCELLABLE (user_data);
        g_cancellable_cancel (cancellable);
+       g_cancellable_set_error_if_cancelled (cancellable, error);
+       return -1;
 }
 
 static void
 test_run_and_cancel_later (Test *test, gconstpointer unused)
 {
        GError *error = NULL;
+       GOutputStream *output;
        gchar *script;
        const gchar *argv[] = { "15" };
        GCancellable *cancellable;
@@ -481,11 +556,13 @@ test_run_and_cancel_later (Test *test, gconstpointer unused)
 
        script = build_script_path ("mock-simple-output");
        test->process = _gcr_gnupg_process_new (NULL, script);
-       g_signal_connect (test->process, "output-data", G_CALLBACK (on_process_output_cancel), cancellable);
+       output = _gcr_callback_output_stream_new (on_process_output_cancel, NULL, NULL);
+       _gcr_gnupg_process_set_output_stream (test->process, output);
+       g_object_unref (output);
        g_free (script);
 
        _gcr_gnupg_process_run_async (test->process, argv, NULL, 0, cancellable, on_async_ready, test);
-       egg_test_wait_until (500);
+       egg_test_wait_until (WAIT);
 
        g_assert (test->result);
        ret = _gcr_gnupg_process_run_finish (test->process, test->result, &error);
@@ -512,6 +589,7 @@ main (int argc, char **argv)
        g_test_add ("/gcr/gnupg-process/run_status_and_attribute", Test, NULL, setup, test_run_status_and_attribute, teardown);
        g_test_add ("/gcr/gnupg-process/run_arguments_and_environment", Test, NULL, setup, test_run_arguments_and_environment, teardown);
        g_test_add ("/gcr/gnupg-process/run_with_homedir", Test, NULL, setup, test_run_with_homedir, teardown);
+       g_test_add ("/gcr/gnupg-process/run_with_input_and_output", Test, NULL, setup, test_run_with_input_and_output, teardown);
        g_test_add ("/gcr/gnupg-process/run_bad_executable", Test, NULL, setup, test_run_bad_executable, teardown);
        g_test_add ("/gcr/gnupg-process/run_fail_exit", Test, NULL, setup, test_run_fail_exit, teardown);
        g_test_add ("/gcr/gnupg-process/run_fail_signal", Test, NULL, setup, test_run_fail_signal, teardown);