soup-message-io: use gio streams rather than SoupSocket
[platform/upstream/libsoup.git] / libsoup / soup-socket.c
index b4396c5..2d72b38 100644 (file)
@@ -16,8 +16,9 @@
 #include <string.h>
 #include <unistd.h>
 
-#include "soup-address.h"
 #include "soup-socket.h"
+#include "soup-address.h"
+#include "soup-filter-input-stream.h"
 #include "soup-marshal.h"
 #include "soup-misc.h"
 #include "soup-misc-private.h"
@@ -39,6 +40,7 @@ enum {
        WRITABLE,
        DISCONNECTED,
        NEW_CONNECTION,
+       EVENT,
        LAST_SIGNAL
 };
 
@@ -69,8 +71,8 @@ typedef struct {
        SoupAddress *local_addr, *remote_addr;
        GIOStream *conn;
        GSocket *gsock;
-       GPollableInputStream *istream;
-       GPollableOutputStream *ostream;
+       GInputStream *istream;
+       GOutputStream *ostream;
        GTlsCertificateFlags tls_errors;
 
        guint non_blocking:1;
@@ -85,7 +87,6 @@ typedef struct {
        GMainContext   *async_context;
        GSource        *watch_src;
        GSource        *read_src, *write_src;
-       GByteArray     *read_buf;
 
        GMutex iolock, addrlock;
        guint timeout;
@@ -114,22 +115,20 @@ soup_socket_init (SoupSocket *sock)
 }
 
 static void
-disconnect_internal (SoupSocket *sock)
+disconnect_internal (SoupSocket *sock, gboolean close)
 {
        SoupSocketPrivate *priv = SOUP_SOCKET_GET_PRIVATE (sock);
 
        if (priv->gsock) {
-               g_socket_close (priv->gsock, NULL);
+               if (close)
+                       g_socket_close (priv->gsock, NULL);
                g_object_unref (priv->gsock);
                priv->gsock = NULL;
        }
        if (priv->conn) {
                if (G_IS_TLS_CONNECTION (priv->conn))
                        g_signal_handlers_disconnect_by_func (priv->conn, soup_socket_peer_certificate_changed, sock);
-               g_object_unref (priv->conn);
-               priv->conn = NULL;
-               priv->istream = NULL;
-               priv->ostream = NULL;
+               g_clear_object (&priv->conn);
        }
 
        if (priv->read_src) {
@@ -155,9 +154,12 @@ finalize (GObject *object)
        if (priv->conn) {
                if (priv->clean_dispose)
                        g_warning ("Disposing socket %p while still connected", object);
-               disconnect_internal (SOUP_SOCKET (object));
+               disconnect_internal (SOUP_SOCKET (object), TRUE);
        }
 
+       g_clear_object (&priv->istream);
+       g_clear_object (&priv->ostream);
+
        if (priv->local_addr)
                g_object_unref (priv->local_addr);
        if (priv->remote_addr)
@@ -171,9 +173,6 @@ finalize (GObject *object)
        if (priv->async_context)
                g_main_context_unref (priv->async_context);
 
-       if (priv->read_buf)
-               g_byte_array_free (priv->read_buf, TRUE);
-
        g_mutex_clear (&priv->addrlock);
        g_mutex_clear (&priv->iolock);
 
@@ -208,7 +207,7 @@ soup_socket_class_init (SoupSocketClass *socket_class)
                              G_SIGNAL_RUN_LAST,
                              G_STRUCT_OFFSET (SoupSocketClass, readable),
                              NULL, NULL,
-                             soup_marshal_NONE__NONE,
+                             _soup_marshal_NONE__NONE,
                              G_TYPE_NONE, 0);
 
        /**
@@ -224,7 +223,7 @@ soup_socket_class_init (SoupSocketClass *socket_class)
                              G_SIGNAL_RUN_LAST,
                              G_STRUCT_OFFSET (SoupSocketClass, writable),
                              NULL, NULL,
-                             soup_marshal_NONE__NONE,
+                             _soup_marshal_NONE__NONE,
                              G_TYPE_NONE, 0);
 
        /**
@@ -240,7 +239,7 @@ soup_socket_class_init (SoupSocketClass *socket_class)
                              G_SIGNAL_RUN_LAST,
                              G_STRUCT_OFFSET (SoupSocketClass, disconnected),
                              NULL, NULL,
-                             soup_marshal_NONE__NONE,
+                             _soup_marshal_NONE__NONE,
                              G_TYPE_NONE, 0);
 
        /**
@@ -260,9 +259,31 @@ soup_socket_class_init (SoupSocketClass *socket_class)
                              G_SIGNAL_RUN_FIRST,
                              G_STRUCT_OFFSET (SoupSocketClass, new_connection),
                              NULL, NULL,
-                             soup_marshal_NONE__OBJECT,
+                             _soup_marshal_NONE__OBJECT,
                              G_TYPE_NONE, 1,
                              SOUP_TYPE_SOCKET);
+       /**
+        * SoupSocket::event:
+        * @sock: the socket
+        * @event: the event that occurred
+        * @connection: the current connection state
+        *
+        * Emitted when a network-related event occurs. See
+        * #GSocketClient::event for more details.
+        *
+        * Since: 2.38
+        **/
+       signals[EVENT] =
+               g_signal_new ("event",
+                             G_OBJECT_CLASS_TYPE (object_class),
+                             G_SIGNAL_RUN_LAST,
+                             0,
+                             NULL, NULL,
+                             NULL,
+                             G_TYPE_NONE, 2,
+                             G_TYPE_SOCKET_CLIENT_EVENT,
+                             G_TYPE_IO_STREAM);
+
 
        /* properties */
        /**
@@ -497,9 +518,9 @@ finish_socket_setup (SoupSocketPrivate *priv)
        if (!priv->conn)
                priv->conn = (GIOStream *)g_socket_connection_factory_create_connection (priv->gsock);
        if (!priv->istream)
-               priv->istream = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (priv->conn));
+               priv->istream = soup_filter_input_stream_new (g_io_stream_get_input_stream (priv->conn));
        if (!priv->ostream)
-               priv->ostream = G_POLLABLE_OUTPUT_STREAM (g_io_stream_get_output_stream (priv->conn));
+               priv->ostream = g_object_ref (g_io_stream_get_output_stream (priv->conn));
 
        g_socket_set_timeout (priv->gsock, priv->timeout);
 }
@@ -630,13 +651,32 @@ soup_socket_new (const char *optname1, ...)
        return sock;
 }
 
+static void
+proxy_socket_client_event (GSocketClient       *client,
+                          GSocketClientEvent   event,
+                          GSocketConnectable  *connectable,
+                          GIOStream           *connection,
+                          gpointer             user_data)
+{
+       SoupSocket *sock = user_data;
+
+       g_signal_emit (sock, signals[EVENT], 0,
+                      event, connection);
+}
+
 static guint
 socket_connected (SoupSocket *sock, GSocketConnection *conn, GError *error)
 {
        SoupSocketPrivate *priv = SOUP_SOCKET_GET_PRIVATE (sock);
 
-       g_object_unref (priv->connect_cancel);
-       priv->connect_cancel = NULL;
+       if (priv->connect_cancel) {
+               GCancellable *cancellable = priv->connect_cancel;
+
+               g_object_unref (priv->connect_cancel);
+               priv->connect_cancel = NULL;
+               if (g_cancellable_is_cancelled (cancellable))
+                       return SOUP_STATUS_CANCELLED;
+       }
 
        if (error) {
                if (error->domain == G_RESOLVER_ERROR) {
@@ -729,6 +769,8 @@ soup_socket_connect_async (SoupSocket *sock, GCancellable *cancellable,
                g_main_context_push_thread_default (priv->async_context);
 
        client = g_socket_client_new ();
+       g_signal_connect (client, "event",
+                         G_CALLBACK (proxy_socket_client_event), sock);
        if (priv->timeout)
                g_socket_client_set_timeout (client, priv->timeout);
        g_socket_client_connect_async (client,
@@ -772,6 +814,8 @@ soup_socket_connect_sync (SoupSocket *sock, GCancellable *cancellable)
        priv->connect_cancel = cancellable;
 
        client = g_socket_client_new ();
+       g_signal_connect (client, "event",
+                         G_CALLBACK (proxy_socket_client_event), sock);
        if (priv->timeout)
                g_socket_client_set_timeout (client, priv->timeout);
        conn = g_socket_client_connect (client,
@@ -798,6 +842,14 @@ soup_socket_get_gsocket (SoupSocket *sock)
        return SOUP_SOCKET_GET_PRIVATE (sock)->gsock;
 }
 
+GIOStream *
+soup_socket_get_iostream (SoupSocket *sock)
+{
+       g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+
+       return SOUP_SOCKET_GET_PRIVATE (sock)->conn;
+}
+
 static GSource *
 soup_socket_create_watch (SoupSocketPrivate *priv, GIOCondition cond,
                          GPollableSourceFunc callback, gpointer user_data,
@@ -807,9 +859,9 @@ soup_socket_create_watch (SoupSocketPrivate *priv, GIOCondition cond,
        GMainContext *async_context;
 
        if (cond == G_IO_IN)
-               watch = g_pollable_input_stream_create_source (priv->istream, cancellable);
+               watch = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (priv->istream), cancellable);
        else
-               watch = g_pollable_output_stream_create_source (priv->ostream, cancellable);
+               watch = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (priv->ostream), cancellable);
        g_source_set_callback (watch, (GSourceFunc)callback, user_data, NULL);
 
        if (priv->use_thread_context)
@@ -920,7 +972,7 @@ soup_socket_listen (SoupSocket *sock)
 
  cant_listen:
        if (priv->conn)
-               disconnect_internal (sock);
+               disconnect_internal (sock, TRUE);
        g_object_unref (addr);
 
        return FALSE;
@@ -983,6 +1035,9 @@ soup_socket_start_proxy_ssl (SoupSocket *sock, const char *ssl_host,
        if (G_IS_TLS_CONNECTION (priv->conn))
                return TRUE;
 
+       if (g_cancellable_is_cancelled (cancellable))
+               return FALSE;
+
        priv->ssl = TRUE;
 
        if (!priv->is_server) {
@@ -1031,8 +1086,13 @@ soup_socket_start_proxy_ssl (SoupSocket *sock, const char *ssl_host,
        g_signal_connect (priv->conn, "notify::peer-certificate",
                          G_CALLBACK (soup_socket_peer_certificate_changed), sock);
 
-       priv->istream = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (priv->conn));
-       priv->ostream = G_POLLABLE_OUTPUT_STREAM (g_io_stream_get_output_stream (priv->conn));
+       if (priv->istream)
+               g_object_unref (priv->istream);
+       if (priv->ostream)
+               g_object_unref (priv->ostream);
+
+       priv->istream = soup_filter_input_stream_new (g_io_stream_get_input_stream (priv->conn));
+       priv->ostream = g_object_ref (g_io_stream_get_output_stream (priv->conn));
        return TRUE;
 }
        
@@ -1068,7 +1128,7 @@ handshake_async_ready (GObject *source, GAsyncResult *result, gpointer user_data
        if (priv->async_context && !priv->use_thread_context)
                g_main_context_pop_thread_default (priv->async_context);
 
-       if (g_tls_connection_handshake_finish (G_TLS_CONNECTION (priv->conn),
+       if (g_tls_connection_handshake_finish (G_TLS_CONNECTION (source),
                                               result, &error))
                status = SOUP_STATUS_OK;
        else if (!priv->ssl_fallback &&
@@ -1140,11 +1200,12 @@ soup_socket_disconnect (SoupSocket *sock)
        priv = SOUP_SOCKET_GET_PRIVATE (sock);
 
        if (priv->connect_cancel) {
+               disconnect_internal (sock, FALSE);
                g_cancellable_cancel (priv->connect_cancel);
                return;
        } else if (g_mutex_trylock (&priv->iolock)) {
                if (priv->conn)
-                       disconnect_internal (sock);
+                       disconnect_internal (sock, TRUE);
                else
                        already_disconnected = TRUE;
                g_mutex_unlock (&priv->iolock);
@@ -1262,6 +1323,22 @@ soup_socket_get_remote_address (SoupSocket *sock)
        return priv->remote_addr;
 }
 
+GInputStream *
+soup_socket_get_input_stream (SoupSocket *sock)
+{
+       g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+
+       return SOUP_SOCKET_GET_PRIVATE (sock)->istream;
+}
+
+GOutputStream *
+soup_socket_get_output_stream (SoupSocket *sock)
+{
+       g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+
+       return SOUP_SOCKET_GET_PRIVATE (sock)->ostream;
+}
+
 
 static gboolean
 socket_read_watch (GObject *pollable, gpointer user_data)
@@ -1275,34 +1352,18 @@ socket_read_watch (GObject *pollable, gpointer user_data)
 }
 
 static SoupSocketIOStatus
-read_from_network (SoupSocket *sock, gpointer buffer, gsize len,
-                  gsize *nread, GCancellable *cancellable, GError **error)
+translate_read_status (SoupSocket *sock, GCancellable *cancellable,
+                      gssize my_nread, gsize *nread,
+                      GError *my_err, GError **error)
 {
        SoupSocketPrivate *priv = SOUP_SOCKET_GET_PRIVATE (sock);
-       GError *my_err = NULL;
-       gssize my_nread;
-
-       *nread = 0;
-
-       if (!priv->conn)
-               return SOUP_SOCKET_EOF;
-
-       if (!priv->non_blocking) {
-               my_nread = g_input_stream_read (G_INPUT_STREAM (priv->istream),
-                                               buffer, len,
-                                               cancellable, &my_err);
-       } else {
-               my_nread = g_pollable_input_stream_read_nonblocking (
-                       priv->istream, buffer, len,
-                       cancellable, &my_err);
-       }
 
        if (my_nread > 0) {
-               g_clear_error (&my_err);
+               g_assert_no_error (my_err);
                *nread = my_nread;
                return SOUP_SOCKET_OK;
        } else if (my_nread == 0) {
-               g_clear_error (&my_err);
+               g_assert_no_error (my_err);
                *nread = my_nread;
                return SOUP_SOCKET_EOF;
        } else if (g_error_matches (my_err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
@@ -1320,27 +1381,6 @@ read_from_network (SoupSocket *sock, gpointer buffer, gsize len,
        return SOUP_SOCKET_ERROR;
 }
 
-static SoupSocketIOStatus
-read_from_buf (SoupSocket *sock, gpointer buffer, gsize len, gsize *nread)
-{
-       SoupSocketPrivate *priv = SOUP_SOCKET_GET_PRIVATE (sock);
-       GByteArray *read_buf = priv->read_buf;
-
-       *nread = MIN (read_buf->len, len);
-       memcpy (buffer, read_buf->data, *nread);
-
-       if (*nread == read_buf->len) {
-               g_byte_array_free (read_buf, TRUE);
-               priv->read_buf = NULL;
-       } else {
-               memmove (read_buf->data, read_buf->data + *nread, 
-                        read_buf->len - *nread);
-               g_byte_array_set_size (read_buf, read_buf->len - *nread);
-       }
-
-       return SOUP_SOCKET_OK;
-}
-
 /**
  * SoupSocketIOStatus:
  * @SOUP_SOCKET_OK: Success
@@ -1384,6 +1424,8 @@ soup_socket_read (SoupSocket *sock, gpointer buffer, gsize len,
 {
        SoupSocketPrivate *priv;
        SoupSocketIOStatus status;
+       gssize my_nread;
+       GError *my_err = NULL;
 
        g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
        g_return_val_if_fail (nread != NULL, SOUP_SOCKET_ERROR);
@@ -1391,10 +1433,24 @@ soup_socket_read (SoupSocket *sock, gpointer buffer, gsize len,
        priv = SOUP_SOCKET_GET_PRIVATE (sock);
 
        g_mutex_lock (&priv->iolock);
-       if (priv->read_buf)
-               status = read_from_buf (sock, buffer, len, nread);
-       else
-               status = read_from_network (sock, buffer, len, nread, cancellable, error);
+
+       if (!priv->istream) {
+               status = SOUP_SOCKET_EOF;
+               goto out;
+       }
+
+       if (!priv->non_blocking) {
+               my_nread = g_input_stream_read (priv->istream, buffer, len,
+                                               cancellable, &my_err);
+       } else {
+               my_nread = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (priv->istream),
+                                                                    buffer, len,
+                                                                    cancellable, &my_err);
+       }
+       status = translate_read_status (sock, cancellable,
+                                       my_nread, nread, my_err, error);
+
+out:
        g_mutex_unlock (&priv->iolock);
 
        return status;
@@ -1436,9 +1492,8 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len,
 {
        SoupSocketPrivate *priv;
        SoupSocketIOStatus status;
-       GByteArray *read_buf;
-       guint match_len, prev_len;
-       guint8 *p, *end;
+       gssize my_nread;
+       GError *my_err = NULL;
 
        g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
        g_return_val_if_fail (nread != NULL, SOUP_SOCKET_ERROR);
@@ -1450,41 +1505,18 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len,
 
        *got_boundary = FALSE;
 
-       if (!priv->read_buf)
-               priv->read_buf = g_byte_array_new ();
-       read_buf = priv->read_buf;
-
-       if (read_buf->len < boundary_len) {
-               prev_len = read_buf->len;
-               g_byte_array_set_size (read_buf, len);
-               status = read_from_network (sock,
-                                           read_buf->data + prev_len,
-                                           len - prev_len, nread, cancellable, error);
-               read_buf->len = prev_len + *nread;
-
-               if (status != SOUP_SOCKET_OK) {
-                       g_mutex_unlock (&priv->iolock);
-                       return status;
-               }
-       }
-
-       /* Scan for the boundary */
-       end = read_buf->data + read_buf->len;
-       for (p = read_buf->data; p <= end - boundary_len; p++) {
-               if (!memcmp (p, boundary, boundary_len)) {
-                       p += boundary_len;
-                       *got_boundary = TRUE;
-                       break;
-               }
+       if (!priv->istream)
+               status = SOUP_SOCKET_EOF;
+       else {
+               my_nread = soup_filter_input_stream_read_until (
+                       SOUP_FILTER_INPUT_STREAM (priv->istream),
+                       buffer, len, boundary, boundary_len,
+                       !priv->non_blocking,
+                       got_boundary, cancellable, &my_err);
+               status = translate_read_status (sock, cancellable,
+                                               my_nread, nread, my_err, error);
        }
 
-       /* Return everything up to 'p' (which is either just after the
-        * boundary, or @boundary_len - 1 bytes before the end of the
-        * buffer).
-        */
-       match_len = p - read_buf->data;
-       status = read_from_buf (sock, buffer, MIN (len, match_len), nread);
-
        g_mutex_unlock (&priv->iolock);
        return status;
 }
@@ -1552,13 +1584,13 @@ soup_socket_write (SoupSocket *sock, gconstpointer buffer,
        }
 
        if (!priv->non_blocking) {
-               my_nwrote = g_output_stream_write (G_OUTPUT_STREAM (priv->ostream),
+               my_nwrote = g_output_stream_write (priv->ostream,
                                                   buffer, len,
                                                   cancellable, &my_err);
        } else {
                my_nwrote = g_pollable_output_stream_write_nonblocking (
-                       priv->ostream, buffer, len,
-                       cancellable, &my_err);
+                       G_POLLABLE_OUTPUT_STREAM (priv->ostream),
+                       buffer, len, cancellable, &my_err);
        }
 
        if (my_nwrote > 0) {