gsize stdin_length;
gsize stdin_offset;
- /* Not actually GString. Just borrowing the struct. */
- GString stdout_string;
- GString stderr_string;
-
- GBytes *unref_this_later;
- gchar *free_this_later;
+ GInputStream *stdin_buf;
+ GMemoryOutputStream *stdout_buf;
+ GMemoryOutputStream *stderr_buf;
GCancellable *cancellable;
GSource *cancellable_source;
- gboolean completion_reported;
+ guint outstanding_ops;
+ gboolean reported_error;
} CommunicateState;
static void
-ensure_string_allocated (GString *str)
-{
- /* This will work because the first time we will set it to
- * COMMUNICATE_READ_SIZE and then all future attempts will grow by at
- * least that much (as a result of multiplying the existing value by
- * 2).
- */
- if (str->len + COMMUNICATE_READ_SIZE > str->allocated_len)
- {
- str->allocated_len = MAX(COMMUNICATE_READ_SIZE, str->allocated_len * 2);
- str->str = g_realloc (str->str, str->allocated_len);
- }
-}
-
-static void
g_subprocess_communicate_made_progress (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
state = g_task_get_task_data (task);
source = source_object;
- if (source == subprocess->stdin_pipe)
- {
- gssize s;
-
- s = g_output_stream_write_finish (subprocess->stdin_pipe, result, &error);
- g_assert (s != 0);
+ state->outstanding_ops--;
- if (s != -1)
- {
- g_assert (0 < s && s < state->stdin_length);
- g_assert (state->stdin_offset + s <= state->stdin_length);
- state->stdin_offset += s;
-
- if (state->stdin_offset != state->stdin_length)
- {
- /* write more... */
- g_output_stream_write_async (subprocess->stdin_pipe,
- state->stdin_data + state->stdin_offset,
- state->stdin_length - state->stdin_offset,
- G_PRIORITY_DEFAULT,
- state->cancellable,
- g_subprocess_communicate_made_progress,
- task);
- return;
- }
- }
- }
- else if (source == subprocess->stdout_pipe)
+ if (source == subprocess->stdin_pipe ||
+ source == state->stdout_buf ||
+ source == state->stderr_buf)
{
- gssize s;
-
- s = g_input_stream_read_finish (subprocess->stdout_pipe, result, &error);
- g_assert (s <= COMMUNICATE_READ_SIZE);
-
- /* If s is 0 then we have EOF and should not read more, but should
- * continue to try the other event sources.
- *
- * If s is -1 then error will be set and we deal with that below.
- *
- * Only have to handle the result > 0 case.
- */
- if (s > 0)
- {
- state->stdout_string.len += s;
-
- ensure_string_allocated (&state->stdout_string);
-
- g_input_stream_read_async (subprocess->stdout_pipe, state->stdout_string.str + state->stdout_string.len,
- COMMUNICATE_READ_SIZE, G_PRIORITY_DEFAULT - 1, state->cancellable,
- g_subprocess_communicate_made_progress, g_object_ref (task));
- return;
- }
- }
- else if (source == subprocess->stderr_pipe)
- {
- gssize s;
-
- s = g_input_stream_read_finish (subprocess->stdout_pipe, result, &error);
- g_assert (s <= COMMUNICATE_READ_SIZE);
-
- /* As above... */
- if (s > 0)
- {
- state->stderr_string.len += s;
-
- ensure_string_allocated (&state->stderr_string);
-
- g_input_stream_read_async (subprocess->stderr_pipe, state->stderr_string.str + state->stderr_string.len,
- COMMUNICATE_READ_SIZE, G_PRIORITY_DEFAULT - 1, state->cancellable,
- g_subprocess_communicate_made_progress, g_object_ref (task));
- return;
- }
+ (void) g_output_stream_splice_finish ((GOutputStream*)source, result, &error);
}
else if (source == subprocess)
{
- if (g_subprocess_wait_finish (subprocess, result, &error))
- {
- /* It is not possible that we had a successful completion if
- * the task was already completed because we flag our own
- * cancellable in that case.
- */
- g_assert (!state->completion_reported);
- state->completion_reported = TRUE;
- g_task_return_boolean (task, TRUE);
- }
+ (void) g_subprocess_wait_finish (subprocess, result, &error);
}
else
g_assert_not_reached ();
* We might be seeing an error as a result of the cancellation
* done when the process quits.
*/
- if (!state->completion_reported)
+ if (!state->reported_error)
{
- state->completion_reported = TRUE;
-
+ state->reported_error = TRUE;
g_cancellable_cancel (state->cancellable);
g_task_return_error (task, error);
}
else
g_error_free (error);
}
+ else if (state->outstanding_ops == 0)
+ {
+ g_task_return_boolean (task, TRUE);
+ }
+ /* And drop the original ref */
g_object_unref (task);
}
{
CommunicateState *state = data;
- g_free (state->stdout_string.str);
- g_free (state->stderr_string.str);
- g_free (state->free_this_later);
+ g_clear_object (&state->stdin_buf);
+ g_clear_object (&state->stdout_buf);
+ g_clear_object (&state->stderr_buf);
if (!g_source_is_destroyed (state->cancellable_source))
g_source_destroy (state->cancellable_source);
g_source_unref (state->cancellable_source);
- if (state->unref_this_later)
- g_bytes_unref (state->unref_this_later);
-
g_slice_free (CommunicateState, state);
}
static CommunicateState *
g_subprocess_communicate_internal (GSubprocess *subprocess,
- GBytes *stdin_bytes,
- const gchar *stdin_data,
- gssize stdin_length,
+ GBytes *stdin_buf,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
state = g_slice_new0 (CommunicateState);
g_task_set_task_data (task, state, g_subprocess_communicate_state_free);
- if (stdin_bytes)
- {
- g_assert (!stdin_data && !stdin_length && (subprocess->flags & G_SUBPROCESS_FLAGS_STDIN_PIPE));
- state->stdin_data = g_bytes_get_data (stdin_bytes, &state->stdin_length);
- state->unref_this_later = g_bytes_ref (stdin_bytes);
- }
- else if (stdin_data)
- {
- g_assert (subprocess->flags & G_SUBPROCESS_FLAGS_STDIN_PIPE);
- if (stdin_length < 0)
- state->stdin_length = strlen (stdin_data);
- else
- state->stdin_length = stdin_length;
-
- state->free_this_later = g_memdup (stdin_data, state->stdin_length);
- state->stdin_data = state->free_this_later;
- }
-
state->cancellable = g_cancellable_new ();
if (cancellable)
g_source_attach (state->cancellable_source, g_main_context_get_thread_default ());
}
- if (subprocess->stdin_pipe && state->stdin_length)
- g_output_stream_write_async (subprocess->stdin_pipe, state->stdin_data, state->stdin_length, G_PRIORITY_DEFAULT,
- state->cancellable, g_subprocess_communicate_made_progress, g_object_ref (task));
+ if (subprocess->stdin_pipe)
+ {
+ g_assert (stdin_buf != NULL);
+ state->stdin_buf = g_memory_input_stream_new_from_bytes (stdin_buf);
+ g_output_stream_splice_async (subprocess->stdin_pipe, (GInputStream*)state->stdin_buf,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT, state->cancellable,
+ g_subprocess_communicate_made_progress, g_object_ref (task));
+ state->outstanding_ops++;
+ }
if (subprocess->stdout_pipe)
{
- ensure_string_allocated (&state->stdout_string);
-
- g_input_stream_read_async (subprocess->stdout_pipe, state->stdout_string.str, COMMUNICATE_READ_SIZE,
- G_PRIORITY_DEFAULT - 1, state->cancellable,
- g_subprocess_communicate_made_progress, g_object_ref (task));
+ state->stdout_buf = (GMemoryOutputStream*)g_memory_output_stream_new_resizable ();
+ g_output_stream_splice_async ((GOutputStream*)state->stdout_buf, subprocess->stdout_pipe,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT, state->cancellable,
+ g_subprocess_communicate_made_progress, g_object_ref (task));
+ state->outstanding_ops++;
}
if (subprocess->stderr_pipe)
{
- ensure_string_allocated (&state->stderr_string);
-
- g_input_stream_read_async (subprocess->stderr_pipe, state->stderr_string.str, COMMUNICATE_READ_SIZE,
- G_PRIORITY_DEFAULT - 1, state->cancellable,
- g_subprocess_communicate_made_progress, g_object_ref (task));
+ state->stderr_buf = (GMemoryOutputStream*)g_memory_output_stream_new_resizable ();
+ g_output_stream_splice_async ((GOutputStream*)state->stderr_buf, subprocess->stderr_pipe,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT, state->cancellable,
+ g_subprocess_communicate_made_progress, g_object_ref (task));
+ state->outstanding_ops++;
}
g_subprocess_wait_async (subprocess, state->cancellable,
g_subprocess_communicate_made_progress, g_object_ref (task));
+ state->outstanding_ops++;
return state;
}
/**
- * g_Subprocess_communicate:
+ * g_subprocess_communicate:
* @self: a #GSubprocess
- * @stdin_data: data to send to the stdin of the subprocess, or %NULL
- * @stdin_length: the length of @stdin_data, or -1
+ * @stdin_buf: data to send to the stdin of the subprocess, or %NULL
* @cancellable: a #GCancellable
- * @stdout_data: (out): data read from the subprocess stdout
- * @stdout_length: (out): the length of @stdout_data returned
- * @stderr_data: (out): data read from the subprocess stderr
- * @stderr_length: (out): the length of @stderr_data returned
+ * @stdout_buf: (out): data read from the subprocess stdout
+ * @stderr_buf: (out): data read from the subprocess stderr
* @error: a pointer to a %NULL #GError pointer, or %NULL
*
- * Communicate with the subprocess until it terminates.
+ * Communicate with the subprocess until it terminates, and all input
+ * and output has been completed.
*
- * If @stdin_data is given, the subprocess must have been created with
+ * If @stdin is given, the subprocess must have been created with
* %G_SUBPROCESS_FLAGS_STDIN_PIPE. The given data is fed to the
* stdin of the subprocess and the pipe is closed (ie: EOF).
*
* At the same time (as not to cause blocking when dealing with large
* amounts of data), if %G_SUBPROCESS_FLAGS_STDOUT_PIPE or
- * %G_SUBPROCESS_FLAGS_STDERR_PIPE were used, reads from those streams.
- * The data that was read is returned in @stdout_data and/or
- * @stderr_data.
- *
- * @stdin_length specifies the length of @stdin_data. If it is -1 then
- * @stdin_data is taken to be a nul-terminated string. If the
- * subprocess was not created with %G_SUBPROCESS_FLAGS_STDIN_PIPE then
- * you must pass %NULL for @stdin_data and 0 for @stdin_length.
+ * %G_SUBPROCESS_FLAGS_STDERR_PIPE were used, reads from those
+ * streams. The data that was read is returned in @stdout and/or
+ * the @stderr.
*
* If the subprocess was created with %G_SUBPROCESS_FLAGS_STDOUT_PIPE,
- * @stdout_data will contain the data read from stdout, plus a
- * terminating nul character; it will always be non-%NULL (ie:
- * containing at least the nul). @stdout_length will be the length of
- * the data, excluding the added nul. For subprocesses not created with
- * %G_SUBPROCESS_FLAGS_STDOUT_PIPE, @stdout_data will be set to %NULL
- * and @stdout_length will be set to zero. stderr is handled in the
- * same way.
+ * @stdout_buf will contain the data read from stdout. Otherwise, for
+ * subprocesses not created with %G_SUBPROCESS_FLAGS_STDOUT_PIPE,
+ * @stdout_buf will be set to %NULL. Similar provisions apply to
+ * @stderr_buf and %G_SUBPROCESS_FLAGS_STDERR_PIPE.
*
* As usual, any output variable may be given as %NULL to ignore it.
*
* If you desire the stdout and stderr data to be interleaved, create
* the subprocess with %G_SUBPROCESS_FLAGS_STDOUT_PIPE and
* %G_SUBPROCESS_FLAGS_STDERR_MERGE. The merged result will be returned
- * in @stdout_data and @stderr_data will be set to %NULL.
+ * in @stdout_buf and @stderr_buf will be set to %NULL.
*
* In case of any error (including cancellation), %FALSE will be
* returned with @error set. Some or all of the stdin data may have
*
* Returns: %TRUE if successful
*
- * Since: 2.36
+ * Since: 2.40
**/
gboolean
g_subprocess_communicate (GSubprocess *subprocess,
- const gchar *stdin_data,
- gssize stdin_length,
+ GBytes *stdin_buf,
GCancellable *cancellable,
- gchar **stdout_data,
- gsize *stdout_length,
- gchar **stderr_data,
- gsize *stderr_length,
+ GBytes **stdout_buf,
+ GBytes **stderr_buf,
GError **error)
{
GAsyncResult *result = NULL;
gboolean success;
g_return_val_if_fail (G_IS_SUBPROCESS (subprocess), FALSE);
- g_return_val_if_fail (stdin_length == 0 || stdin_data != NULL, FALSE);
- g_return_val_if_fail (stdin_data == NULL || (subprocess->flags & G_SUBPROCESS_FLAGS_STDIN_PIPE), FALSE);
+ g_return_val_if_fail (stdin_buf == NULL || (subprocess->flags & G_SUBPROCESS_FLAGS_STDIN_PIPE), FALSE);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
g_subprocess_sync_setup ();
- g_subprocess_communicate_internal (subprocess, NULL, stdin_data, stdin_length,
- cancellable, g_subprocess_sync_done, &result);
+ g_subprocess_communicate_internal (subprocess, stdin_buf, cancellable, g_subprocess_sync_done, &result);
g_subprocess_sync_complete (&result);
- success = g_subprocess_communicate_finish (subprocess, result,
- stdout_data, stdout_length,
- stderr_data, stderr_length, error);
+ success = g_subprocess_communicate_finish (subprocess, result, stdout_buf, stderr_buf, error);
g_object_unref (result);
return success;
}
+/**
+ * g_subprocess_communicate_async:
+ * @subprocess: Self
+ * @stdin_buf: Input data
+ * @cancellable: Cancellable
+ * @callback: Callback
+ * @user_data: User data
+ *
+ * Asynchronous version of g_subprocess_communicate(). Complete
+ * invocation with g_subprocess_communicate_finish().
+ */
void
g_subprocess_communicate_async (GSubprocess *subprocess,
- const gchar *stdin_data,
- gssize stdin_length,
+ GBytes *stdin_buf,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
g_return_if_fail (G_IS_SUBPROCESS (subprocess));
- g_return_if_fail (stdin_length == 0 || stdin_data != NULL);
- g_return_if_fail (stdin_data == NULL || (subprocess->flags & G_SUBPROCESS_FLAGS_STDIN_PIPE));
+ g_return_if_fail (stdin_buf == NULL || (subprocess->flags & G_SUBPROCESS_FLAGS_STDIN_PIPE));
g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
- g_subprocess_communicate_internal (subprocess, NULL, stdin_data, stdin_length, cancellable, callback, user_data);
+ g_subprocess_communicate_internal (subprocess, stdin_buf, cancellable, callback, user_data);
}
+/**
+ * g_subprocess_communicate_finish:
+ * @subprocess: Self
+ * @result: Result
+ * @stdout_buf: (out): Return location for stdout data
+ * @stderr_buf: (out): Return location for stderr data
+ * @error: Error
+ *
+ * Complete an invocation of g_subprocess_communicate_async().
+ */
gboolean
g_subprocess_communicate_finish (GSubprocess *subprocess,
GAsyncResult *result,
- gchar **stdout_data,
- gsize *stdout_length,
- gchar **stderr_data,
- gsize *stderr_length,
+ GBytes **stdout_buf,
+ GBytes **stderr_buf,
GError **error)
{
- CommunicateState *state;
gboolean success;
- GTask *task;
+ CommunicateState *state;
g_return_val_if_fail (G_IS_SUBPROCESS (subprocess), FALSE);
g_return_val_if_fail (g_task_is_valid (result, subprocess), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
- task = G_TASK (result);
- state = g_task_get_task_data (task);
+ g_object_ref (result);
- success = g_task_propagate_boolean (task, error);
+ state = g_task_get_task_data ((GTask*)result);
+ success = g_task_propagate_boolean ((GTask*)result, error);
if (success)
{
- if (stdout_data)
- {
- gchar *string;
-
- string = g_realloc (state->stdout_string.str, state->stdout_string.len + 1);
- string[state->stdout_string.len] = '\0';
- state->stdout_string.str = NULL;
- *stdout_data = string;
- }
-
- if (stdout_length)
- *stdout_length = state->stdout_string.len;
-
- if (stderr_data)
- {
- gchar *string;
-
- string = g_realloc (state->stderr_string.str, state->stderr_string.len + 1);
- string[state->stderr_string.len] = '\0';
- state->stderr_string.str = NULL;
- *stderr_data = string;
- }
-
- if (stderr_length)
- *stderr_length = state->stderr_string.len;
+ if (stdout_buf)
+ *stdout_buf = g_memory_output_stream_steal_as_bytes (state->stdout_buf);
+ if (stderr_buf)
+ *stderr_buf = g_memory_output_stream_steal_as_bytes (state->stderr_buf);
}
- return success;
-}
-
-gboolean
-g_subprocess_communicate_bytes (GSubprocess *subprocess,
- GBytes *stdin_bytes,
- GCancellable *cancellable,
- GBytes **stdout_bytes,
- GBytes **stderr_bytes,
- GError **error)
-{
- GAsyncResult *result = NULL;
- gboolean success;
-
- g_return_val_if_fail (G_IS_SUBPROCESS (subprocess), FALSE);
- g_return_val_if_fail (stdin_bytes == NULL || (subprocess->flags & G_SUBPROCESS_FLAGS_STDIN_PIPE), FALSE);
- g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
- g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
-
- g_subprocess_sync_setup ();
- g_subprocess_communicate_internal (subprocess, stdin_bytes, NULL, 0, cancellable, g_subprocess_sync_done, &result);
- g_subprocess_sync_complete (&result);
- success = g_subprocess_communicate_bytes_finish (subprocess, result, stdout_bytes, stderr_bytes, error);
g_object_unref (result);
-
- return success;
-}
-
-void
-g_subprocess_communicate_bytes_async (GSubprocess *subprocess,
- GBytes *stdin_bytes,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- g_return_if_fail (G_IS_SUBPROCESS (subprocess));
- g_return_if_fail (stdin_bytes == NULL || (subprocess->flags & G_SUBPROCESS_FLAGS_STDIN_PIPE));
- g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
-
- g_subprocess_communicate_internal (subprocess, stdin_bytes, NULL, 0, cancellable, callback, user_data);
-}
-
-gboolean
-g_subprocess_communicate_bytes_finish (GSubprocess *subprocess,
- GAsyncResult *result,
- GBytes **stdout_bytes,
- GBytes **stderr_bytes,
- GError **error)
-{
- gboolean success;
- gchar *stdout_data;
- gsize stdout_length;
- gchar *stderr_data;
- gsize stderr_length;
-
- g_return_val_if_fail (G_IS_SUBPROCESS (subprocess), FALSE);
- g_return_val_if_fail (g_task_is_valid (result, subprocess), FALSE);
- g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
-
- success = g_subprocess_communicate_finish (subprocess, result,
- stdout_bytes ? &stdout_data : NULL,
- stdout_bytes ? &stdout_length : NULL,
- stderr_bytes ? &stderr_data : NULL,
- stderr_bytes ? &stderr_length : NULL,
- error);
-
- if (success)
- {
- if (stdout_bytes)
- *stdout_bytes = g_bytes_new_take (stdout_data, stdout_length);
-
- if (stderr_bytes)
- *stderr_bytes = g_bytes_new_take (stderr_data, stderr_length);
- }
-
return success;
}