gio: Add names to idles and timeouts
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: David Zeuthen <davidz@redhat.com>
19  */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "giotypes.h"
27 #include "gsocket.h"
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbuserror.h"
31 #include "gdbusintrospection.h"
32 #include "gasyncresult.h"
33 #include "gsimpleasyncresult.h"
34 #include "ginputstream.h"
35 #include "gmemoryinputstream.h"
36 #include "giostream.h"
37 #include "glib/gstdio.h"
38 #include "gsocketcontrolmessage.h"
39 #include "gsocketconnection.h"
40 #include "gsocketoutputstream.h"
41
42 #ifdef G_OS_UNIX
43 #include "gunixfdmessage.h"
44 #include "gunixconnection.h"
45 #include "gunixcredentialsmessage.h"
46 #endif
47
48 #ifdef G_OS_WIN32
49 #include <windows.h>
50 #endif
51
52 #include "glibintl.h"
53
54 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
55
56 /* ---------------------------------------------------------------------------------------------------- */
57
58 gchar *
59 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
60 {
61  guint n, m;
62  GString *ret;
63
64  ret = g_string_new (NULL);
65
66  for (n = 0; n < len; n += 16)
67    {
68      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
69
70      for (m = n; m < n + 16; m++)
71        {
72          if (m > n && (m%4) == 0)
73            g_string_append_c (ret, ' ');
74          if (m < len)
75            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
76          else
77            g_string_append (ret, "   ");
78        }
79
80      g_string_append (ret, "   ");
81
82      for (m = n; m < len && m < n + 16; m++)
83        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
84
85      g_string_append_c (ret, '\n');
86    }
87
88  return g_string_free (ret, FALSE);
89 }
90
91 /* ---------------------------------------------------------------------------------------------------- */
92
93 /* Unfortunately ancillary messages are discarded when reading from a
94  * socket using the GSocketInputStream abstraction. So we provide a
95  * very GInputStream-ish API that uses GSocket in this case (very
96  * similar to GSocketInputStream).
97  */
98
99 typedef struct
100 {
101   GSocket *socket;
102   GCancellable *cancellable;
103
104   void *buffer;
105   gsize count;
106
107   GSocketControlMessage ***messages;
108   gint *num_messages;
109
110   GSimpleAsyncResult *simple;
111
112   gboolean from_mainloop;
113 } ReadWithControlData;
114
115 static void
116 read_with_control_data_free (ReadWithControlData *data)
117 {
118   g_object_unref (data->socket);
119   if (data->cancellable != NULL)
120     g_object_unref (data->cancellable);
121   g_object_unref (data->simple);
122   g_free (data);
123 }
124
125 static gboolean
126 _g_socket_read_with_control_messages_ready (GSocket      *socket,
127                                             GIOCondition  condition,
128                                             gpointer      user_data)
129 {
130   ReadWithControlData *data = user_data;
131   GError *error;
132   gssize result;
133   GInputVector vector;
134
135   error = NULL;
136   vector.buffer = data->buffer;
137   vector.size = data->count;
138   result = g_socket_receive_message (data->socket,
139                                      NULL, /* address */
140                                      &vector,
141                                      1,
142                                      data->messages,
143                                      data->num_messages,
144                                      NULL,
145                                      data->cancellable,
146                                      &error);
147   if (result >= 0)
148     {
149       g_simple_async_result_set_op_res_gssize (data->simple, result);
150     }
151   else
152     {
153       g_assert (error != NULL);
154       g_simple_async_result_take_error (data->simple, error);
155     }
156
157   if (data->from_mainloop)
158     g_simple_async_result_complete (data->simple);
159   else
160     g_simple_async_result_complete_in_idle (data->simple);
161
162   return FALSE;
163 }
164
165 static void
166 _g_socket_read_with_control_messages (GSocket                 *socket,
167                                       void                    *buffer,
168                                       gsize                    count,
169                                       GSocketControlMessage ***messages,
170                                       gint                    *num_messages,
171                                       gint                     io_priority,
172                                       GCancellable            *cancellable,
173                                       GAsyncReadyCallback      callback,
174                                       gpointer                 user_data)
175 {
176   ReadWithControlData *data;
177
178   data = g_new0 (ReadWithControlData, 1);
179   data->socket = g_object_ref (socket);
180   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
181   data->buffer = buffer;
182   data->count = count;
183   data->messages = messages;
184   data->num_messages = num_messages;
185
186   data->simple = g_simple_async_result_new (G_OBJECT (socket),
187                                             callback,
188                                             user_data,
189                                             _g_socket_read_with_control_messages);
190   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
191
192   if (!g_socket_condition_check (socket, G_IO_IN))
193     {
194       GSource *source;
195       data->from_mainloop = TRUE;
196       source = g_socket_create_source (data->socket,
197                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
198                                        cancellable);
199       g_source_set_callback (source,
200                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
201                              data,
202                              (GDestroyNotify) read_with_control_data_free);
203       g_source_attach (source, g_main_context_get_thread_default ());
204       g_source_unref (source);
205     }
206   else
207     {
208       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
209       read_with_control_data_free (data);
210     }
211 }
212
213 static gssize
214 _g_socket_read_with_control_messages_finish (GSocket       *socket,
215                                              GAsyncResult  *result,
216                                              GError       **error)
217 {
218   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
219
220   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
221   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
222
223   if (g_simple_async_result_propagate_error (simple, error))
224       return -1;
225   else
226     return g_simple_async_result_get_op_res_gssize (simple);
227 }
228
229 /* ---------------------------------------------------------------------------------------------------- */
230
231 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
232
233 static GPtrArray *ensured_classes = NULL;
234
235 static void
236 ensure_type (GType gtype)
237 {
238   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
239 }
240
241 static void
242 release_required_types (void)
243 {
244   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
245   g_ptr_array_unref (ensured_classes);
246   ensured_classes = NULL;
247 }
248
249 static void
250 ensure_required_types (void)
251 {
252   g_assert (ensured_classes == NULL);
253   ensured_classes = g_ptr_array_new ();
254   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
255   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
256 }
257 /* ---------------------------------------------------------------------------------------------------- */
258
259 typedef struct
260 {
261   volatile gint refcount;
262   GThread *thread;
263   GMainContext *context;
264   GMainLoop *loop;
265 } SharedThreadData;
266
267 static gpointer
268 gdbus_shared_thread_func (gpointer user_data)
269 {
270   SharedThreadData *data = user_data;
271
272   g_main_context_push_thread_default (data->context);
273   g_main_loop_run (data->loop);
274   g_main_context_pop_thread_default (data->context);
275
276   release_required_types ();
277
278   return NULL;
279 }
280
281 /* ---------------------------------------------------------------------------------------------------- */
282
283 static SharedThreadData *
284 _g_dbus_shared_thread_ref (void)
285 {
286   static gsize shared_thread_data = 0;
287   SharedThreadData *ret;
288
289   if (g_once_init_enter (&shared_thread_data))
290     {
291       SharedThreadData *data;
292
293       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
294       ensure_required_types ();
295
296       data = g_new0 (SharedThreadData, 1);
297       data->refcount = 0;
298       
299       data->context = g_main_context_new ();
300       data->loop = g_main_loop_new (data->context, FALSE);
301       data->thread = g_thread_new ("gdbus",
302                                    gdbus_shared_thread_func,
303                                    data);
304       /* We can cast between gsize and gpointer safely */
305       g_once_init_leave (&shared_thread_data, (gsize) data);
306     }
307
308   ret = (SharedThreadData*) shared_thread_data;
309   g_atomic_int_inc (&ret->refcount);
310   return ret;
311 }
312
313 static void
314 _g_dbus_shared_thread_unref (SharedThreadData *data)
315 {
316   /* TODO: actually destroy the shared thread here */
317 #if 0
318   g_assert (data != NULL);
319   if (g_atomic_int_dec_and_test (&data->refcount))
320     {
321       g_main_loop_quit (data->loop);
322       //g_thread_join (data->thread);
323       g_main_loop_unref (data->loop);
324       g_main_context_unref (data->context);
325     }
326 #endif
327 }
328
329 /* ---------------------------------------------------------------------------------------------------- */
330
331 typedef enum {
332     PENDING_NONE = 0,
333     PENDING_WRITE,
334     PENDING_FLUSH,
335     PENDING_CLOSE
336 } OutputPending;
337
338 struct GDBusWorker
339 {
340   volatile gint                       ref_count;
341
342   SharedThreadData                   *shared_thread_data;
343
344   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
345   volatile gint                       stopped;
346
347   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
348    * only affects messages received from the other peer (since GDBusServer is the
349    * only user) - we might want it to affect messages sent to the other peer too?
350    */
351   gboolean                            frozen;
352   GDBusCapabilityFlags                capabilities;
353   GQueue                             *received_messages_while_frozen;
354
355   GIOStream                          *stream;
356   GCancellable                       *cancellable;
357   GDBusWorkerMessageReceivedCallback  message_received_callback;
358   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
359   GDBusWorkerDisconnectedCallback     disconnected_callback;
360   gpointer                            user_data;
361
362   /* if not NULL, stream is GSocketConnection */
363   GSocket *socket;
364
365   /* used for reading */
366   GMutex                              read_lock;
367   gchar                              *read_buffer;
368   gsize                               read_buffer_allocated_size;
369   gsize                               read_buffer_cur_size;
370   gsize                               read_buffer_bytes_wanted;
371   GUnixFDList                        *read_fd_list;
372   GSocketControlMessage             **read_ancillary_messages;
373   gint                                read_num_ancillary_messages;
374
375   /* Whether an async write, flush or close, or none of those, is pending.
376    * Only the worker thread may change its value, and only with the write_lock.
377    * Other threads may read its value when holding the write_lock.
378    * The worker thread may read its value at any time.
379    */
380   OutputPending                       output_pending;
381   /* used for writing */
382   GMutex                              write_lock;
383   /* queue of MessageToWriteData, protected by write_lock */
384   GQueue                             *write_queue;
385   /* protected by write_lock */
386   guint64                             write_num_messages_written;
387   /* number of messages we'd written out last time we flushed;
388    * protected by write_lock
389    */
390   guint64                             write_num_messages_flushed;
391   /* list of FlushData, protected by write_lock */
392   GList                              *write_pending_flushes;
393   /* list of CloseData, protected by write_lock */
394   GList                              *pending_close_attempts;
395   /* no lock - only used from the worker thread */
396   gboolean                            close_expected;
397 };
398
399 static void _g_dbus_worker_unref (GDBusWorker *worker);
400
401 /* ---------------------------------------------------------------------------------------------------- */
402
403 typedef struct
404 {
405   GMutex  mutex;
406   GCond   cond;
407   guint64 number_to_wait_for;
408   GError *error;
409 } FlushData;
410
411 struct _MessageToWriteData ;
412 typedef struct _MessageToWriteData MessageToWriteData;
413
414 static void message_to_write_data_free (MessageToWriteData *data);
415
416 static void read_message_print_transport_debug (gssize bytes_read,
417                                                 GDBusWorker *worker);
418
419 static void write_message_print_transport_debug (gssize bytes_written,
420                                                  MessageToWriteData *data);
421
422 typedef struct {
423     GDBusWorker *worker;
424     GCancellable *cancellable;
425     GSimpleAsyncResult *result;
426 } CloseData;
427
428 static void close_data_free (CloseData *close_data)
429 {
430   if (close_data->cancellable != NULL)
431     g_object_unref (close_data->cancellable);
432
433   if (close_data->result != NULL)
434     g_object_unref (close_data->result);
435
436   _g_dbus_worker_unref (close_data->worker);
437   g_slice_free (CloseData, close_data);
438 }
439
440 /* ---------------------------------------------------------------------------------------------------- */
441
442 static GDBusWorker *
443 _g_dbus_worker_ref (GDBusWorker *worker)
444 {
445   g_atomic_int_inc (&worker->ref_count);
446   return worker;
447 }
448
449 static void
450 _g_dbus_worker_unref (GDBusWorker *worker)
451 {
452   if (g_atomic_int_dec_and_test (&worker->ref_count))
453     {
454       g_assert (worker->write_pending_flushes == NULL);
455
456       _g_dbus_shared_thread_unref (worker->shared_thread_data);
457
458       g_object_unref (worker->stream);
459
460       g_mutex_clear (&worker->read_lock);
461       g_object_unref (worker->cancellable);
462       if (worker->read_fd_list != NULL)
463         g_object_unref (worker->read_fd_list);
464
465       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
466       g_mutex_clear (&worker->write_lock);
467       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
468       g_free (worker->read_buffer);
469
470       g_free (worker);
471     }
472 }
473
474 static void
475 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
476                                   gboolean      remote_peer_vanished,
477                                   GError       *error)
478 {
479   if (!g_atomic_int_get (&worker->stopped))
480     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
481 }
482
483 static void
484 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
485                                       GDBusMessage *message)
486 {
487   if (!g_atomic_int_get (&worker->stopped))
488     worker->message_received_callback (worker, message, worker->user_data);
489 }
490
491 static GDBusMessage *
492 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
493                                               GDBusMessage *message)
494 {
495   GDBusMessage *ret;
496   if (!g_atomic_int_get (&worker->stopped))
497     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
498   else
499     ret = message;
500   return ret;
501 }
502
503 /* can only be called from private thread with read-lock held - takes ownership of @message */
504 static void
505 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
506                                                   GDBusMessage *message)
507 {
508   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
509     {
510       /* queue up */
511       g_queue_push_tail (worker->received_messages_while_frozen, message);
512     }
513   else
514     {
515       /* not frozen, nor anything in queue */
516       _g_dbus_worker_emit_message_received (worker, message);
517       g_object_unref (message);
518     }
519 }
520
521 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
522 static gboolean
523 unfreeze_in_idle_cb (gpointer user_data)
524 {
525   GDBusWorker *worker = user_data;
526   GDBusMessage *message;
527
528   g_mutex_lock (&worker->read_lock);
529   if (worker->frozen)
530     {
531       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
532         {
533           _g_dbus_worker_emit_message_received (worker, message);
534           g_object_unref (message);
535         }
536       worker->frozen = FALSE;
537     }
538   else
539     {
540       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
541     }
542   g_mutex_unlock (&worker->read_lock);
543   return FALSE;
544 }
545
546 /* can be called from any thread */
547 void
548 _g_dbus_worker_unfreeze (GDBusWorker *worker)
549 {
550   GSource *idle_source;
551   idle_source = g_idle_source_new ();
552   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
553   g_source_set_callback (idle_source,
554                          unfreeze_in_idle_cb,
555                          _g_dbus_worker_ref (worker),
556                          (GDestroyNotify) _g_dbus_worker_unref);
557   g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
558   g_source_attach (idle_source, worker->shared_thread_data->context);
559   g_source_unref (idle_source);
560 }
561
562 /* ---------------------------------------------------------------------------------------------------- */
563
564 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
565
566 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
567 static void
568 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
569                            GAsyncResult  *res,
570                            gpointer       user_data)
571 {
572   GDBusWorker *worker = user_data;
573   GError *error;
574   gssize bytes_read;
575
576   g_mutex_lock (&worker->read_lock);
577
578   /* If already stopped, don't even process the reply */
579   if (g_atomic_int_get (&worker->stopped))
580     goto out;
581
582   error = NULL;
583   if (worker->socket == NULL)
584     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
585                                              res,
586                                              &error);
587   else
588     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
589                                                               res,
590                                                               &error);
591   if (worker->read_num_ancillary_messages > 0)
592     {
593       gint n;
594       for (n = 0; n < worker->read_num_ancillary_messages; n++)
595         {
596           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
597
598           if (FALSE)
599             {
600             }
601 #ifdef G_OS_UNIX
602           else if (G_IS_UNIX_FD_MESSAGE (control_message))
603             {
604               GUnixFDMessage *fd_message;
605               gint *fds;
606               gint num_fds;
607
608               fd_message = G_UNIX_FD_MESSAGE (control_message);
609               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
610               if (worker->read_fd_list == NULL)
611                 {
612                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
613                 }
614               else
615                 {
616                   gint n;
617                   for (n = 0; n < num_fds; n++)
618                     {
619                       /* TODO: really want a append_steal() */
620                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
621                       (void) g_close (fds[n], NULL);
622                     }
623                 }
624               g_free (fds);
625             }
626           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
627             {
628               /* do nothing */
629             }
630 #endif
631           else
632             {
633               if (error == NULL)
634                 {
635                   g_set_error (&error,
636                                G_IO_ERROR,
637                                G_IO_ERROR_FAILED,
638                                "Unexpected ancillary message of type %s received from peer",
639                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
640                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
641                   g_error_free (error);
642                   g_object_unref (control_message);
643                   n++;
644                   while (n < worker->read_num_ancillary_messages)
645                     g_object_unref (worker->read_ancillary_messages[n++]);
646                   g_free (worker->read_ancillary_messages);
647                   goto out;
648                 }
649             }
650           g_object_unref (control_message);
651         }
652       g_free (worker->read_ancillary_messages);
653     }
654
655   if (bytes_read == -1)
656     {
657       if (G_UNLIKELY (_g_dbus_debug_transport ()))
658         {
659           _g_dbus_debug_print_lock ();
660           g_print ("========================================================================\n"
661                    "GDBus-debug:Transport:\n"
662                    "  ---- READ ERROR on stream of type %s:\n"
663                    "  ---- %s %d: %s\n",
664                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
665                    g_quark_to_string (error->domain), error->code,
666                    error->message);
667           _g_dbus_debug_print_unlock ();
668         }
669
670       /* Every async read that uses this callback uses worker->cancellable
671        * as its GCancellable. worker->cancellable gets cancelled if and only
672        * if the GDBusConnection tells us to close (either via
673        * _g_dbus_worker_stop, which is called on last-unref, or directly),
674        * so a cancelled read must mean our connection was closed locally.
675        *
676        * If we're closing, other errors are possible - notably,
677        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
678        * read in-flight. It seems sensible to treat all read errors during
679        * closing as an expected thing that doesn't trip exit-on-close.
680        *
681        * Because close_expected can't be set until we get into the worker
682        * thread, but the cancellable is signalled sooner (from another
683        * thread), we do still need to check the error.
684        */
685       if (worker->close_expected ||
686           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
687         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
688       else
689         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
690
691       g_error_free (error);
692       goto out;
693     }
694
695 #if 0
696   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
697            (gint) bytes_read,
698            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
699            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
700            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
701                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
702            worker->stream,
703            worker);
704 #endif
705
706   /* TODO: hmm, hmm... */
707   if (bytes_read == 0)
708     {
709       g_set_error (&error,
710                    G_IO_ERROR,
711                    G_IO_ERROR_FAILED,
712                    "Underlying GIOStream returned 0 bytes on an async read");
713       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
714       g_error_free (error);
715       goto out;
716     }
717
718   read_message_print_transport_debug (bytes_read, worker);
719
720   worker->read_buffer_cur_size += bytes_read;
721   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
722     {
723       /* OK, got what we asked for! */
724       if (worker->read_buffer_bytes_wanted == 16)
725         {
726           gssize message_len;
727           /* OK, got the header - determine how many more bytes are needed */
728           error = NULL;
729           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
730                                                      16,
731                                                      &error);
732           if (message_len == -1)
733             {
734               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
735               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
736               g_error_free (error);
737               goto out;
738             }
739
740           worker->read_buffer_bytes_wanted = message_len;
741           _g_dbus_worker_do_read_unlocked (worker);
742         }
743       else
744         {
745           GDBusMessage *message;
746           error = NULL;
747
748           /* TODO: use connection->priv->auth to decode the message */
749
750           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
751                                                   worker->read_buffer_cur_size,
752                                                   worker->capabilities,
753                                                   &error);
754           if (message == NULL)
755             {
756               gchar *s;
757               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
758               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
759                          "The error is: %s\n"
760                          "The payload is as follows:\n"
761                          "%s\n",
762                          worker->read_buffer_cur_size,
763                          error->message,
764                          s);
765               g_free (s);
766               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
767               g_error_free (error);
768               goto out;
769             }
770
771 #ifdef G_OS_UNIX
772           if (worker->read_fd_list != NULL)
773             {
774               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
775               g_object_unref (worker->read_fd_list);
776               worker->read_fd_list = NULL;
777             }
778 #endif
779
780           if (G_UNLIKELY (_g_dbus_debug_message ()))
781             {
782               gchar *s;
783               _g_dbus_debug_print_lock ();
784               g_print ("========================================================================\n"
785                        "GDBus-debug:Message:\n"
786                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
787                        worker->read_buffer_cur_size);
788               s = g_dbus_message_print (message, 2);
789               g_print ("%s", s);
790               g_free (s);
791               if (G_UNLIKELY (_g_dbus_debug_payload ()))
792                 {
793                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
794                   g_print ("%s\n", s);
795                   g_free (s);
796                 }
797               _g_dbus_debug_print_unlock ();
798             }
799
800           /* yay, got a message, go deliver it */
801           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
802
803           /* start reading another message! */
804           worker->read_buffer_bytes_wanted = 0;
805           worker->read_buffer_cur_size = 0;
806           _g_dbus_worker_do_read_unlocked (worker);
807         }
808     }
809   else
810     {
811       /* didn't get all the bytes we requested - so repeat the request... */
812       _g_dbus_worker_do_read_unlocked (worker);
813     }
814
815  out:
816   g_mutex_unlock (&worker->read_lock);
817
818   /* gives up the reference acquired when calling g_input_stream_read_async() */
819   _g_dbus_worker_unref (worker);
820 }
821
822 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
823 static void
824 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
825 {
826   /* Note that we do need to keep trying to read even if close_expected is
827    * true, because only failing a read causes us to signal 'closed'.
828    */
829
830   /* if bytes_wanted is zero, it means start reading a message */
831   if (worker->read_buffer_bytes_wanted == 0)
832     {
833       worker->read_buffer_cur_size = 0;
834       worker->read_buffer_bytes_wanted = 16;
835     }
836
837   /* ensure we have a (big enough) buffer */
838   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
839     {
840       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
841       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
842       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
843     }
844
845   if (worker->socket == NULL)
846     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
847                                worker->read_buffer + worker->read_buffer_cur_size,
848                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
849                                G_PRIORITY_DEFAULT,
850                                worker->cancellable,
851                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
852                                _g_dbus_worker_ref (worker));
853   else
854     {
855       worker->read_ancillary_messages = NULL;
856       worker->read_num_ancillary_messages = 0;
857       _g_socket_read_with_control_messages (worker->socket,
858                                             worker->read_buffer + worker->read_buffer_cur_size,
859                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
860                                             &worker->read_ancillary_messages,
861                                             &worker->read_num_ancillary_messages,
862                                             G_PRIORITY_DEFAULT,
863                                             worker->cancellable,
864                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
865                                             _g_dbus_worker_ref (worker));
866     }
867 }
868
869 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
870 static gboolean
871 _g_dbus_worker_do_initial_read (gpointer data)
872 {
873   GDBusWorker *worker = data;
874   g_mutex_lock (&worker->read_lock);
875   _g_dbus_worker_do_read_unlocked (worker);
876   g_mutex_unlock (&worker->read_lock);
877   return FALSE;
878 }
879
880 /* ---------------------------------------------------------------------------------------------------- */
881
882 struct _MessageToWriteData
883 {
884   GDBusWorker  *worker;
885   GDBusMessage *message;
886   gchar        *blob;
887   gsize         blob_size;
888
889   gsize               total_written;
890   GSimpleAsyncResult *simple;
891
892 };
893
894 static void
895 message_to_write_data_free (MessageToWriteData *data)
896 {
897   _g_dbus_worker_unref (data->worker);
898   if (data->message)
899     g_object_unref (data->message);
900   g_free (data->blob);
901   g_free (data);
902 }
903
904 /* ---------------------------------------------------------------------------------------------------- */
905
906 static void write_message_continue_writing (MessageToWriteData *data);
907
908 /* called in private thread shared by all GDBusConnection instances
909  *
910  * write-lock is not held on entry
911  * output_pending is PENDING_WRITE on entry
912  */
913 static void
914 write_message_async_cb (GObject      *source_object,
915                         GAsyncResult *res,
916                         gpointer      user_data)
917 {
918   MessageToWriteData *data = user_data;
919   GSimpleAsyncResult *simple;
920   gssize bytes_written;
921   GError *error;
922
923   /* Note: we can't access data->simple after calling g_async_result_complete () because the
924    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
925    */
926   simple = data->simple;
927
928   error = NULL;
929   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
930                                                 res,
931                                                 &error);
932   if (bytes_written == -1)
933     {
934       g_simple_async_result_take_error (simple, error);
935       g_simple_async_result_complete (simple);
936       g_object_unref (simple);
937       goto out;
938     }
939   g_assert (bytes_written > 0); /* zero is never returned */
940
941   write_message_print_transport_debug (bytes_written, data);
942
943   data->total_written += bytes_written;
944   g_assert (data->total_written <= data->blob_size);
945   if (data->total_written == data->blob_size)
946     {
947       g_simple_async_result_complete (simple);
948       g_object_unref (simple);
949       goto out;
950     }
951
952   write_message_continue_writing (data);
953
954  out:
955   ;
956 }
957
958 /* called in private thread shared by all GDBusConnection instances
959  *
960  * write-lock is not held on entry
961  * output_pending is PENDING_WRITE on entry
962  */
963 #ifdef G_OS_UNIX
964 static gboolean
965 on_socket_ready (GSocket      *socket,
966                  GIOCondition  condition,
967                  gpointer      user_data)
968 {
969   MessageToWriteData *data = user_data;
970   write_message_continue_writing (data);
971   return FALSE; /* remove source */
972 }
973 #endif
974
975 /* called in private thread shared by all GDBusConnection instances
976  *
977  * write-lock is not held on entry
978  * output_pending is PENDING_WRITE on entry
979  */
980 static void
981 write_message_continue_writing (MessageToWriteData *data)
982 {
983   GOutputStream *ostream;
984 #ifdef G_OS_UNIX
985   GSimpleAsyncResult *simple;
986   GUnixFDList *fd_list;
987 #endif
988
989 #ifdef G_OS_UNIX
990   /* Note: we can't access data->simple after calling g_async_result_complete () because the
991    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
992    */
993   simple = data->simple;
994 #endif
995
996   ostream = g_io_stream_get_output_stream (data->worker->stream);
997 #ifdef G_OS_UNIX
998   fd_list = g_dbus_message_get_unix_fd_list (data->message);
999 #endif
1000
1001   g_assert (!g_output_stream_has_pending (ostream));
1002   g_assert_cmpint (data->total_written, <, data->blob_size);
1003
1004   if (FALSE)
1005     {
1006     }
1007 #ifdef G_OS_UNIX
1008   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1009     {
1010       GOutputVector vector;
1011       GSocketControlMessage *control_message;
1012       gssize bytes_written;
1013       GError *error;
1014
1015       vector.buffer = data->blob;
1016       vector.size = data->blob_size;
1017
1018       control_message = NULL;
1019       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1020         {
1021           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1022             {
1023               g_simple_async_result_set_error (simple,
1024                                                G_IO_ERROR,
1025                                                G_IO_ERROR_FAILED,
1026                                                "Tried sending a file descriptor but remote peer does not support this capability");
1027               g_simple_async_result_complete (simple);
1028               g_object_unref (simple);
1029               goto out;
1030             }
1031           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1032         }
1033
1034       error = NULL;
1035       bytes_written = g_socket_send_message (data->worker->socket,
1036                                              NULL, /* address */
1037                                              &vector,
1038                                              1,
1039                                              control_message != NULL ? &control_message : NULL,
1040                                              control_message != NULL ? 1 : 0,
1041                                              G_SOCKET_MSG_NONE,
1042                                              data->worker->cancellable,
1043                                              &error);
1044       if (control_message != NULL)
1045         g_object_unref (control_message);
1046
1047       if (bytes_written == -1)
1048         {
1049           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1050           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1051             {
1052               GSource *source;
1053               source = g_socket_create_source (data->worker->socket,
1054                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1055                                                data->worker->cancellable);
1056               g_source_set_callback (source,
1057                                      (GSourceFunc) on_socket_ready,
1058                                      data,
1059                                      NULL); /* GDestroyNotify */
1060               g_source_attach (source, g_main_context_get_thread_default ());
1061               g_source_unref (source);
1062               g_error_free (error);
1063               goto out;
1064             }
1065           g_simple_async_result_take_error (simple, error);
1066           g_simple_async_result_complete (simple);
1067           g_object_unref (simple);
1068           goto out;
1069         }
1070       g_assert (bytes_written > 0); /* zero is never returned */
1071
1072       write_message_print_transport_debug (bytes_written, data);
1073
1074       data->total_written += bytes_written;
1075       g_assert (data->total_written <= data->blob_size);
1076       if (data->total_written == data->blob_size)
1077         {
1078           g_simple_async_result_complete (simple);
1079           g_object_unref (simple);
1080           goto out;
1081         }
1082
1083       write_message_continue_writing (data);
1084     }
1085 #endif
1086   else
1087     {
1088 #ifdef G_OS_UNIX
1089       if (fd_list != NULL)
1090         {
1091           g_simple_async_result_set_error (simple,
1092                                            G_IO_ERROR,
1093                                            G_IO_ERROR_FAILED,
1094                                            "Tried sending a file descriptor on unsupported stream of type %s",
1095                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1096           g_simple_async_result_complete (simple);
1097           g_object_unref (simple);
1098           goto out;
1099         }
1100 #endif
1101
1102       g_output_stream_write_async (ostream,
1103                                    (const gchar *) data->blob + data->total_written,
1104                                    data->blob_size - data->total_written,
1105                                    G_PRIORITY_DEFAULT,
1106                                    data->worker->cancellable,
1107                                    write_message_async_cb,
1108                                    data);
1109     }
1110 #ifdef G_OS_UNIX
1111  out:
1112 #endif
1113   ;
1114 }
1115
1116 /* called in private thread shared by all GDBusConnection instances
1117  *
1118  * write-lock is not held on entry
1119  * output_pending is PENDING_WRITE on entry
1120  */
1121 static void
1122 write_message_async (GDBusWorker         *worker,
1123                      MessageToWriteData  *data,
1124                      GAsyncReadyCallback  callback,
1125                      gpointer             user_data)
1126 {
1127   data->simple = g_simple_async_result_new (NULL,
1128                                             callback,
1129                                             user_data,
1130                                             write_message_async);
1131   data->total_written = 0;
1132   write_message_continue_writing (data);
1133 }
1134
1135 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1136 static gboolean
1137 write_message_finish (GAsyncResult   *res,
1138                       GError        **error)
1139 {
1140   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1141   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1142     return FALSE;
1143   else
1144     return TRUE;
1145 }
1146 /* ---------------------------------------------------------------------------------------------------- */
1147
1148 static void continue_writing (GDBusWorker *worker);
1149
1150 typedef struct
1151 {
1152   GDBusWorker *worker;
1153   GList *flushers;
1154 } FlushAsyncData;
1155
1156 static void
1157 flush_data_list_complete (const GList  *flushers,
1158                           const GError *error)
1159 {
1160   const GList *l;
1161
1162   for (l = flushers; l != NULL; l = l->next)
1163     {
1164       FlushData *f = l->data;
1165
1166       f->error = error != NULL ? g_error_copy (error) : NULL;
1167
1168       g_mutex_lock (&f->mutex);
1169       g_cond_signal (&f->cond);
1170       g_mutex_unlock (&f->mutex);
1171     }
1172 }
1173
1174 /* called in private thread shared by all GDBusConnection instances
1175  *
1176  * write-lock is not held on entry
1177  * output_pending is PENDING_FLUSH on entry
1178  */
1179 static void
1180 ostream_flush_cb (GObject      *source_object,
1181                   GAsyncResult *res,
1182                   gpointer      user_data)
1183 {
1184   FlushAsyncData *data = user_data;
1185   GError *error;
1186
1187   error = NULL;
1188   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1189                                 res,
1190                                 &error);
1191
1192   if (error == NULL)
1193     {
1194       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1195         {
1196           _g_dbus_debug_print_lock ();
1197           g_print ("========================================================================\n"
1198                    "GDBus-debug:Transport:\n"
1199                    "  ---- FLUSHED stream of type %s\n",
1200                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1201           _g_dbus_debug_print_unlock ();
1202         }
1203     }
1204
1205   g_assert (data->flushers != NULL);
1206   flush_data_list_complete (data->flushers, error);
1207   g_list_free (data->flushers);
1208
1209   if (error != NULL)
1210     g_error_free (error);
1211
1212   /* Make sure we tell folks that we don't have additional
1213      flushes pending */
1214   g_mutex_lock (&data->worker->write_lock);
1215   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1216   g_assert (data->worker->output_pending == PENDING_FLUSH);
1217   data->worker->output_pending = PENDING_NONE;
1218   g_mutex_unlock (&data->worker->write_lock);
1219
1220   /* OK, cool, finally kick off the next write */
1221   continue_writing (data->worker);
1222
1223   _g_dbus_worker_unref (data->worker);
1224   g_free (data);
1225 }
1226
1227 /* called in private thread shared by all GDBusConnection instances
1228  *
1229  * write-lock is not held on entry
1230  * output_pending is PENDING_FLUSH on entry
1231  */
1232 static void
1233 start_flush (FlushAsyncData *data)
1234 {
1235   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1236                                G_PRIORITY_DEFAULT,
1237                                data->worker->cancellable,
1238                                ostream_flush_cb,
1239                                data);
1240 }
1241
1242 /* called in private thread shared by all GDBusConnection instances
1243  *
1244  * write-lock is held on entry
1245  * output_pending is PENDING_NONE on entry
1246  */
1247 static void
1248 message_written_unlocked (GDBusWorker *worker,
1249                           MessageToWriteData *message_data)
1250 {
1251   if (G_UNLIKELY (_g_dbus_debug_message ()))
1252     {
1253       gchar *s;
1254       _g_dbus_debug_print_lock ();
1255       g_print ("========================================================================\n"
1256                "GDBus-debug:Message:\n"
1257                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1258                message_data->blob_size);
1259       s = g_dbus_message_print (message_data->message, 2);
1260       g_print ("%s", s);
1261       g_free (s);
1262       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1263         {
1264           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1265           g_print ("%s\n", s);
1266           g_free (s);
1267         }
1268       _g_dbus_debug_print_unlock ();
1269     }
1270
1271   worker->write_num_messages_written += 1;
1272 }
1273
1274 /* called in private thread shared by all GDBusConnection instances
1275  *
1276  * write-lock is held on entry
1277  * output_pending is PENDING_NONE on entry
1278  *
1279  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1280  */
1281 static FlushAsyncData *
1282 prepare_flush_unlocked (GDBusWorker *worker)
1283 {
1284   GList *l;
1285   GList *ll;
1286   GList *flushers;
1287
1288   flushers = NULL;
1289   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1290     {
1291       FlushData *f = l->data;
1292       ll = l->next;
1293
1294       if (f->number_to_wait_for == worker->write_num_messages_written)
1295         {
1296           flushers = g_list_append (flushers, f);
1297           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1298         }
1299     }
1300   if (flushers != NULL)
1301     {
1302       g_assert (worker->output_pending == PENDING_NONE);
1303       worker->output_pending = PENDING_FLUSH;
1304     }
1305
1306   if (flushers != NULL)
1307     {
1308       FlushAsyncData *data;
1309
1310       data = g_new0 (FlushAsyncData, 1);
1311       data->worker = _g_dbus_worker_ref (worker);
1312       data->flushers = flushers;
1313       return data;
1314     }
1315
1316   return NULL;
1317 }
1318
1319 /* called in private thread shared by all GDBusConnection instances
1320  *
1321  * write-lock is not held on entry
1322  * output_pending is PENDING_WRITE on entry
1323  */
1324 static void
1325 write_message_cb (GObject       *source_object,
1326                   GAsyncResult  *res,
1327                   gpointer       user_data)
1328 {
1329   MessageToWriteData *data = user_data;
1330   GError *error;
1331
1332   g_mutex_lock (&data->worker->write_lock);
1333   g_assert (data->worker->output_pending == PENDING_WRITE);
1334   data->worker->output_pending = PENDING_NONE;
1335
1336   error = NULL;
1337   if (!write_message_finish (res, &error))
1338     {
1339       g_mutex_unlock (&data->worker->write_lock);
1340
1341       /* TODO: handle */
1342       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1343       g_error_free (error);
1344
1345       g_mutex_lock (&data->worker->write_lock);
1346     }
1347
1348   message_written_unlocked (data->worker, data);
1349
1350   g_mutex_unlock (&data->worker->write_lock);
1351
1352   continue_writing (data->worker);
1353
1354   message_to_write_data_free (data);
1355 }
1356
1357 /* called in private thread shared by all GDBusConnection instances
1358  *
1359  * write-lock is not held on entry
1360  * output_pending is PENDING_CLOSE on entry
1361  */
1362 static void
1363 iostream_close_cb (GObject      *source_object,
1364                    GAsyncResult *res,
1365                    gpointer      user_data)
1366 {
1367   GDBusWorker *worker = user_data;
1368   GError *error = NULL;
1369   GList *pending_close_attempts, *pending_flush_attempts;
1370   GQueue *send_queue;
1371
1372   g_io_stream_close_finish (worker->stream, res, &error);
1373
1374   g_mutex_lock (&worker->write_lock);
1375
1376   pending_close_attempts = worker->pending_close_attempts;
1377   worker->pending_close_attempts = NULL;
1378
1379   pending_flush_attempts = worker->write_pending_flushes;
1380   worker->write_pending_flushes = NULL;
1381
1382   send_queue = worker->write_queue;
1383   worker->write_queue = g_queue_new ();
1384
1385   g_assert (worker->output_pending == PENDING_CLOSE);
1386   worker->output_pending = PENDING_NONE;
1387
1388   g_mutex_unlock (&worker->write_lock);
1389
1390   while (pending_close_attempts != NULL)
1391     {
1392       CloseData *close_data = pending_close_attempts->data;
1393
1394       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1395                                                    pending_close_attempts);
1396
1397       if (close_data->result != NULL)
1398         {
1399           if (error != NULL)
1400             g_simple_async_result_set_from_error (close_data->result, error);
1401
1402           /* this must be in an idle because the result is likely to be
1403            * intended for another thread
1404            */
1405           g_simple_async_result_complete_in_idle (close_data->result);
1406         }
1407
1408       close_data_free (close_data);
1409     }
1410
1411   g_clear_error (&error);
1412
1413   /* all messages queued for sending are discarded */
1414   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1415   /* all queued flushes fail */
1416   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1417                        _("Operation was cancelled"));
1418   flush_data_list_complete (pending_flush_attempts, error);
1419   g_list_free (pending_flush_attempts);
1420   g_clear_error (&error);
1421
1422   _g_dbus_worker_unref (worker);
1423 }
1424
1425 /* called in private thread shared by all GDBusConnection instances
1426  *
1427  * write-lock is not held on entry
1428  * output_pending must be PENDING_NONE on entry
1429  */
1430 static void
1431 continue_writing (GDBusWorker *worker)
1432 {
1433   MessageToWriteData *data;
1434   FlushAsyncData *flush_async_data;
1435
1436  write_next:
1437   /* we mustn't try to write two things at once */
1438   g_assert (worker->output_pending == PENDING_NONE);
1439
1440   g_mutex_lock (&worker->write_lock);
1441
1442   data = NULL;
1443   flush_async_data = NULL;
1444
1445   /* if we want to close the connection, that takes precedence */
1446   if (worker->pending_close_attempts != NULL)
1447     {
1448       worker->close_expected = TRUE;
1449       worker->output_pending = PENDING_CLOSE;
1450
1451       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1452                                NULL, iostream_close_cb,
1453                                _g_dbus_worker_ref (worker));
1454     }
1455   else
1456     {
1457       flush_async_data = prepare_flush_unlocked (worker);
1458
1459       if (flush_async_data == NULL)
1460         {
1461           data = g_queue_pop_head (worker->write_queue);
1462
1463           if (data != NULL)
1464             worker->output_pending = PENDING_WRITE;
1465         }
1466     }
1467
1468   g_mutex_unlock (&worker->write_lock);
1469
1470   /* Note that write_lock is only used for protecting the @write_queue
1471    * and @output_pending fields of the GDBusWorker struct ... which we
1472    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1473    *
1474    * Therefore, it's fine to drop it here when calling back into user
1475    * code and then writing the message out onto the GIOStream since this
1476    * function only runs on the worker thread.
1477    */
1478
1479   if (flush_async_data != NULL)
1480     {
1481       start_flush (flush_async_data);
1482       g_assert (data == NULL);
1483     }
1484   else if (data != NULL)
1485     {
1486       GDBusMessage *old_message;
1487       guchar *new_blob;
1488       gsize new_blob_size;
1489       GError *error;
1490
1491       old_message = data->message;
1492       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1493       if (data->message == old_message)
1494         {
1495           /* filters had no effect - do nothing */
1496         }
1497       else if (data->message == NULL)
1498         {
1499           /* filters dropped message */
1500           g_mutex_lock (&worker->write_lock);
1501           worker->output_pending = PENDING_NONE;
1502           g_mutex_unlock (&worker->write_lock);
1503           message_to_write_data_free (data);
1504           goto write_next;
1505         }
1506       else
1507         {
1508           /* filters altered the message -> reencode */
1509           error = NULL;
1510           new_blob = g_dbus_message_to_blob (data->message,
1511                                              &new_blob_size,
1512                                              worker->capabilities,
1513                                              &error);
1514           if (new_blob == NULL)
1515             {
1516               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1517                * the old message instead
1518                */
1519               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1520                          g_dbus_message_get_serial (data->message),
1521                          error->message);
1522               g_error_free (error);
1523             }
1524           else
1525             {
1526               g_free (data->blob);
1527               data->blob = (gchar *) new_blob;
1528               data->blob_size = new_blob_size;
1529             }
1530         }
1531
1532       write_message_async (worker,
1533                            data,
1534                            write_message_cb,
1535                            data);
1536     }
1537 }
1538
1539 /* called in private thread shared by all GDBusConnection instances
1540  *
1541  * write-lock is not held on entry
1542  * output_pending may be anything
1543  */
1544 static gboolean
1545 continue_writing_in_idle_cb (gpointer user_data)
1546 {
1547   GDBusWorker *worker = user_data;
1548
1549   /* Because this is the worker thread, we can read this struct member
1550    * without holding the lock: no other thread ever modifies it.
1551    */
1552   if (worker->output_pending == PENDING_NONE)
1553     continue_writing (worker);
1554
1555   return FALSE;
1556 }
1557
1558 /*
1559  * @write_data: (transfer full) (allow-none):
1560  * @flush_data: (transfer full) (allow-none):
1561  * @close_data: (transfer full) (allow-none):
1562  *
1563  * Can be called from any thread
1564  *
1565  * write_lock is held on entry
1566  * output_pending may be anything
1567  */
1568 static void
1569 schedule_writing_unlocked (GDBusWorker        *worker,
1570                            MessageToWriteData *write_data,
1571                            FlushData          *flush_data,
1572                            CloseData          *close_data)
1573 {
1574   if (write_data != NULL)
1575     g_queue_push_tail (worker->write_queue, write_data);
1576
1577   if (flush_data != NULL)
1578     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1579
1580   if (close_data != NULL)
1581     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1582                                                      close_data);
1583
1584   /* If we had output pending, the next bit of output will happen
1585    * automatically when it finishes, so we only need to do this
1586    * if nothing was pending.
1587    *
1588    * The idle callback will re-check that output_pending is still
1589    * PENDING_NONE, to guard against output starting before the idle.
1590    */
1591   if (worker->output_pending == PENDING_NONE)
1592     {
1593       GSource *idle_source;
1594       idle_source = g_idle_source_new ();
1595       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1596       g_source_set_callback (idle_source,
1597                              continue_writing_in_idle_cb,
1598                              _g_dbus_worker_ref (worker),
1599                              (GDestroyNotify) _g_dbus_worker_unref);
1600       g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1601       g_source_attach (idle_source, worker->shared_thread_data->context);
1602       g_source_unref (idle_source);
1603     }
1604 }
1605
1606 /* ---------------------------------------------------------------------------------------------------- */
1607
1608 /* can be called from any thread - steals blob
1609  *
1610  * write_lock is not held on entry
1611  * output_pending may be anything
1612  */
1613 void
1614 _g_dbus_worker_send_message (GDBusWorker    *worker,
1615                              GDBusMessage   *message,
1616                              gchar          *blob,
1617                              gsize           blob_len)
1618 {
1619   MessageToWriteData *data;
1620
1621   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1622   g_return_if_fail (blob != NULL);
1623   g_return_if_fail (blob_len > 16);
1624
1625   data = g_new0 (MessageToWriteData, 1);
1626   data->worker = _g_dbus_worker_ref (worker);
1627   data->message = g_object_ref (message);
1628   data->blob = blob; /* steal! */
1629   data->blob_size = blob_len;
1630
1631   g_mutex_lock (&worker->write_lock);
1632   schedule_writing_unlocked (worker, data, NULL, NULL);
1633   g_mutex_unlock (&worker->write_lock);
1634 }
1635
1636 /* ---------------------------------------------------------------------------------------------------- */
1637
1638 GDBusWorker *
1639 _g_dbus_worker_new (GIOStream                              *stream,
1640                     GDBusCapabilityFlags                    capabilities,
1641                     gboolean                                initially_frozen,
1642                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1643                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1644                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1645                     gpointer                                user_data)
1646 {
1647   GDBusWorker *worker;
1648   GSource *idle_source;
1649
1650   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1651   g_return_val_if_fail (message_received_callback != NULL, NULL);
1652   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1653   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1654
1655   worker = g_new0 (GDBusWorker, 1);
1656   worker->ref_count = 1;
1657
1658   g_mutex_init (&worker->read_lock);
1659   worker->message_received_callback = message_received_callback;
1660   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1661   worker->disconnected_callback = disconnected_callback;
1662   worker->user_data = user_data;
1663   worker->stream = g_object_ref (stream);
1664   worker->capabilities = capabilities;
1665   worker->cancellable = g_cancellable_new ();
1666   worker->output_pending = PENDING_NONE;
1667
1668   worker->frozen = initially_frozen;
1669   worker->received_messages_while_frozen = g_queue_new ();
1670
1671   g_mutex_init (&worker->write_lock);
1672   worker->write_queue = g_queue_new ();
1673
1674   if (G_IS_SOCKET_CONNECTION (worker->stream))
1675     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1676
1677   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1678
1679   /* begin reading */
1680   idle_source = g_idle_source_new ();
1681   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1682   g_source_set_callback (idle_source,
1683                          _g_dbus_worker_do_initial_read,
1684                          _g_dbus_worker_ref (worker),
1685                          (GDestroyNotify) _g_dbus_worker_unref);
1686   g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1687   g_source_attach (idle_source, worker->shared_thread_data->context);
1688   g_source_unref (idle_source);
1689
1690   return worker;
1691 }
1692
1693 /* ---------------------------------------------------------------------------------------------------- */
1694
1695 /* can be called from any thread
1696  *
1697  * write_lock is not held on entry
1698  * output_pending may be anything
1699  */
1700 void
1701 _g_dbus_worker_close (GDBusWorker         *worker,
1702                       GCancellable        *cancellable,
1703                       GSimpleAsyncResult  *result)
1704 {
1705   CloseData *close_data;
1706
1707   close_data = g_slice_new0 (CloseData);
1708   close_data->worker = _g_dbus_worker_ref (worker);
1709   close_data->cancellable =
1710       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1711   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1712
1713   /* Don't set worker->close_expected here - we're in the wrong thread.
1714    * It'll be set before the actual close happens.
1715    */
1716   g_cancellable_cancel (worker->cancellable);
1717   g_mutex_lock (&worker->write_lock);
1718   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1719   g_mutex_unlock (&worker->write_lock);
1720 }
1721
1722 /* This can be called from any thread - frees worker. Note that
1723  * callbacks might still happen if called from another thread than the
1724  * worker - use your own synchronization primitive in the callbacks.
1725  *
1726  * write_lock is not held on entry
1727  * output_pending may be anything
1728  */
1729 void
1730 _g_dbus_worker_stop (GDBusWorker *worker)
1731 {
1732   g_atomic_int_set (&worker->stopped, TRUE);
1733
1734   /* Cancel any pending operations and schedule a close of the underlying I/O
1735    * stream in the worker thread
1736    */
1737   _g_dbus_worker_close (worker, NULL, NULL);
1738
1739   /* _g_dbus_worker_close holds a ref until after an idle in the worker
1740    * thread has run, so we no longer need to unref in an idle like in
1741    * commit 322e25b535
1742    */
1743   _g_dbus_worker_unref (worker);
1744 }
1745
1746 /* ---------------------------------------------------------------------------------------------------- */
1747
1748 /* can be called from any thread (except the worker thread) - blocks
1749  * calling thread until all queued outgoing messages are written and
1750  * the transport has been flushed
1751  *
1752  * write_lock is not held on entry
1753  * output_pending may be anything
1754  */
1755 gboolean
1756 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1757                            GCancellable   *cancellable,
1758                            GError        **error)
1759 {
1760   gboolean ret;
1761   FlushData *data;
1762   guint64 pending_writes;
1763
1764   data = NULL;
1765   ret = TRUE;
1766
1767   g_mutex_lock (&worker->write_lock);
1768
1769   /* if the queue is empty, no write is in-flight and we haven't written
1770    * anything since the last flush, then there's nothing to wait for
1771    */
1772   pending_writes = g_queue_get_length (worker->write_queue);
1773
1774   /* if a write is in-flight, we shouldn't be satisfied until the first
1775    * flush operation that follows it
1776    */
1777   if (worker->output_pending == PENDING_WRITE)
1778     pending_writes += 1;
1779
1780   if (pending_writes > 0 ||
1781       worker->write_num_messages_written != worker->write_num_messages_flushed)
1782     {
1783       data = g_new0 (FlushData, 1);
1784       g_mutex_init (&data->mutex);
1785       g_cond_init (&data->cond);
1786       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1787       g_mutex_lock (&data->mutex);
1788
1789       schedule_writing_unlocked (worker, NULL, data, NULL);
1790     }
1791   g_mutex_unlock (&worker->write_lock);
1792
1793   if (data != NULL)
1794     {
1795       g_cond_wait (&data->cond, &data->mutex);
1796       g_mutex_unlock (&data->mutex);
1797
1798       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1799       g_cond_clear (&data->cond);
1800       g_mutex_clear (&data->mutex);
1801       if (data->error != NULL)
1802         {
1803           ret = FALSE;
1804           g_propagate_error (error, data->error);
1805         }
1806       g_free (data);
1807     }
1808
1809   return ret;
1810 }
1811
1812 /* ---------------------------------------------------------------------------------------------------- */
1813
1814 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1815 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1816 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1817 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1818 #define G_DBUS_DEBUG_CALL           (1<<4)
1819 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1820 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1821 #define G_DBUS_DEBUG_RETURN         (1<<7)
1822 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1823 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1824
1825 static gint _gdbus_debug_flags = 0;
1826
1827 gboolean
1828 _g_dbus_debug_authentication (void)
1829 {
1830   _g_dbus_initialize ();
1831   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1832 }
1833
1834 gboolean
1835 _g_dbus_debug_transport (void)
1836 {
1837   _g_dbus_initialize ();
1838   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1839 }
1840
1841 gboolean
1842 _g_dbus_debug_message (void)
1843 {
1844   _g_dbus_initialize ();
1845   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1846 }
1847
1848 gboolean
1849 _g_dbus_debug_payload (void)
1850 {
1851   _g_dbus_initialize ();
1852   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1853 }
1854
1855 gboolean
1856 _g_dbus_debug_call (void)
1857 {
1858   _g_dbus_initialize ();
1859   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1860 }
1861
1862 gboolean
1863 _g_dbus_debug_signal (void)
1864 {
1865   _g_dbus_initialize ();
1866   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1867 }
1868
1869 gboolean
1870 _g_dbus_debug_incoming (void)
1871 {
1872   _g_dbus_initialize ();
1873   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1874 }
1875
1876 gboolean
1877 _g_dbus_debug_return (void)
1878 {
1879   _g_dbus_initialize ();
1880   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1881 }
1882
1883 gboolean
1884 _g_dbus_debug_emission (void)
1885 {
1886   _g_dbus_initialize ();
1887   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1888 }
1889
1890 gboolean
1891 _g_dbus_debug_address (void)
1892 {
1893   _g_dbus_initialize ();
1894   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1895 }
1896
1897 G_LOCK_DEFINE_STATIC (print_lock);
1898
1899 void
1900 _g_dbus_debug_print_lock (void)
1901 {
1902   G_LOCK (print_lock);
1903 }
1904
1905 void
1906 _g_dbus_debug_print_unlock (void)
1907 {
1908   G_UNLOCK (print_lock);
1909 }
1910
1911 /*
1912  * _g_dbus_initialize:
1913  *
1914  * Does various one-time init things such as
1915  *
1916  *  - registering the G_DBUS_ERROR error domain
1917  *  - parses the G_DBUS_DEBUG environment variable
1918  */
1919 void
1920 _g_dbus_initialize (void)
1921 {
1922   static volatile gsize initialized = 0;
1923
1924   if (g_once_init_enter (&initialized))
1925     {
1926       volatile GQuark g_dbus_error_domain;
1927       const gchar *debug;
1928
1929       g_dbus_error_domain = G_DBUS_ERROR;
1930       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1931
1932       debug = g_getenv ("G_DBUS_DEBUG");
1933       if (debug != NULL)
1934         {
1935           const GDebugKey keys[] = {
1936             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1937             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1938             { "message",        G_DBUS_DEBUG_MESSAGE        },
1939             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1940             { "call",           G_DBUS_DEBUG_CALL           },
1941             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1942             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1943             { "return",         G_DBUS_DEBUG_RETURN         },
1944             { "emission",       G_DBUS_DEBUG_EMISSION       },
1945             { "address",        G_DBUS_DEBUG_ADDRESS        }
1946           };
1947
1948           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1949           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1950             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1951         }
1952
1953       g_once_init_leave (&initialized, 1);
1954     }
1955 }
1956
1957 /* ---------------------------------------------------------------------------------------------------- */
1958
1959 GVariantType *
1960 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1961 {
1962   const GVariantType *arg_types[256];
1963   guint n;
1964
1965   if (args)
1966     for (n = 0; args[n] != NULL; n++)
1967       {
1968         /* DBus places a hard limit of 255 on signature length.
1969          * therefore number of args must be less than 256.
1970          */
1971         g_assert (n < 256);
1972
1973         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1974
1975         if G_UNLIKELY (arg_types[n] == NULL)
1976           return NULL;
1977       }
1978   else
1979     n = 0;
1980
1981   return g_variant_type_new_tuple (arg_types, n);
1982 }
1983
1984 /* ---------------------------------------------------------------------------------------------------- */
1985
1986 #ifdef G_OS_WIN32
1987
1988 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1989
1990 gchar *
1991 _g_dbus_win32_get_user_sid (void)
1992 {
1993   HANDLE h;
1994   TOKEN_USER *user;
1995   DWORD token_information_len;
1996   PSID psid;
1997   gchar *sid;
1998   gchar *ret;
1999
2000   ret = NULL;
2001   user = NULL;
2002   h = INVALID_HANDLE_VALUE;
2003
2004   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2005     {
2006       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2007       goto out;
2008     }
2009
2010   /* Get length of buffer */
2011   token_information_len = 0;
2012   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2013     {
2014       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2015         {
2016           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2017           goto out;
2018         }
2019     }
2020   user = g_malloc (token_information_len);
2021   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2022     {
2023       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2024       goto out;
2025     }
2026
2027   psid = user->User.Sid;
2028   if (!IsValidSid (psid))
2029     {
2030       g_warning ("Invalid SID");
2031       goto out;
2032     }
2033
2034   if (!ConvertSidToStringSidA (psid, &sid))
2035     {
2036       g_warning ("Invalid SID");
2037       goto out;
2038     }
2039
2040   ret = g_strdup (sid);
2041   LocalFree (sid);
2042
2043 out:
2044   g_free (user);
2045   if (h != INVALID_HANDLE_VALUE)
2046     CloseHandle (h);
2047   return ret;
2048 }
2049 #endif
2050
2051 /* ---------------------------------------------------------------------------------------------------- */
2052
2053 gchar *
2054 _g_dbus_get_machine_id (GError **error)
2055 {
2056 #ifdef G_OS_WIN32
2057   HW_PROFILE_INFOA info;
2058   char *src, *dest, *res;
2059   int i;
2060
2061   if (!GetCurrentHwProfileA (&info))
2062     {
2063       char *message = g_win32_error_message (GetLastError ());
2064       g_set_error (error,
2065                    G_IO_ERROR,
2066                    G_IO_ERROR_FAILED,
2067                    _("Unable to get Hardware profile: %s"), message);
2068       g_free (message);
2069       return NULL;
2070     }
2071
2072   /* Form: {12340001-4980-1920-6788-123456789012} */
2073   src = &info.szHwProfileGuid[0];
2074
2075   res = g_malloc (32+1);
2076   dest = res;
2077
2078   src++; /* Skip { */
2079   for (i = 0; i < 8; i++)
2080     *dest++ = *src++;
2081   src++; /* Skip - */
2082   for (i = 0; i < 4; i++)
2083     *dest++ = *src++;
2084   src++; /* Skip - */
2085   for (i = 0; i < 4; i++)
2086     *dest++ = *src++;
2087   src++; /* Skip - */
2088   for (i = 0; i < 4; i++)
2089     *dest++ = *src++;
2090   src++; /* Skip - */
2091   for (i = 0; i < 12; i++)
2092     *dest++ = *src++;
2093   *dest = 0;
2094
2095   return res;
2096 #else
2097   gchar *ret;
2098   GError *first_error;
2099   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2100   ret = NULL;
2101   first_error = NULL;
2102   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2103                             &ret,
2104                             NULL,
2105                             &first_error) &&
2106       !g_file_get_contents ("/etc/machine-id",
2107                             &ret,
2108                             NULL,
2109                             NULL))
2110     {
2111       g_propagate_prefixed_error (error, first_error,
2112                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2113     }
2114   else
2115     {
2116       /* ignore the error from the first try, if any */
2117       g_clear_error (&first_error);
2118       /* TODO: validate value */
2119       g_strstrip (ret);
2120     }
2121   return ret;
2122 #endif
2123 }
2124
2125 /* ---------------------------------------------------------------------------------------------------- */
2126
2127 gchar *
2128 _g_dbus_enum_to_string (GType enum_type, gint value)
2129 {
2130   gchar *ret;
2131   GEnumClass *klass;
2132   GEnumValue *enum_value;
2133
2134   klass = g_type_class_ref (enum_type);
2135   enum_value = g_enum_get_value (klass, value);
2136   if (enum_value != NULL)
2137     ret = g_strdup (enum_value->value_nick);
2138   else
2139     ret = g_strdup_printf ("unknown (value %d)", value);
2140   g_type_class_unref (klass);
2141   return ret;
2142 }
2143
2144 /* ---------------------------------------------------------------------------------------------------- */
2145
2146 static void
2147 write_message_print_transport_debug (gssize bytes_written,
2148                                      MessageToWriteData *data)
2149 {
2150   if (G_LIKELY (!_g_dbus_debug_transport ()))
2151     goto out;
2152
2153   _g_dbus_debug_print_lock ();
2154   g_print ("========================================================================\n"
2155            "GDBus-debug:Transport:\n"
2156            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2157            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2158            bytes_written,
2159            g_dbus_message_get_serial (data->message),
2160            data->blob_size,
2161            data->total_written,
2162            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2163   _g_dbus_debug_print_unlock ();
2164  out:
2165   ;
2166 }
2167
2168 /* ---------------------------------------------------------------------------------------------------- */
2169
2170 static void
2171 read_message_print_transport_debug (gssize bytes_read,
2172                                     GDBusWorker *worker)
2173 {
2174   gsize size;
2175   gint32 serial;
2176   gint32 message_length;
2177
2178   if (G_LIKELY (!_g_dbus_debug_transport ()))
2179     goto out;
2180
2181   size = bytes_read + worker->read_buffer_cur_size;
2182   serial = 0;
2183   message_length = 0;
2184   if (size >= 16)
2185     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2186   if (size >= 1)
2187     {
2188       switch (worker->read_buffer[0])
2189         {
2190         case 'l':
2191           if (size >= 12)
2192             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2193           break;
2194         case 'B':
2195           if (size >= 12)
2196             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2197           break;
2198         default:
2199           /* an error will be set elsewhere if this happens */
2200           goto out;
2201         }
2202     }
2203
2204     _g_dbus_debug_print_lock ();
2205   g_print ("========================================================================\n"
2206            "GDBus-debug:Transport:\n"
2207            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2208            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2209            bytes_read,
2210            serial,
2211            message_length,
2212            worker->read_buffer_cur_size,
2213            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2214   _g_dbus_debug_print_unlock ();
2215  out:
2216   ;
2217 }
2218
2219 /* ---------------------------------------------------------------------------------------------------- */
2220
2221 gboolean
2222 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2223                                      GValue                *return_accu,
2224                                      const GValue          *handler_return,
2225                                      gpointer               dummy)
2226 {
2227   gboolean continue_emission;
2228   gboolean signal_return;
2229
2230   signal_return = g_value_get_boolean (handler_return);
2231   g_value_set_boolean (return_accu, signal_return);
2232   continue_emission = signal_return;
2233
2234   return continue_emission;
2235 }