minimal build
[platform/upstream/gcr.git] / gcr / gcr-gnupg-process.c
index b4a4637..4677b46 100644 (file)
@@ -35,6 +35,7 @@
 #include <fcntl.h>
 #include <errno.h>
 #include <string.h>
+#include <unistd.h>
 
 /**
  * GcrGnupgProcessFlags:
 enum {
        PROP_0,
        PROP_DIRECTORY,
-       PROP_EXECUTABLE
+       PROP_EXECUTABLE,
+       PROP_INPUT_STREAM,
+       PROP_OUTPUT_STREAM,
+       PROP_ATTRIBUTE_STREAM
 };
 
 enum {
@@ -62,10 +66,8 @@ enum {
 };
 
 enum {
-       OUTPUT_DATA,
        ERROR_LINE,
        STATUS_RECORD,
-       ATTRIBUTE_DATA,
        NUM_SIGNALS
 };
 
@@ -77,6 +79,7 @@ typedef struct _GnupgSource {
 
        GcrGnupgProcess *process;       /* Pointer back to the process object */
 
+       GByteArray *input_buf;
        GString *error_buf;
        GString *status_buf;
 
@@ -91,10 +94,13 @@ struct _GcrGnupgProcessPrivate {
        gchar *directory;
        gchar *executable;
 
+       GInputStream *input;
+       GOutputStream *output;
+       GOutputStream *attributes;
+
        gboolean running;
        gboolean complete;
        GError *error;
-
        guint source_sig;
 
        GAsyncReadyCallback async_callback;
@@ -139,6 +145,15 @@ _gcr_gnupg_process_get_property (GObject *obj, guint prop_id, GValue *value,
        case PROP_EXECUTABLE:
                g_value_set_string (value, self->pv->executable);
                break;
+       case PROP_INPUT_STREAM:
+               g_value_set_object (value, _gcr_gnupg_process_get_input_stream (self));
+               break;
+       case PROP_OUTPUT_STREAM:
+               g_value_set_object (value, _gcr_gnupg_process_get_output_stream (self));
+               break;
+       case PROP_ATTRIBUTE_STREAM:
+               g_value_set_object (value, _gcr_gnupg_process_get_attribute_stream (self));
+               break;
        default:
                G_OBJECT_WARN_INVALID_PROPERTY_ID (obj, prop_id, pspec);
                break;
@@ -160,6 +175,15 @@ _gcr_gnupg_process_set_property (GObject *obj, guint prop_id, const GValue *valu
                g_return_if_fail (!self->pv->executable);
                self->pv->executable = g_value_dup_string (value);
                break;
+       case PROP_INPUT_STREAM:
+               _gcr_gnupg_process_set_input_stream (self, g_value_get_object (value));
+               break;
+       case PROP_OUTPUT_STREAM:
+               _gcr_gnupg_process_set_output_stream (self, g_value_get_object (value));
+               break;
+       case PROP_ATTRIBUTE_STREAM:
+               _gcr_gnupg_process_set_attribute_stream (self, g_value_get_object (value));
+               break;
        default:
                G_OBJECT_WARN_INVALID_PROPERTY_ID (obj, prop_id, pspec);
                break;
@@ -167,10 +191,23 @@ _gcr_gnupg_process_set_property (GObject *obj, guint prop_id, const GValue *valu
 }
 
 static void
+_gcr_gnupg_process_dispose (GObject *obj)
+{
+       GcrGnupgProcess *self = GCR_GNUPG_PROCESS (obj);
+
+       g_clear_object (&self->pv->input);
+       g_clear_object (&self->pv->output);
+       g_clear_object (&self->pv->attributes);
+
+       G_OBJECT_CLASS (_gcr_gnupg_process_parent_class)->dispose (obj);
+}
+
+static void
 _gcr_gnupg_process_finalize (GObject *obj)
 {
        GcrGnupgProcess *self = GCR_GNUPG_PROCESS (obj);
 
+       g_assert (self->pv->source_sig == 0);
        g_assert (!self->pv->running);
        g_free (self->pv->directory);
        g_free (self->pv->executable);
@@ -187,6 +224,7 @@ _gcr_gnupg_process_class_init (GcrGnupgProcessClass *klass)
        gobject_class->constructed = _gcr_gnupg_process_constructed;
        gobject_class->get_property = _gcr_gnupg_process_get_property;
        gobject_class->set_property = _gcr_gnupg_process_set_property;
+       gobject_class->dispose = _gcr_gnupg_process_dispose;
        gobject_class->finalize = _gcr_gnupg_process_finalize;
 
        /**
@@ -209,17 +247,31 @@ _gcr_gnupg_process_class_init (GcrGnupgProcessClass *klass)
                                        GPG_EXECUTABLE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
 
        /**
-        * GcrGnupgProcess::output-data:
-        * @data: a #GByteArray of output data.
+        * GcrGnupgProcess:input-stream:
         *
-        * Signal emitted when normal output data is available from the gnupg
-        * process. The data does not necessarily come on line boundaries, and
-        * won't be null-terminated.
+        * Input for gnupg, or %NULL for no input.
         */
-       signals[OUTPUT_DATA] = g_signal_new ("output-data", GCR_TYPE_GNUPG_PROCESS,
-                  G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GcrGnupgProcessClass, output_data),
-                  NULL, NULL, _gcr_marshal_VOID__BOXED,
-                  G_TYPE_NONE, 1, G_TYPE_BYTE_ARRAY);
+       g_object_class_install_property (gobject_class, PROP_INPUT_STREAM,
+                  g_param_spec_object ("input-stream", "Input Stream", "Input Stream",
+                                       G_TYPE_INPUT_STREAM, G_PARAM_READWRITE));
+
+       /**
+        * GcrGnupgProcess:output-stream:
+        *
+        * Output from gnupg, or %NULL for ignored output.
+        */
+       g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
+                  g_param_spec_object ("output-stream", "Output Stream", "Output Stream",
+                                       G_TYPE_OUTPUT_STREAM, G_PARAM_READWRITE));
+
+       /**
+        * GcrGnupgProcess:attribute-stream:
+        *
+        * Output of attribute data from gnupg, or %NULL for ignored attributes.
+        */
+       g_object_class_install_property (gobject_class, PROP_ATTRIBUTE_STREAM,
+                  g_param_spec_object ("attribute-stream", "Attribute Stream", "Attribute Stream",
+                                       G_TYPE_OUTPUT_STREAM, G_PARAM_READWRITE));
 
        /**
         * GcrGnupgProcess::error-line:
@@ -244,18 +296,6 @@ _gcr_gnupg_process_class_init (GcrGnupgProcessClass *klass)
                   NULL, NULL, _gcr_marshal_VOID__BOXED,
                   G_TYPE_NONE, 1, GCR_TYPE_RECORD);
 
-       /**
-        * GcrGnupgProcess::attribute-data:
-        * @data: a #GByteArray of attribute data.
-        *
-        * Signal emitted when attribute data is available from the gnupg
-        * process.
-        */
-       signals[ATTRIBUTE_DATA] = g_signal_new ("attribute-data", GCR_TYPE_GNUPG_PROCESS,
-                  G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GcrGnupgProcessClass, attribute_data),
-                  NULL, NULL, _gcr_marshal_VOID__BOXED,
-                  G_TYPE_NONE, 1, G_TYPE_BYTE_ARRAY);
-
        g_type_class_add_private (gobject_class, sizeof (GcrGnupgProcessPrivate));
 }
 
@@ -270,7 +310,7 @@ static GObject*
 _gcr_gnupg_process_get_source_object (GAsyncResult *result)
 {
        g_return_val_if_fail (GCR_IS_GNUPG_PROCESS (result), NULL);
-       return G_OBJECT (result);
+       return g_object_ref (result);
 }
 
 static void
@@ -304,6 +344,79 @@ _gcr_gnupg_process_new (const gchar *directory, const gchar *executable)
                             NULL);
 }
 
+const gchar *
+_gcr_gnupg_process_get_directory (GcrGnupgProcess *self)
+{
+       g_return_val_if_fail (GCR_GNUPG_PROCESS (self), NULL);
+       return self->pv->directory;
+}
+
+GInputStream *
+_gcr_gnupg_process_get_input_stream (GcrGnupgProcess *self)
+{
+       g_return_val_if_fail (GCR_GNUPG_PROCESS (self), NULL);
+       return self->pv->input;
+}
+
+void
+_gcr_gnupg_process_set_input_stream (GcrGnupgProcess *self,
+                                     GInputStream *input)
+{
+       g_return_if_fail (GCR_GNUPG_PROCESS (self));
+       g_return_if_fail (input == NULL || G_INPUT_STREAM (input));
+
+       if (input)
+               g_object_ref (input);
+       if (self->pv->input)
+               g_object_unref (self->pv->input);
+       self->pv->input = input;
+       g_object_notify (G_OBJECT (self), "input-stream");
+}
+
+GOutputStream *
+_gcr_gnupg_process_get_output_stream (GcrGnupgProcess *self)
+{
+       g_return_val_if_fail (GCR_GNUPG_PROCESS (self), NULL);
+       return self->pv->output;
+}
+
+void
+_gcr_gnupg_process_set_output_stream (GcrGnupgProcess *self,
+                                      GOutputStream *output)
+{
+       g_return_if_fail (GCR_GNUPG_PROCESS (self));
+       g_return_if_fail (output == NULL || G_OUTPUT_STREAM (output));
+
+       if (output)
+               g_object_ref (output);
+       if (self->pv->output)
+               g_object_unref (self->pv->output);
+       self->pv->output = output;
+       g_object_notify (G_OBJECT (self), "output-stream");
+}
+
+GOutputStream *
+_gcr_gnupg_process_get_attribute_stream (GcrGnupgProcess *self)
+{
+       g_return_val_if_fail (GCR_GNUPG_PROCESS (self), NULL);
+       return self->pv->attributes;
+}
+
+void
+_gcr_gnupg_process_set_attribute_stream (GcrGnupgProcess *self,
+                                         GOutputStream *output)
+{
+       g_return_if_fail (GCR_GNUPG_PROCESS (self));
+       g_return_if_fail (output == NULL || G_OUTPUT_STREAM (output));
+
+       if (output)
+               g_object_ref (output);
+       if (self->pv->attributes)
+               g_object_unref (self->pv->attributes);
+       self->pv->attributes = output;
+       g_object_notify (G_OBJECT (self), "attribute-stream");
+}
+
 static void
 run_async_ready_callback (GcrGnupgProcess *self)
 {
@@ -346,11 +459,6 @@ complete_run_process (GcrGnupgProcess *self)
        self->pv->running = FALSE;
        self->pv->complete = TRUE;
 
-       if (self->pv->source_sig) {
-               g_source_remove (self->pv->source_sig);
-               self->pv->source_sig = 0;
-       }
-
        if (self->pv->error == NULL) {
                _gcr_debug ("completed process");
        } else {
@@ -359,27 +467,27 @@ complete_run_process (GcrGnupgProcess *self)
        }
 }
 
-static gboolean
-complete_if_source_is_done (GnupgSource *gnupg_source)
+static void
+complete_source_is_done (GnupgSource *gnupg_source)
 {
-       gint i;
+       GcrGnupgProcess *self = gnupg_source->process;
 
-       for (i = 0; i < NUM_FDS; ++i) {
-               if (gnupg_source->polls[i].fd >= 0)
-                       return FALSE;
-       }
+       _gcr_debug ("all fds closed and process exited, completing");
 
-       if (gnupg_source->child_pid)
-               return FALSE;
+       g_assert (gnupg_source->child_sig == 0);
 
-       _gcr_debug ("all fds closed and process exited, completing");
+       if (gnupg_source->cancel_sig) {
+               g_signal_handler_disconnect (gnupg_source->cancellable, gnupg_source->cancel_sig);
+               gnupg_source->cancel_sig = 0;
+       }
+
+       g_clear_object (&gnupg_source->cancellable);
 
-       complete_run_process (gnupg_source->process);
-       run_async_ready_callback (gnupg_source->process);
+       complete_run_process (self);
+       run_async_ready_callback (self);
 
        /* All done, the source can go away now */
        g_source_unref ((GSource*)gnupg_source);
-       return TRUE;
 }
 
 static void
@@ -443,15 +551,15 @@ on_gnupg_source_finalize (GSource *source)
        GnupgSource *gnupg_source = (GnupgSource*)source;
        gint i;
 
-       if (gnupg_source->cancel_sig)
-               g_signal_handler_disconnect (gnupg_source->cancellable, gnupg_source->cancel_sig);
-       if (gnupg_source->cancellable)
-               g_object_unref (gnupg_source->cancellable);
+       g_assert (gnupg_source->cancellable == NULL);
+       g_assert (gnupg_source->cancel_sig == 0);
 
        for (i = 0; i < NUM_FDS; ++i)
                close_fd (&gnupg_source->polls[i].fd);
 
        g_object_unref (gnupg_source->process);
+       if (gnupg_source->input_buf)
+               g_byte_array_free (gnupg_source->input_buf, TRUE);
        g_string_free (gnupg_source->error_buf, TRUE);
        g_string_free (gnupg_source->status_buf, TRUE);
 
@@ -481,6 +589,26 @@ read_output (int fd, GByteArray *buffer)
        return TRUE;
 }
 
+static gboolean
+write_input (int fd, GByteArray *buffer)
+{
+       gssize result;
+
+       g_return_val_if_fail (fd >= 0, FALSE);
+
+       for (;;) {
+               result = write (fd, buffer->data, buffer->len);
+               if (result < 0) {
+                       if (errno == EINTR || errno == EAGAIN)
+                               continue;
+                       return FALSE;
+               } else {
+                       g_byte_array_remove_range (buffer, 0, result);
+                       return TRUE;
+               }
+       }
+}
+
 static void
 emit_status_for_each_line (const gchar *line, gpointer user_data)
 {
@@ -512,32 +640,151 @@ emit_error_for_each_line (const gchar *line, gpointer user_data)
 }
 
 static gboolean
+on_gnupg_source_input (GcrGnupgProcess *self,
+                       GnupgSource *gnupg_source,
+                       gint fd)
+{
+       gssize read;
+
+       if (gnupg_source->input_buf == NULL ||
+           gnupg_source->input_buf->len == 0) {
+               if (self->pv->input == NULL)
+                       return FALSE;
+               if (!gnupg_source->input_buf)
+                       gnupg_source->input_buf = g_byte_array_new ();
+               g_byte_array_set_size (gnupg_source->input_buf, 4096);
+               read = g_input_stream_read (self->pv->input,
+                                           gnupg_source->input_buf->data,
+                                           gnupg_source->input_buf->len,
+                                           gnupg_source->cancellable, NULL);
+               g_byte_array_set_size (gnupg_source->input_buf, read < 0 ? 0 : read);
+               if (read < 0)
+                       return FALSE;
+               if (read == 0)
+                       return FALSE;
+       }
+
+       if (!write_input (fd, gnupg_source->input_buf)) {
+               g_warning ("couldn't write output data to gnupg process");
+               return FALSE;
+       }
+
+       return TRUE;
+}
+
+static gboolean
+on_gnupg_source_status (GcrGnupgProcess *self,
+                        GnupgSource *gnupg_source,
+                        gint fd)
+{
+       GByteArray *buffer = g_byte_array_new ();
+       gboolean result = TRUE;
+
+       if (!read_output (fd, buffer)) {
+               g_warning ("couldn't read status data from gnupg process");
+               result = FALSE;
+       } else {
+               g_string_append_len (gnupg_source->status_buf, (gchar*)buffer->data, buffer->len);
+               _gcr_util_parse_lines (gnupg_source->status_buf, buffer->len == 0,
+                                      emit_status_for_each_line, self);
+       }
+
+       g_byte_array_unref (buffer);
+       return result;
+}
+
+static gboolean
+on_gnupg_source_attribute (GcrGnupgProcess *self,
+                           GnupgSource *gnupg_source,
+                           gint fd)
+{
+       GByteArray *buffer = g_byte_array_new ();
+       gboolean result = TRUE;
+
+       if (!read_output (fd, buffer)) {
+               g_warning ("couldn't read attribute data from gnupg process");
+               result = FALSE;
+       } else if (buffer->len > 0) {
+               _gcr_debug ("received %d bytes of attribute data", (gint)buffer->len);
+               if (self->pv->attributes != NULL)
+                       g_output_stream_write_all (self->pv->attributes, buffer->data,
+                                                  buffer->len, NULL,
+                                                  gnupg_source->cancellable, NULL);
+       }
+
+       g_byte_array_unref (buffer);
+       return result;
+}
+
+static gboolean
+on_gnupg_source_output (GcrGnupgProcess *self,
+                        GnupgSource *gnupg_source,
+                        gint fd)
+{
+       GByteArray *buffer = g_byte_array_new ();
+       gboolean result = TRUE;
+
+       if (!read_output (fd, buffer)) {
+               g_warning ("couldn't read output data from gnupg process");
+               result = FALSE;
+       } else if (buffer->len > 0) {
+               _gcr_debug ("received %d bytes of output data", (gint)buffer->len);
+               if (self->pv->output != NULL)
+                       g_output_stream_write_all (self->pv->output, buffer->data, buffer->len,
+                                                  NULL, gnupg_source->cancellable, NULL);
+       }
+
+       g_byte_array_unref (buffer);
+       return result;
+}
+
+static gboolean
+on_gnupg_source_error (GcrGnupgProcess *self,
+                       GnupgSource *gnupg_source,
+                       gint fd,
+                       gboolean last)
+{
+       GByteArray *buffer = g_byte_array_new ();
+       gboolean result = TRUE;
+
+       if (!read_output (fd, buffer)) {
+               g_warning ("couldn't read error data from gnupg process");
+               result = FALSE;
+       } else {
+               g_string_append_len (gnupg_source->error_buf, (gchar*)buffer->data, buffer->len);
+               _gcr_util_parse_lines (gnupg_source->error_buf, last,
+                                      emit_error_for_each_line, gnupg_source->process);
+       }
+
+       g_byte_array_unref (buffer);
+       return result;
+}
+
+static gboolean
 on_gnupg_source_dispatch (GSource *source, GSourceFunc unused, gpointer user_data)
 {
        GnupgSource *gnupg_source = (GnupgSource*)source;
-       GByteArray *buffer;
+       GcrGnupgProcess *self = gnupg_source->process;
        GPollFD *poll;
+       guint i;
 
        /* Standard input, no support yet */
        poll = &gnupg_source->polls[FD_INPUT];
-       if (poll->fd >= 0 && poll->revents != 0) {
-               close_poll (source, poll);
+       if (poll->fd >= 0) {
+               if (poll->revents & G_IO_OUT)
+                       if (!on_gnupg_source_input (self, gnupg_source, poll->fd))
+                               poll->revents |= G_IO_HUP;
+               if (poll->revents & G_IO_HUP)
+                       close_poll (source, poll);
+               poll->revents = 0;
        }
 
        /* Status output */
        poll = &gnupg_source->polls[FD_STATUS];
        if (poll->fd >= 0) {
-               if (poll->revents & G_IO_IN) {
-                       buffer = g_byte_array_new ();
-                       if (!read_output (poll->fd, buffer)) {
-                               g_warning ("couldn't read status data from gnupg process");
-                       } else {
-                               g_string_append_len (gnupg_source->status_buf, (gchar*)buffer->data, buffer->len);
-                               _gcr_util_parse_lines (gnupg_source->status_buf, buffer->len == 0,
-                                                      emit_status_for_each_line, gnupg_source->process);
-                       }
-                       g_byte_array_unref (buffer);
-               }
+               if (poll->revents & G_IO_IN)
+                       if (!on_gnupg_source_status (self, gnupg_source, poll->fd))
+                               poll->revents |= G_IO_HUP;
                if (poll->revents & G_IO_HUP)
                        close_poll (source, poll);
                poll->revents = 0;
@@ -546,16 +793,9 @@ on_gnupg_source_dispatch (GSource *source, GSourceFunc unused, gpointer user_dat
        /* Attribute output */
        poll = &gnupg_source->polls[FD_ATTRIBUTE];
        if (poll->fd >= 0) {
-               if (poll->revents & G_IO_IN) {
-                       buffer = g_byte_array_new ();
-                       if (!read_output (poll->fd, buffer)) {
-                               g_warning ("couldn't read attribute data from gnupg process");
-                       } else if (buffer->len > 0) {
-                               _gcr_debug ("received %d bytes of attribute data", (gint)buffer->len);
-                               g_signal_emit (gnupg_source->process, signals[ATTRIBUTE_DATA], 0, buffer);
-                       }
-                       g_byte_array_unref (buffer);
-               }
+               if (poll->revents & G_IO_IN)
+                       if (!on_gnupg_source_attribute (self, gnupg_source, poll->fd))
+                               poll->revents |= G_IO_HUP;
                if (poll->revents & G_IO_HUP)
                        close_poll (source, poll);
                poll->revents = 0;
@@ -564,16 +804,9 @@ on_gnupg_source_dispatch (GSource *source, GSourceFunc unused, gpointer user_dat
        /* Standard output */
        poll = &gnupg_source->polls[FD_OUTPUT];
        if (poll->fd >= 0) {
-               if (poll->revents & G_IO_IN) {
-                       buffer = g_byte_array_new ();
-                       if (!read_output (poll->fd, buffer)) {
-                               g_warning ("couldn't read output data from gnupg process");
-                       } else if (buffer->len > 0) {
-                               _gcr_debug ("received %d bytes of output data", (gint)buffer->len);
-                               g_signal_emit (gnupg_source->process, signals[OUTPUT_DATA], 0, buffer);
-                       }
-                       g_byte_array_unref (buffer);
-               }
+               if (poll->revents & G_IO_IN)
+                       if (!on_gnupg_source_output (self, gnupg_source, poll->fd))
+                               poll->revents |= G_IO_HUP;
                if (poll->revents & G_IO_HUP)
                        close_poll (source, poll);
                poll->revents = 0;
@@ -582,26 +815,27 @@ on_gnupg_source_dispatch (GSource *source, GSourceFunc unused, gpointer user_dat
        /* Standard error */
        poll = &gnupg_source->polls[FD_ERROR];
        if (poll->fd >= 0) {
-               if (poll->revents & G_IO_IN) {
-                       buffer = g_byte_array_new ();
-                       if (!read_output (poll->fd, buffer)) {
-                               g_warning ("couldn't read error data from gnupg process");
-                       } else {
-                               g_string_append_len (gnupg_source->error_buf, (gchar*)buffer->data, buffer->len);
-                               _gcr_util_parse_lines (gnupg_source->error_buf, (poll->revents & G_IO_HUP) ? TRUE : FALSE,
-                                                      emit_error_for_each_line, gnupg_source->process);
-                       }
-                       g_byte_array_unref (buffer);
-               }
+               if (poll->revents & G_IO_IN)
+                       if (!on_gnupg_source_error (self, gnupg_source, poll->fd,
+                                                   (poll->revents & G_IO_HUP) ? TRUE : FALSE))
+                               poll->revents |= G_IO_HUP;
                if (poll->revents & G_IO_HUP)
                        close_poll (source, poll);
                poll->revents = 0;
        }
 
-       if (complete_if_source_is_done (gnupg_source))
-               return FALSE; /* Disconnect this source */
+       for (i = 0; i < NUM_FDS; ++i) {
+               if (gnupg_source->polls[i].fd >= 0)
+                       return TRUE;
+       }
 
-       return TRUE;
+       /* Because we return below */
+       self->pv->source_sig = 0;
+
+       if (!gnupg_source->child_pid)
+               complete_source_is_done (gnupg_source);
+
+       return FALSE; /* Disconnect this source */
 }
 
 static GSourceFuncs gnupg_source_funcs = {
@@ -618,6 +852,7 @@ on_gnupg_process_child_exited (GPid pid, gint status, gpointer user_data)
        GcrGnupgProcess *self = gnupg_source->process;
        GError *error = NULL;
        gint code;
+       guint i;
 
        _gcr_debug ("process exited: %d", (int)pid);
 
@@ -650,7 +885,12 @@ on_gnupg_process_child_exited (GPid pid, gint status, gpointer user_data)
                g_error_free (error);
        }
 
-       complete_if_source_is_done (gnupg_source);
+       for (i = 0; i < NUM_FDS; ++i) {
+               if (gnupg_source->polls[i].fd >= 0)
+                       return;
+       }
+
+       complete_source_is_done (gnupg_source);
 }
 
 static void
@@ -682,19 +922,17 @@ on_cancellable_cancelled (GCancellable *cancellable, gpointer user_data)
 
        _gcr_debug ("process cancelled");
 
+       /* Set an error, which is respected when this actually completes. */
+       if (gnupg_source->process->pv->error == NULL)
+               gnupg_source->process->pv->error = g_error_new_literal (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+                                                                       _("The operation was cancelled"));
+
        /* Try and kill the child process */
        if (gnupg_source->child_pid) {
                _gcr_debug ("sending term signal to process: %d",
                            (int)gnupg_source->child_pid);
                kill (gnupg_source->child_pid, SIGTERM);
        }
-
-       /* Set an error, which is respected when this actually completes. */
-       if (gnupg_source->process->pv->error == NULL)
-               gnupg_source->process->pv->error = g_error_new_literal (G_IO_ERROR, G_IO_ERROR_CANCELLED,
-                                                                       _("The operation was cancelled"));
-
-       complete_if_source_is_done (gnupg_source);
 }
 
 /**
@@ -731,6 +969,7 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
        int attribute_fds[2] = { -1, -1 };
        int output_fd = -1;
        int error_fd = -1;
+       int input_fd = -1;
        GnupgSource *gnupg_source;
        GSource *source;
        GPid pid;
@@ -756,6 +995,7 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
        g_ptr_array_add (args, g_strdup (self->pv->executable));
 
        /* Spawn/child will close all other attributes, besides thesthose in child_fds */
+       child_fds[FD_INPUT] = 0;
        child_fds[FD_OUTPUT] = 1;
        child_fds[FD_ERROR] = 2;
 
@@ -805,8 +1045,8 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
 
        g_spawn_async_with_pipes (self->pv->directory, (gchar**)args->pdata,
                                  (gchar**)envs->pdata, G_SPAWN_DO_NOT_REAP_CHILD,
-                                 on_gnupg_process_child_setup,
-                                 child_fds, &pid, NULL, &output_fd, &error_fd, &error);
+                                 on_gnupg_process_child_setup, child_fds,
+                                 &pid, &input_fd, &output_fd, &error_fd, &error);
 
        g_ptr_array_free (args, TRUE);
        g_ptr_array_free (envs, TRUE);
@@ -841,6 +1081,11 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
        gnupg_source->process = g_object_ref (self);
        gnupg_source->child_pid = pid;
 
+       gnupg_source->polls[FD_INPUT].fd = input_fd;
+       if (input_fd >= 0) {
+               gnupg_source->polls[FD_INPUT].events = G_IO_HUP | G_IO_OUT;
+               g_source_add_poll (source, &gnupg_source->polls[FD_INPUT]);
+       }
        gnupg_source->polls[FD_OUTPUT].fd = output_fd;
        if (output_fd >= 0) {
                gnupg_source->polls[FD_OUTPUT].events = G_IO_HUP | G_IO_IN;
@@ -870,12 +1115,12 @@ _gcr_gnupg_process_run_async (GcrGnupgProcess *self, const gchar **argv, const g
                                                                  (GDestroyNotify)g_source_unref);
        }
 
-       g_assert (!self->pv->source_sig);
+       g_assert (self->pv->source_sig == 0);
        g_source_set_callback (source, unused_callback, NULL, NULL);
        self->pv->source_sig = g_source_attach (source, g_main_context_default ());
 
        /* This assumes the outstanding reference to source */
-       g_assert (!gnupg_source->child_sig);
+       g_assert (gnupg_source->child_sig == 0);
        gnupg_source->child_sig = g_child_watch_add_full (G_PRIORITY_DEFAULT, pid,
                                                          on_gnupg_process_child_exited,
                                                          g_source_ref (source),
@@ -909,7 +1154,6 @@ _gcr_gnupg_process_run_finish (GcrGnupgProcess *self, GAsyncResult *result,
        g_assert (!self->pv->running);
        g_assert (!self->pv->async_callback);
        g_assert (!self->pv->user_data);
-       g_assert (!self->pv->source_sig);
 
        if (self->pv->error) {
                g_propagate_error (error, self->pv->error);