Make this more complicated, with a SoupMessageQueueItem to keep track of
[platform/upstream/libsoup.git] / libsoup / soup-session-async.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-session-async.c
4  *
5  * Copyright (C) 2000-2003, Ximian, Inc.
6  */
7
8 #ifdef HAVE_CONFIG_H
9 #include <config.h>
10 #endif
11
12 #include "soup-session-async.h"
13 #include "soup-session-private.h"
14 #include "soup-message-private.h"
15 #include "soup-misc.h"
16
17 /**
18  * SECTION:soup-session-async
19  * @short_description: Soup session for asynchronous (main-loop-based) I/O.
20  *
21  * #SoupSessionAsync is an implementation of #SoupSession that uses
22  * non-blocking I/O via the glib main loop. It is intended for use in
23  * single-threaded programs.
24  **/
25
26 static gboolean run_queue (SoupSessionAsync *sa);
27 static void do_idle_run_queue (SoupSession *session);
28
29 static void  queue_message   (SoupSession *session, SoupMessage *req,
30                               SoupSessionCallback callback, gpointer user_data);
31 static guint send_message    (SoupSession *session, SoupMessage *req);
32
33 G_DEFINE_TYPE (SoupSessionAsync, soup_session_async, SOUP_TYPE_SESSION)
34
35 typedef struct {
36         GSource *idle_run_queue_source;
37 } SoupSessionAsyncPrivate;
38 #define SOUP_SESSION_ASYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_ASYNC, SoupSessionAsyncPrivate))
39
40 static void
41 soup_session_async_init (SoupSessionAsync *sa)
42 {
43 }
44
45 static void
46 finalize (GObject *object)
47 {
48         SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (object);
49
50         if (priv->idle_run_queue_source)
51                 g_source_destroy (priv->idle_run_queue_source);
52
53         G_OBJECT_CLASS (soup_session_async_parent_class)->finalize (object);
54 }
55
56 static void
57 soup_session_async_class_init (SoupSessionAsyncClass *soup_session_async_class)
58 {
59         SoupSessionClass *session_class = SOUP_SESSION_CLASS (soup_session_async_class);
60         GObjectClass *object_class = G_OBJECT_CLASS (session_class);
61
62         g_type_class_add_private (soup_session_async_class,
63                                   sizeof (SoupSessionAsyncPrivate));
64
65         /* virtual method override */
66         session_class->queue_message = queue_message;
67         session_class->send_message = send_message;
68
69         object_class->finalize = finalize;
70 }
71
72
73 /**
74  * soup_session_async_new:
75  *
76  * Creates an asynchronous #SoupSession with the default options.
77  *
78  * Return value: the new session.
79  **/
80 SoupSession *
81 soup_session_async_new (void)
82 {
83         return g_object_new (SOUP_TYPE_SESSION_ASYNC, NULL);
84 }
85
86 /**
87  * soup_session_async_new_with_options:
88  * @optname1: name of first property to set
89  * @...: value of @optname1, followed by additional property/value pairs
90  *
91  * Creates an asynchronous #SoupSession with the specified options.
92  *
93  * Return value: the new session.
94  **/
95 SoupSession *
96 soup_session_async_new_with_options (const char *optname1, ...)
97 {
98         SoupSession *session;
99         va_list ap;
100
101         va_start (ap, optname1);
102         session = (SoupSession *)g_object_new_valist (SOUP_TYPE_SESSION_ASYNC,
103                                                       optname1, ap);
104         va_end (ap);
105
106         return session;
107 }
108
109
110 static void
111 connection_closed (SoupConnection *conn, gpointer session)
112 {
113         /* Run the queue in case anyone was waiting for a connection
114          * to be closed.
115          */
116         do_idle_run_queue (session);
117 }
118
119 static void
120 got_connection (SoupConnection *conn, guint status, gpointer user_data)
121 {
122         SoupSession *session = user_data;
123
124         if (status == SOUP_STATUS_OK) {
125                 g_signal_connect (conn, "disconnected",
126                                   G_CALLBACK (connection_closed), session);
127
128                 /* @conn has been marked reserved by SoupSession, but
129                  * we don't actually have any specific message in mind
130                  * for it. (In particular, the message we were
131                  * originally planning to queue on it may have already
132                  * been queued on some other connection that became
133                  * available while we were waiting for this one to
134                  * connect.) So we release the connection into the
135                  * idle pool and then just run the queue and see what
136                  * happens.
137                  */
138                 soup_connection_release (conn);
139         }
140
141         /* Even if the connection failed, we run the queue, since
142          * there may have been messages waiting for the connection
143          * count to go down.
144          */
145         do_idle_run_queue (session);
146         g_object_unref (session);
147 }
148
149 static gboolean
150 run_queue (SoupSessionAsync *sa)
151 {
152         SoupSession *session = SOUP_SESSION (sa);
153         SoupMessageQueue *queue = soup_session_get_queue (session);
154         SoupMessageQueueItem *item;
155         SoupMessage *msg;
156         SoupConnection *conn;
157         gboolean try_pruning = TRUE, should_prune = FALSE;
158         gboolean started_any = FALSE, is_new;
159
160         /* FIXME: prefer CONNECTING messages */
161
162  try_again:
163         for (item = soup_message_queue_first (queue);
164              item && !should_prune;
165              item = soup_message_queue_next (queue, item)) {
166                 msg = item->msg;
167
168                 if (!SOUP_MESSAGE_IS_STARTING (msg) ||
169                     soup_message_io_in_progress (msg))
170                         continue;
171
172                 conn = soup_session_get_connection (session, msg,
173                                                     &should_prune, &is_new);
174                 if (!conn)
175                         continue;
176
177                 if (is_new) {
178                         soup_connection_connect_async (conn, got_connection,
179                                                        g_object_ref (session));
180                 } else
181                         soup_connection_send_request (conn, msg);
182         }
183         if (item)
184                 soup_message_queue_item_unref (item);
185
186         if (try_pruning && should_prune) {
187                 /* There is at least one message in the queue that
188                  * could be sent if we pruned an idle connection from
189                  * some other server.
190                  */
191                 if (soup_session_try_prune_connection (session)) {
192                         try_pruning = should_prune = FALSE;
193                         goto try_again;
194                 }
195         }
196
197         return started_any;
198 }
199
200 static void
201 request_restarted (SoupMessage *req, gpointer sa)
202 {
203         run_queue (sa);
204 }
205
206 static void
207 final_finished (SoupMessage *req, gpointer user_data)
208 {
209         SoupMessageQueueItem *item = user_data;
210         SoupSession *session = item->session;
211
212         g_object_ref (session);
213
214         if (!SOUP_MESSAGE_IS_STARTING (req)) {
215                 g_signal_handlers_disconnect_by_func (req, final_finished, item);
216                 if (item->callback)
217                         item->callback (session, req, item->callback_data);
218
219                 g_object_unref (req);
220                 soup_message_queue_item_unref (item);
221         }
222
223         do_idle_run_queue (session);
224         g_object_unref (session);
225 }
226
227 static gboolean
228 idle_run_queue (gpointer sa)
229 {
230         SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
231
232         priv->idle_run_queue_source = NULL;
233         run_queue (sa);
234         return FALSE;
235 }
236
237 static void
238 do_idle_run_queue (SoupSession *session)
239 {
240         SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session);
241
242         if (!priv->idle_run_queue_source) {
243                 priv->idle_run_queue_source = soup_add_completion (
244                         soup_session_get_async_context (session),
245                         idle_run_queue, session);
246         }
247 }
248
249 static void
250 queue_message (SoupSession *session, SoupMessage *req,
251                SoupSessionCallback callback, gpointer user_data)
252 {
253         SoupMessageQueueItem *item;
254
255         SOUP_SESSION_CLASS (soup_session_async_parent_class)->queue_message (session, req, callback, user_data);
256
257         item = soup_message_queue_lookup (soup_session_get_queue (session), req);
258         g_return_if_fail (item != NULL);
259
260         g_signal_connect (req, "restarted",
261                           G_CALLBACK (request_restarted), session);
262         g_signal_connect_after (req, "finished",
263                                 G_CALLBACK (final_finished), item);
264
265         do_idle_run_queue (session);
266 }
267
268 static guint
269 send_message (SoupSession *session, SoupMessage *req)
270 {
271         GMainContext *async_context =
272                 soup_session_get_async_context (session);
273
274         /* Balance out the unref that final_finished will do */
275         g_object_ref (req);
276
277         queue_message (session, req, NULL, NULL);
278
279         while (soup_message_get_io_status (req) != SOUP_MESSAGE_IO_STATUS_FINISHED &&
280                !SOUP_STATUS_IS_TRANSPORT_ERROR (req->status_code))
281                 g_main_context_iteration (async_context, TRUE);
282
283         return req->status_code;
284 }