1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
5 * Copyright (C) 2000-2003, Ximian, Inc.
12 #define LIBSOUP_I_HAVE_READ_BUG_594377_AND_KNOW_SOUP_PASSWORD_MANAGER_MIGHT_GO_AWAY
14 #include "soup-address.h"
15 #include "soup-session-sync.h"
16 #include "soup-session-private.h"
17 #include "soup-address.h"
18 #include "soup-message-private.h"
19 #include "soup-message-queue.h"
20 #include "soup-misc.h"
21 #include "soup-password-manager.h"
25 * SECTION:soup-session-sync
26 * @short_description: Soup session for blocking I/O in multithreaded
29 * #SoupSessionSync is an implementation of #SoupSession that uses
30 * synchronous I/O, intended for use in multi-threaded programs.
32 * You can use #SoupSessionSync from multiple threads concurrently.
33 * Eg, you can send a #SoupMessage in one thread, and then while
34 * waiting for the response, send another #SoupMessage from another
35 * thread. You can also send a message from one thread and then call
36 * soup_session_cancel_message() on it from any other thread (although
37 * you need to be careful to avoid race conditions, where the message
38 * finishes and is then unreffed by the sending thread just before you
41 * However, the majority of other types and methods in libsoup are not
42 * MT-safe. In particular, you <emphasis>cannot</emphasis> modify or
43 * examine a #SoupMessage while it is being transmitted by
44 * #SoupSessionSync in another thread. Once a message has been handed
45 * off to #SoupSessionSync, it can only be manipulated from its signal
46 * handler callbacks, until I/O is complete.
52 } SoupSessionSyncPrivate;
53 #define SOUP_SESSION_SYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_SYNC, SoupSessionSyncPrivate))
55 static void queue_message (SoupSession *session, SoupMessage *msg,
56 SoupSessionCallback callback, gpointer user_data);
57 static guint send_message (SoupSession *session, SoupMessage *msg);
58 static void cancel_message (SoupSession *session, SoupMessage *msg,
60 static void auth_required (SoupSession *session, SoupMessage *msg,
61 SoupAuth *auth, gboolean retrying);
62 static void flush_queue (SoupSession *session);
63 static void kick (SoupSession *session);
65 G_DEFINE_TYPE (SoupSessionSync, soup_session_sync, SOUP_TYPE_SESSION)
68 soup_session_sync_init (SoupSessionSync *ss)
70 SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (ss);
72 g_mutex_init (&priv->lock);
73 g_cond_init (&priv->cond);
77 finalize (GObject *object)
79 SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (object);
81 g_mutex_clear (&priv->lock);
82 g_cond_clear (&priv->cond);
84 G_OBJECT_CLASS (soup_session_sync_parent_class)->finalize (object);
88 soup_session_sync_class_init (SoupSessionSyncClass *session_sync_class)
90 GObjectClass *object_class = G_OBJECT_CLASS (session_sync_class);
91 SoupSessionClass *session_class = SOUP_SESSION_CLASS (session_sync_class);
93 g_type_class_add_private (session_sync_class, sizeof (SoupSessionSyncPrivate));
95 /* virtual method override */
96 session_class->queue_message = queue_message;
97 session_class->send_message = send_message;
98 session_class->cancel_message = cancel_message;
99 session_class->auth_required = auth_required;
100 session_class->flush_queue = flush_queue;
101 session_class->kick = kick;
103 object_class->finalize = finalize;
108 * soup_session_sync_new:
110 * Creates an synchronous #SoupSession with the default options.
112 * Return value: the new session.
115 soup_session_sync_new (void)
117 return g_object_new (SOUP_TYPE_SESSION_SYNC, NULL);
121 * soup_session_sync_new_with_options:
122 * @optname1: name of first property to set
123 * @...: value of @optname1, followed by additional property/value pairs
125 * Creates an synchronous #SoupSession with the specified options.
127 * Return value: the new session.
130 soup_session_sync_new_with_options (const char *optname1, ...)
132 SoupSession *session;
135 va_start (ap, optname1);
136 session = (SoupSession *)g_object_new_valist (SOUP_TYPE_SESSION_SYNC,
144 tunnel_connect (SoupSession *session, SoupMessageQueueItem *related)
146 SoupConnection *conn = related->conn;
147 SoupMessageQueueItem *item;
152 item = soup_session_make_connect_message (session, conn);
154 soup_session_send_queue_item (session, item, NULL);
155 status = item->msg->status_code;
156 if (item->state == SOUP_MESSAGE_RESTARTING &&
157 soup_message_io_in_progress (item->msg)) {
158 soup_message_restarted (item->msg);
159 item->state = SOUP_MESSAGE_RUNNING;
161 if (item->state == SOUP_MESSAGE_RESTARTING)
162 status = SOUP_STATUS_TRY_AGAIN;
163 item->state = SOUP_MESSAGE_FINISHED;
164 soup_message_finished (item->msg);
166 } while (item->state == SOUP_MESSAGE_STARTING);
167 soup_session_unqueue_item (session, item);
168 soup_message_queue_item_unref (item);
170 if (SOUP_STATUS_IS_SUCCESSFUL (status)) {
171 if (!soup_connection_start_ssl_sync (conn, related->cancellable))
172 status = SOUP_STATUS_SSL_FAILED;
173 soup_message_set_https_status (related->msg, conn);
176 if (!SOUP_STATUS_IS_SUCCESSFUL (status))
177 soup_connection_disconnect (conn);
179 g_object_unref (conn);
184 get_connection (SoupMessageQueueItem *item)
186 SoupSession *session = item->session;
187 SoupMessage *msg = item->msg;
188 gboolean try_pruning = FALSE;
192 soup_session_cleanup_connections (session, FALSE);
194 if (!soup_session_get_connection (session, item, &try_pruning)) {
197 soup_session_cleanup_connections (session, TRUE);
198 if (!soup_session_get_connection (session, item, &try_pruning))
203 if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
204 item->state = SOUP_MESSAGE_READY;
208 status = soup_connection_connect_sync (item->conn, item->cancellable);
209 if (status == SOUP_STATUS_TRY_AGAIN) {
210 soup_connection_disconnect (item->conn);
211 soup_message_queue_item_set_connection (item, NULL);
215 soup_message_set_https_status (msg, item->conn);
217 if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
218 if (!msg->status_code)
219 soup_session_set_item_status (session, item, status);
220 item->state = SOUP_MESSAGE_FINISHING;
221 soup_connection_disconnect (item->conn);
222 soup_message_queue_item_set_connection (item, NULL);
226 if (soup_connection_is_tunnelled (item->conn)) {
227 status = tunnel_connect (session, item);
228 if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
229 soup_connection_disconnect (item->conn);
230 soup_message_queue_item_set_connection (item, NULL);
231 if (status == SOUP_STATUS_TRY_AGAIN)
233 soup_session_set_item_status (session, item, status);
234 item->state = SOUP_MESSAGE_FINISHING;
239 item->state = SOUP_MESSAGE_READY;
242 static void process_queue_item (SoupMessageQueueItem *item);
245 new_api_message_completed (SoupMessage *msg, gpointer user_data)
247 SoupMessageQueueItem *item = user_data;
249 if (item->state != SOUP_MESSAGE_RESTARTING) {
250 item->state = SOUP_MESSAGE_FINISHING;
251 process_queue_item (item);
256 process_queue_item (SoupMessageQueueItem *item)
258 SoupSession *session = item->session;
259 SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
261 soup_message_queue_item_ref (item);
265 g_mutex_lock (&priv->lock);
267 g_cond_wait (&priv->cond, &priv->lock);
268 g_mutex_unlock (&priv->lock);
271 switch (item->state) {
272 case SOUP_MESSAGE_STARTING:
273 item->state = SOUP_MESSAGE_AWAITING_CONNECTION;
276 case SOUP_MESSAGE_AWAITING_CONNECTION:
277 g_mutex_lock (&priv->lock);
279 get_connection (item);
280 if (item->state == SOUP_MESSAGE_AWAITING_CONNECTION)
281 g_cond_wait (&priv->cond, &priv->lock);
282 } while (item->state == SOUP_MESSAGE_AWAITING_CONNECTION);
283 g_mutex_unlock (&priv->lock);
286 case SOUP_MESSAGE_READY:
287 item->state = SOUP_MESSAGE_RUNNING;
290 soup_session_send_queue_item (item->session, item, new_api_message_completed);
294 soup_session_send_queue_item (item->session, item, NULL);
295 if (item->state != SOUP_MESSAGE_RESTARTING)
296 item->state = SOUP_MESSAGE_FINISHING;
299 case SOUP_MESSAGE_RUNNING:
300 g_warn_if_fail (item->new_api);
301 item->state = SOUP_MESSAGE_FINISHING;
304 case SOUP_MESSAGE_RESTARTING:
305 item->state = SOUP_MESSAGE_STARTING;
306 soup_message_restarted (item->msg);
309 case SOUP_MESSAGE_FINISHING:
310 item->state = SOUP_MESSAGE_FINISHED;
311 soup_message_finished (item->msg);
312 soup_session_unqueue_item (session, item);
313 g_cond_broadcast (&priv->cond);
317 g_warn_if_reached ();
318 item->state = SOUP_MESSAGE_FINISHING;
321 } while (item->state != SOUP_MESSAGE_FINISHED);
324 soup_message_queue_item_unref (item);
328 queue_message_callback (gpointer data)
330 SoupMessageQueueItem *item = data;
332 item->callback (item->session, item->msg, item->callback_data);
333 g_object_unref (item->session);
334 g_object_unref (item->msg);
335 soup_message_queue_item_unref (item);
340 queue_message_thread (gpointer data)
342 SoupMessageQueueItem *item = data;
344 process_queue_item (item);
345 if (item->callback) {
346 soup_add_completion (soup_session_get_async_context (item->session),
347 queue_message_callback, item);
349 g_object_unref (item->session);
350 g_object_unref (item->msg);
351 soup_message_queue_item_unref (item);
358 queue_message (SoupSession *session, SoupMessage *msg,
359 SoupSessionCallback callback, gpointer user_data)
361 SoupMessageQueueItem *item;
364 SOUP_SESSION_CLASS (soup_session_sync_parent_class)->
365 queue_message (g_object_ref (session), msg, callback, user_data);
367 item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
368 g_return_if_fail (item != NULL);
370 thread = g_thread_new ("SoupSessionSync:queue_message",
371 queue_message_thread, item);
372 g_thread_unref (thread);
376 send_message (SoupSession *session, SoupMessage *msg)
378 SoupMessageQueueItem *item;
381 SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
383 item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
384 g_return_val_if_fail (item != NULL, SOUP_STATUS_MALFORMED);
386 process_queue_item (item);
387 status = msg->status_code;
388 soup_message_queue_item_unref (item);
393 cancel_message (SoupSession *session, SoupMessage *msg, guint status_code)
395 SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
397 g_mutex_lock (&priv->lock);
398 SOUP_SESSION_CLASS (soup_session_sync_parent_class)->cancel_message (session, msg, status_code);
399 g_cond_broadcast (&priv->cond);
400 g_mutex_unlock (&priv->lock);
404 auth_required (SoupSession *session, SoupMessage *msg,
405 SoupAuth *auth, gboolean retrying)
407 SoupSessionFeature *password_manager;
409 password_manager = soup_session_get_feature_for_message (
410 session, SOUP_TYPE_PASSWORD_MANAGER, msg);
411 if (password_manager) {
412 soup_password_manager_get_passwords_sync (
413 SOUP_PASSWORD_MANAGER (password_manager),
414 msg, auth, NULL); /* FIXME cancellable */
417 SOUP_SESSION_CLASS (soup_session_sync_parent_class)->
418 auth_required (session, msg, auth, retrying);
422 flush_queue (SoupSession *session)
424 SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
425 SoupMessageQueue *queue;
426 SoupMessageQueueItem *item;
428 gboolean done = FALSE;
430 /* Record the current contents of the queue */
431 current = g_hash_table_new (NULL, NULL);
432 queue = soup_session_get_queue (session);
433 for (item = soup_message_queue_first (queue);
435 item = soup_message_queue_next (queue, item))
436 g_hash_table_insert (current, item, item);
438 /* Cancel everything */
439 SOUP_SESSION_CLASS (soup_session_sync_parent_class)->flush_queue (session);
441 /* Wait until all of the items in @current have been removed
442 * from the queue. (This is not the same as "wait for the
443 * queue to be empty", because the app may queue new requests
444 * in response to the cancellation of the old ones. We don't
445 * try to cancel those requests as well, since we'd likely
446 * just end up looping forever.)
448 g_mutex_lock (&priv->lock);
451 for (item = soup_message_queue_first (queue);
453 item = soup_message_queue_next (queue, item)) {
454 if (g_hash_table_lookup (current, item))
459 g_cond_wait (&priv->cond, &priv->lock);
461 g_mutex_unlock (&priv->lock);
463 g_hash_table_destroy (current);
467 kick (SoupSession *session)
469 SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
471 g_mutex_lock (&priv->lock);
472 g_cond_broadcast (&priv->cond);
473 g_mutex_unlock (&priv->lock);
478 soup_session_send_request (SoupSession *session,
480 GCancellable *cancellable,
483 SoupMessageQueueItem *item;
484 GInputStream *stream = NULL;
485 GOutputStream *ostream;
486 GMemoryOutputStream *mostream;
488 GError *my_error = NULL;
490 g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL);
492 SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
494 item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
495 g_return_val_if_fail (item != NULL, NULL);
497 item->new_api = TRUE;
499 g_object_unref (item->cancellable);
500 item->cancellable = g_object_ref (cancellable);
504 /* Get a connection, etc */
505 process_queue_item (item);
506 if (item->state != SOUP_MESSAGE_RUNNING)
509 /* Send request, read headers */
510 if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error))
513 stream = soup_message_io_get_response_istream (msg, &my_error);
517 /* Break if the message doesn't look likely-to-be-requeued */
518 if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
519 msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
520 !soup_session_would_redirect (session, msg))
523 /* Gather the current message body... */
524 ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
525 if (g_output_stream_splice (ostream, stream,
526 G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
527 G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
528 item->cancellable, &my_error) == -1) {
529 g_object_unref (stream);
530 g_object_unref (ostream);
534 g_object_unref (stream);
537 /* If the message was requeued, loop */
538 if (item->state == SOUP_MESSAGE_RESTARTING) {
539 g_object_unref (ostream);
543 /* Not requeued, so return the original body */
544 mostream = G_MEMORY_OUTPUT_STREAM (ostream);
545 size = g_memory_output_stream_get_data_size (mostream);
546 stream = g_memory_input_stream_new ();
548 g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
549 g_memory_output_stream_steal_data (mostream),
552 g_object_unref (ostream);
556 g_propagate_error (error, my_error);
557 else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
559 g_object_unref (stream);
562 g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
565 stream = g_memory_input_stream_new ();
568 if (soup_message_io_in_progress (msg))
569 soup_message_io_finished (msg);
570 else if (item->state != SOUP_MESSAGE_FINISHED)
571 item->state = SOUP_MESSAGE_FINISHING;
573 if (item->state != SOUP_MESSAGE_FINISHED)
574 process_queue_item (item);
577 soup_message_queue_item_unref (item);