Updated FSF's address
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: David Zeuthen <davidz@redhat.com>
19  */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "giotypes.h"
27 #include "gsocket.h"
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbuserror.h"
31 #include "gdbusintrospection.h"
32 #include "gasyncresult.h"
33 #include "gsimpleasyncresult.h"
34 #include "ginputstream.h"
35 #include "gmemoryinputstream.h"
36 #include "giostream.h"
37 #include "glib/gstdio.h"
38 #include "gsocketcontrolmessage.h"
39 #include "gsocketconnection.h"
40 #include "gsocketoutputstream.h"
41
42 #ifdef G_OS_UNIX
43 #include "gunixfdmessage.h"
44 #include "gunixconnection.h"
45 #include "gunixcredentialsmessage.h"
46 #endif
47
48 #ifdef G_OS_WIN32
49 #include <windows.h>
50 #endif
51
52 #include "glibintl.h"
53
54 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
55
56 /* ---------------------------------------------------------------------------------------------------- */
57
58 gchar *
59 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
60 {
61  guint n, m;
62  GString *ret;
63
64  ret = g_string_new (NULL);
65
66  for (n = 0; n < len; n += 16)
67    {
68      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
69
70      for (m = n; m < n + 16; m++)
71        {
72          if (m > n && (m%4) == 0)
73            g_string_append_c (ret, ' ');
74          if (m < len)
75            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
76          else
77            g_string_append (ret, "   ");
78        }
79
80      g_string_append (ret, "   ");
81
82      for (m = n; m < len && m < n + 16; m++)
83        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
84
85      g_string_append_c (ret, '\n');
86    }
87
88  return g_string_free (ret, FALSE);
89 }
90
91 /* ---------------------------------------------------------------------------------------------------- */
92
93 /* Unfortunately ancillary messages are discarded when reading from a
94  * socket using the GSocketInputStream abstraction. So we provide a
95  * very GInputStream-ish API that uses GSocket in this case (very
96  * similar to GSocketInputStream).
97  */
98
99 typedef struct
100 {
101   GSocket *socket;
102   GCancellable *cancellable;
103
104   void *buffer;
105   gsize count;
106
107   GSocketControlMessage ***messages;
108   gint *num_messages;
109
110   GSimpleAsyncResult *simple;
111
112   gboolean from_mainloop;
113 } ReadWithControlData;
114
115 static void
116 read_with_control_data_free (ReadWithControlData *data)
117 {
118   g_object_unref (data->socket);
119   if (data->cancellable != NULL)
120     g_object_unref (data->cancellable);
121   g_object_unref (data->simple);
122   g_free (data);
123 }
124
125 static gboolean
126 _g_socket_read_with_control_messages_ready (GSocket      *socket,
127                                             GIOCondition  condition,
128                                             gpointer      user_data)
129 {
130   ReadWithControlData *data = user_data;
131   GError *error;
132   gssize result;
133   GInputVector vector;
134
135   error = NULL;
136   vector.buffer = data->buffer;
137   vector.size = data->count;
138   result = g_socket_receive_message (data->socket,
139                                      NULL, /* address */
140                                      &vector,
141                                      1,
142                                      data->messages,
143                                      data->num_messages,
144                                      NULL,
145                                      data->cancellable,
146                                      &error);
147   if (result >= 0)
148     {
149       g_simple_async_result_set_op_res_gssize (data->simple, result);
150     }
151   else
152     {
153       g_assert (error != NULL);
154       g_simple_async_result_take_error (data->simple, error);
155     }
156
157   if (data->from_mainloop)
158     g_simple_async_result_complete (data->simple);
159   else
160     g_simple_async_result_complete_in_idle (data->simple);
161
162   return FALSE;
163 }
164
165 static void
166 _g_socket_read_with_control_messages (GSocket                 *socket,
167                                       void                    *buffer,
168                                       gsize                    count,
169                                       GSocketControlMessage ***messages,
170                                       gint                    *num_messages,
171                                       gint                     io_priority,
172                                       GCancellable            *cancellable,
173                                       GAsyncReadyCallback      callback,
174                                       gpointer                 user_data)
175 {
176   ReadWithControlData *data;
177
178   data = g_new0 (ReadWithControlData, 1);
179   data->socket = g_object_ref (socket);
180   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
181   data->buffer = buffer;
182   data->count = count;
183   data->messages = messages;
184   data->num_messages = num_messages;
185
186   data->simple = g_simple_async_result_new (G_OBJECT (socket),
187                                             callback,
188                                             user_data,
189                                             _g_socket_read_with_control_messages);
190   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
191
192   if (!g_socket_condition_check (socket, G_IO_IN))
193     {
194       GSource *source;
195       data->from_mainloop = TRUE;
196       source = g_socket_create_source (data->socket,
197                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
198                                        cancellable);
199       g_source_set_callback (source,
200                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
201                              data,
202                              (GDestroyNotify) read_with_control_data_free);
203       g_source_attach (source, g_main_context_get_thread_default ());
204       g_source_unref (source);
205     }
206   else
207     {
208       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
209       read_with_control_data_free (data);
210     }
211 }
212
213 static gssize
214 _g_socket_read_with_control_messages_finish (GSocket       *socket,
215                                              GAsyncResult  *result,
216                                              GError       **error)
217 {
218   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
219
220   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
221   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
222
223   if (g_simple_async_result_propagate_error (simple, error))
224       return -1;
225   else
226     return g_simple_async_result_get_op_res_gssize (simple);
227 }
228
229 /* ---------------------------------------------------------------------------------------------------- */
230
231 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
232
233 static GPtrArray *ensured_classes = NULL;
234
235 static void
236 ensure_type (GType gtype)
237 {
238   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
239 }
240
241 static void
242 release_required_types (void)
243 {
244   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
245   g_ptr_array_unref (ensured_classes);
246   ensured_classes = NULL;
247 }
248
249 static void
250 ensure_required_types (void)
251 {
252   g_assert (ensured_classes == NULL);
253   ensured_classes = g_ptr_array_new ();
254   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
255   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
256 }
257 /* ---------------------------------------------------------------------------------------------------- */
258
259 typedef struct
260 {
261   volatile gint refcount;
262   GThread *thread;
263   GMainContext *context;
264   GMainLoop *loop;
265 } SharedThreadData;
266
267 static gpointer
268 gdbus_shared_thread_func (gpointer user_data)
269 {
270   SharedThreadData *data = user_data;
271
272   g_main_context_push_thread_default (data->context);
273   g_main_loop_run (data->loop);
274   g_main_context_pop_thread_default (data->context);
275
276   release_required_types ();
277
278   return NULL;
279 }
280
281 /* ---------------------------------------------------------------------------------------------------- */
282
283 static SharedThreadData *
284 _g_dbus_shared_thread_ref (void)
285 {
286   static gsize shared_thread_data = 0;
287   SharedThreadData *ret;
288
289   if (g_once_init_enter (&shared_thread_data))
290     {
291       SharedThreadData *data;
292
293       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
294       ensure_required_types ();
295
296       data = g_new0 (SharedThreadData, 1);
297       data->refcount = 0;
298       
299       data->context = g_main_context_new ();
300       data->loop = g_main_loop_new (data->context, FALSE);
301       data->thread = g_thread_new ("gdbus",
302                                    gdbus_shared_thread_func,
303                                    data);
304       /* We can cast between gsize and gpointer safely */
305       g_once_init_leave (&shared_thread_data, (gsize) data);
306     }
307
308   ret = (SharedThreadData*) shared_thread_data;
309   g_atomic_int_inc (&ret->refcount);
310   return ret;
311 }
312
313 static void
314 _g_dbus_shared_thread_unref (SharedThreadData *data)
315 {
316   /* TODO: actually destroy the shared thread here */
317 #if 0
318   g_assert (data != NULL);
319   if (g_atomic_int_dec_and_test (&data->refcount))
320     {
321       g_main_loop_quit (data->loop);
322       //g_thread_join (data->thread);
323       g_main_loop_unref (data->loop);
324       g_main_context_unref (data->context);
325     }
326 #endif
327 }
328
329 /* ---------------------------------------------------------------------------------------------------- */
330
/* Which asynchronous output operation, if any, is pending on a worker;
 * see GDBusWorker::output_pending.
 */
typedef enum {
    PENDING_NONE = 0,  /* none of the operations below */
    PENDING_WRITE,     /* an async write */
    PENDING_FLUSH,     /* an async flush */
    PENDING_CLOSE      /* an async close */
} OutputPending;
337
338 struct GDBusWorker
339 {
340   volatile gint                       ref_count;
341
342   SharedThreadData                   *shared_thread_data;
343
344   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
345   volatile gint                       stopped;
346
347   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
348    * only affects messages received from the other peer (since GDBusServer is the
349    * only user) - we might want it to affect messages sent to the other peer too?
350    */
351   gboolean                            frozen;
352   GDBusCapabilityFlags                capabilities;
353   GQueue                             *received_messages_while_frozen;
354
355   GIOStream                          *stream;
356   GCancellable                       *cancellable;
357   GDBusWorkerMessageReceivedCallback  message_received_callback;
358   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
359   GDBusWorkerDisconnectedCallback     disconnected_callback;
360   gpointer                            user_data;
361
362   /* if not NULL, stream is GSocketConnection */
363   GSocket *socket;
364
365   /* used for reading */
366   GMutex                              read_lock;
367   gchar                              *read_buffer;
368   gsize                               read_buffer_allocated_size;
369   gsize                               read_buffer_cur_size;
370   gsize                               read_buffer_bytes_wanted;
371   GUnixFDList                        *read_fd_list;
372   GSocketControlMessage             **read_ancillary_messages;
373   gint                                read_num_ancillary_messages;
374
375   /* Whether an async write, flush or close, or none of those, is pending.
376    * Only the worker thread may change its value, and only with the write_lock.
377    * Other threads may read its value when holding the write_lock.
378    * The worker thread may read its value at any time.
379    */
380   OutputPending                       output_pending;
381   /* used for writing */
382   GMutex                              write_lock;
383   /* queue of MessageToWriteData, protected by write_lock */
384   GQueue                             *write_queue;
385   /* protected by write_lock */
386   guint64                             write_num_messages_written;
387   /* number of messages we'd written out last time we flushed;
388    * protected by write_lock
389    */
390   guint64                             write_num_messages_flushed;
391   /* list of FlushData, protected by write_lock */
392   GList                              *write_pending_flushes;
393   /* list of CloseData, protected by write_lock */
394   GList                              *pending_close_attempts;
395   /* no lock - only used from the worker thread */
396   gboolean                            close_expected;
397 };
398
399 static void _g_dbus_worker_unref (GDBusWorker *worker);
400
401 /* ---------------------------------------------------------------------------------------------------- */
402
403 typedef struct
404 {
405   GMutex  mutex;
406   GCond   cond;
407   guint64 number_to_wait_for;
408   GError *error;
409 } FlushData;
410
411 struct _MessageToWriteData ;
412 typedef struct _MessageToWriteData MessageToWriteData;
413
414 static void message_to_write_data_free (MessageToWriteData *data);
415
416 static void read_message_print_transport_debug (gssize bytes_read,
417                                                 GDBusWorker *worker);
418
419 static void write_message_print_transport_debug (gssize bytes_written,
420                                                  MessageToWriteData *data);
421
422 typedef struct {
423     GDBusWorker *worker;
424     GCancellable *cancellable;
425     GSimpleAsyncResult *result;
426 } CloseData;
427
428 static void close_data_free (CloseData *close_data)
429 {
430   if (close_data->cancellable != NULL)
431     g_object_unref (close_data->cancellable);
432
433   if (close_data->result != NULL)
434     g_object_unref (close_data->result);
435
436   _g_dbus_worker_unref (close_data->worker);
437   g_slice_free (CloseData, close_data);
438 }
439
440 /* ---------------------------------------------------------------------------------------------------- */
441
442 static GDBusWorker *
443 _g_dbus_worker_ref (GDBusWorker *worker)
444 {
445   g_atomic_int_inc (&worker->ref_count);
446   return worker;
447 }
448
449 static void
450 _g_dbus_worker_unref (GDBusWorker *worker)
451 {
452   if (g_atomic_int_dec_and_test (&worker->ref_count))
453     {
454       g_assert (worker->write_pending_flushes == NULL);
455
456       _g_dbus_shared_thread_unref (worker->shared_thread_data);
457
458       g_object_unref (worker->stream);
459
460       g_mutex_clear (&worker->read_lock);
461       g_object_unref (worker->cancellable);
462       if (worker->read_fd_list != NULL)
463         g_object_unref (worker->read_fd_list);
464
465       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
466       g_mutex_clear (&worker->write_lock);
467       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
468       g_free (worker->read_buffer);
469
470       g_free (worker);
471     }
472 }
473
474 static void
475 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
476                                   gboolean      remote_peer_vanished,
477                                   GError       *error)
478 {
479   if (!g_atomic_int_get (&worker->stopped))
480     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
481 }
482
483 static void
484 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
485                                       GDBusMessage *message)
486 {
487   if (!g_atomic_int_get (&worker->stopped))
488     worker->message_received_callback (worker, message, worker->user_data);
489 }
490
491 static GDBusMessage *
492 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
493                                               GDBusMessage *message)
494 {
495   GDBusMessage *ret;
496   if (!g_atomic_int_get (&worker->stopped))
497     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
498   else
499     ret = message;
500   return ret;
501 }
502
503 /* can only be called from private thread with read-lock held - takes ownership of @message */
504 static void
505 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
506                                                   GDBusMessage *message)
507 {
508   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
509     {
510       /* queue up */
511       g_queue_push_tail (worker->received_messages_while_frozen, message);
512     }
513   else
514     {
515       /* not frozen, nor anything in queue */
516       _g_dbus_worker_emit_message_received (worker, message);
517       g_object_unref (message);
518     }
519 }
520
521 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
522 static gboolean
523 unfreeze_in_idle_cb (gpointer user_data)
524 {
525   GDBusWorker *worker = user_data;
526   GDBusMessage *message;
527
528   g_mutex_lock (&worker->read_lock);
529   if (worker->frozen)
530     {
531       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
532         {
533           _g_dbus_worker_emit_message_received (worker, message);
534           g_object_unref (message);
535         }
536       worker->frozen = FALSE;
537     }
538   else
539     {
540       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
541     }
542   g_mutex_unlock (&worker->read_lock);
543   return FALSE;
544 }
545
546 /* can be called from any thread */
547 void
548 _g_dbus_worker_unfreeze (GDBusWorker *worker)
549 {
550   GSource *idle_source;
551   idle_source = g_idle_source_new ();
552   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
553   g_source_set_callback (idle_source,
554                          unfreeze_in_idle_cb,
555                          _g_dbus_worker_ref (worker),
556                          (GDestroyNotify) _g_dbus_worker_unref);
557   g_source_attach (idle_source, worker->shared_thread_data->context);
558   g_source_unref (idle_source);
559 }
560
561 /* ---------------------------------------------------------------------------------------------------- */
562
563 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
564
565 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
566 static void
567 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
568                            GAsyncResult  *res,
569                            gpointer       user_data)
570 {
571   GDBusWorker *worker = user_data;
572   GError *error;
573   gssize bytes_read;
574
575   g_mutex_lock (&worker->read_lock);
576
577   /* If already stopped, don't even process the reply */
578   if (g_atomic_int_get (&worker->stopped))
579     goto out;
580
581   error = NULL;
582   if (worker->socket == NULL)
583     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
584                                              res,
585                                              &error);
586   else
587     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
588                                                               res,
589                                                               &error);
590   if (worker->read_num_ancillary_messages > 0)
591     {
592       gint n;
593       for (n = 0; n < worker->read_num_ancillary_messages; n++)
594         {
595           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
596
597           if (FALSE)
598             {
599             }
600 #ifdef G_OS_UNIX
601           else if (G_IS_UNIX_FD_MESSAGE (control_message))
602             {
603               GUnixFDMessage *fd_message;
604               gint *fds;
605               gint num_fds;
606
607               fd_message = G_UNIX_FD_MESSAGE (control_message);
608               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
609               if (worker->read_fd_list == NULL)
610                 {
611                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
612                 }
613               else
614                 {
615                   gint n;
616                   for (n = 0; n < num_fds; n++)
617                     {
618                       /* TODO: really want a append_steal() */
619                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
620                       (void) g_close (fds[n], NULL);
621                     }
622                 }
623               g_free (fds);
624             }
625           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
626             {
627               /* do nothing */
628             }
629 #endif
630           else
631             {
632               if (error == NULL)
633                 {
634                   g_set_error (&error,
635                                G_IO_ERROR,
636                                G_IO_ERROR_FAILED,
637                                "Unexpected ancillary message of type %s received from peer",
638                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
639                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
640                   g_error_free (error);
641                   g_object_unref (control_message);
642                   n++;
643                   while (n < worker->read_num_ancillary_messages)
644                     g_object_unref (worker->read_ancillary_messages[n++]);
645                   g_free (worker->read_ancillary_messages);
646                   goto out;
647                 }
648             }
649           g_object_unref (control_message);
650         }
651       g_free (worker->read_ancillary_messages);
652     }
653
654   if (bytes_read == -1)
655     {
656       if (G_UNLIKELY (_g_dbus_debug_transport ()))
657         {
658           _g_dbus_debug_print_lock ();
659           g_print ("========================================================================\n"
660                    "GDBus-debug:Transport:\n"
661                    "  ---- READ ERROR on stream of type %s:\n"
662                    "  ---- %s %d: %s\n",
663                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
664                    g_quark_to_string (error->domain), error->code,
665                    error->message);
666           _g_dbus_debug_print_unlock ();
667         }
668
669       /* Every async read that uses this callback uses worker->cancellable
670        * as its GCancellable. worker->cancellable gets cancelled if and only
671        * if the GDBusConnection tells us to close (either via
672        * _g_dbus_worker_stop, which is called on last-unref, or directly),
673        * so a cancelled read must mean our connection was closed locally.
674        *
675        * If we're closing, other errors are possible - notably,
676        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
677        * read in-flight. It seems sensible to treat all read errors during
678        * closing as an expected thing that doesn't trip exit-on-close.
679        *
680        * Because close_expected can't be set until we get into the worker
681        * thread, but the cancellable is signalled sooner (from another
682        * thread), we do still need to check the error.
683        */
684       if (worker->close_expected ||
685           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
686         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
687       else
688         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
689
690       g_error_free (error);
691       goto out;
692     }
693
694 #if 0
695   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
696            (gint) bytes_read,
697            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
698            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
699            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
700                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
701            worker->stream,
702            worker);
703 #endif
704
705   /* TODO: hmm, hmm... */
706   if (bytes_read == 0)
707     {
708       g_set_error (&error,
709                    G_IO_ERROR,
710                    G_IO_ERROR_FAILED,
711                    "Underlying GIOStream returned 0 bytes on an async read");
712       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
713       g_error_free (error);
714       goto out;
715     }
716
717   read_message_print_transport_debug (bytes_read, worker);
718
719   worker->read_buffer_cur_size += bytes_read;
720   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
721     {
722       /* OK, got what we asked for! */
723       if (worker->read_buffer_bytes_wanted == 16)
724         {
725           gssize message_len;
726           /* OK, got the header - determine how many more bytes are needed */
727           error = NULL;
728           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
729                                                      16,
730                                                      &error);
731           if (message_len == -1)
732             {
733               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
734               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
735               g_error_free (error);
736               goto out;
737             }
738
739           worker->read_buffer_bytes_wanted = message_len;
740           _g_dbus_worker_do_read_unlocked (worker);
741         }
742       else
743         {
744           GDBusMessage *message;
745           error = NULL;
746
747           /* TODO: use connection->priv->auth to decode the message */
748
749           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
750                                                   worker->read_buffer_cur_size,
751                                                   worker->capabilities,
752                                                   &error);
753           if (message == NULL)
754             {
755               gchar *s;
756               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
757               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
758                          "The error is: %s\n"
759                          "The payload is as follows:\n"
760                          "%s\n",
761                          worker->read_buffer_cur_size,
762                          error->message,
763                          s);
764               g_free (s);
765               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
766               g_error_free (error);
767               goto out;
768             }
769
770 #ifdef G_OS_UNIX
771           if (worker->read_fd_list != NULL)
772             {
773               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
774               g_object_unref (worker->read_fd_list);
775               worker->read_fd_list = NULL;
776             }
777 #endif
778
779           if (G_UNLIKELY (_g_dbus_debug_message ()))
780             {
781               gchar *s;
782               _g_dbus_debug_print_lock ();
783               g_print ("========================================================================\n"
784                        "GDBus-debug:Message:\n"
785                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
786                        worker->read_buffer_cur_size);
787               s = g_dbus_message_print (message, 2);
788               g_print ("%s", s);
789               g_free (s);
790               if (G_UNLIKELY (_g_dbus_debug_payload ()))
791                 {
792                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
793                   g_print ("%s\n", s);
794                   g_free (s);
795                 }
796               _g_dbus_debug_print_unlock ();
797             }
798
799           /* yay, got a message, go deliver it */
800           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
801
802           /* start reading another message! */
803           worker->read_buffer_bytes_wanted = 0;
804           worker->read_buffer_cur_size = 0;
805           _g_dbus_worker_do_read_unlocked (worker);
806         }
807     }
808   else
809     {
810       /* didn't get all the bytes we requested - so repeat the request... */
811       _g_dbus_worker_do_read_unlocked (worker);
812     }
813
814  out:
815   g_mutex_unlock (&worker->read_lock);
816
817   /* gives up the reference acquired when calling g_input_stream_read_async() */
818   _g_dbus_worker_unref (worker);
819 }
820
821 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
822 static void
823 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
824 {
825   /* Note that we do need to keep trying to read even if close_expected is
826    * true, because only failing a read causes us to signal 'closed'.
827    */
828
829   /* if bytes_wanted is zero, it means start reading a message */
830   if (worker->read_buffer_bytes_wanted == 0)
831     {
832       worker->read_buffer_cur_size = 0;
833       worker->read_buffer_bytes_wanted = 16;
834     }
835
836   /* ensure we have a (big enough) buffer */
837   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
838     {
839       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
840       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
841       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
842     }
843
844   if (worker->socket == NULL)
845     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
846                                worker->read_buffer + worker->read_buffer_cur_size,
847                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
848                                G_PRIORITY_DEFAULT,
849                                worker->cancellable,
850                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
851                                _g_dbus_worker_ref (worker));
852   else
853     {
854       worker->read_ancillary_messages = NULL;
855       worker->read_num_ancillary_messages = 0;
856       _g_socket_read_with_control_messages (worker->socket,
857                                             worker->read_buffer + worker->read_buffer_cur_size,
858                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
859                                             &worker->read_ancillary_messages,
860                                             &worker->read_num_ancillary_messages,
861                                             G_PRIORITY_DEFAULT,
862                                             worker->cancellable,
863                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
864                                             _g_dbus_worker_ref (worker));
865     }
866 }
867
868 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
869 static gboolean
870 _g_dbus_worker_do_initial_read (gpointer data)
871 {
872   GDBusWorker *worker = data;
873   g_mutex_lock (&worker->read_lock);
874   _g_dbus_worker_do_read_unlocked (worker);
875   g_mutex_unlock (&worker->read_lock);
876   return FALSE;
877 }
878
879 /* ---------------------------------------------------------------------------------------------------- */
880
881 struct _MessageToWriteData
882 {
883   GDBusWorker  *worker;
884   GDBusMessage *message;
885   gchar        *blob;
886   gsize         blob_size;
887
888   gsize               total_written;
889   GSimpleAsyncResult *simple;
890
891 };
892
893 static void
894 message_to_write_data_free (MessageToWriteData *data)
895 {
896   _g_dbus_worker_unref (data->worker);
897   if (data->message)
898     g_object_unref (data->message);
899   g_free (data->blob);
900   g_free (data);
901 }
902
903 /* ---------------------------------------------------------------------------------------------------- */
904
905 static void write_message_continue_writing (MessageToWriteData *data);
906
907 /* called in private thread shared by all GDBusConnection instances
908  *
909  * write-lock is not held on entry
910  * output_pending is PENDING_WRITE on entry
911  */
912 static void
913 write_message_async_cb (GObject      *source_object,
914                         GAsyncResult *res,
915                         gpointer      user_data)
916 {
917   MessageToWriteData *data = user_data;
918   GSimpleAsyncResult *simple;
919   gssize bytes_written;
920   GError *error;
921
922   /* Note: we can't access data->simple after calling g_async_result_complete () because the
923    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
924    */
925   simple = data->simple;
926
927   error = NULL;
928   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
929                                                 res,
930                                                 &error);
931   if (bytes_written == -1)
932     {
933       g_simple_async_result_take_error (simple, error);
934       g_simple_async_result_complete (simple);
935       g_object_unref (simple);
936       goto out;
937     }
938   g_assert (bytes_written > 0); /* zero is never returned */
939
940   write_message_print_transport_debug (bytes_written, data);
941
942   data->total_written += bytes_written;
943   g_assert (data->total_written <= data->blob_size);
944   if (data->total_written == data->blob_size)
945     {
946       g_simple_async_result_complete (simple);
947       g_object_unref (simple);
948       goto out;
949     }
950
951   write_message_continue_writing (data);
952
953  out:
954   ;
955 }
956
/* Source callback attached by write_message_continue_writing() when sending
 * on the socket would have blocked; it simply retries the write.
 *
 * Called in the private thread shared by all GDBusConnection instances.
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 */
#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  write_message_continue_writing ((MessageToWriteData *) user_data);

  /* one-shot: remove source */
  return FALSE;
}
#endif
973
974 /* called in private thread shared by all GDBusConnection instances
975  *
976  * write-lock is not held on entry
977  * output_pending is PENDING_WRITE on entry
978  */
979 static void
980 write_message_continue_writing (MessageToWriteData *data)
981 {
982   GOutputStream *ostream;
983 #ifdef G_OS_UNIX
984   GSimpleAsyncResult *simple;
985   GUnixFDList *fd_list;
986 #endif
987
988 #ifdef G_OS_UNIX
989   /* Note: we can't access data->simple after calling g_async_result_complete () because the
990    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
991    */
992   simple = data->simple;
993 #endif
994
995   ostream = g_io_stream_get_output_stream (data->worker->stream);
996 #ifdef G_OS_UNIX
997   fd_list = g_dbus_message_get_unix_fd_list (data->message);
998 #endif
999
1000   g_assert (!g_output_stream_has_pending (ostream));
1001   g_assert_cmpint (data->total_written, <, data->blob_size);
1002
1003   if (FALSE)
1004     {
1005     }
1006 #ifdef G_OS_UNIX
1007   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1008     {
1009       GOutputVector vector;
1010       GSocketControlMessage *control_message;
1011       gssize bytes_written;
1012       GError *error;
1013
1014       vector.buffer = data->blob;
1015       vector.size = data->blob_size;
1016
1017       control_message = NULL;
1018       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1019         {
1020           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1021             {
1022               g_simple_async_result_set_error (simple,
1023                                                G_IO_ERROR,
1024                                                G_IO_ERROR_FAILED,
1025                                                "Tried sending a file descriptor but remote peer does not support this capability");
1026               g_simple_async_result_complete (simple);
1027               g_object_unref (simple);
1028               goto out;
1029             }
1030           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1031         }
1032
1033       error = NULL;
1034       bytes_written = g_socket_send_message (data->worker->socket,
1035                                              NULL, /* address */
1036                                              &vector,
1037                                              1,
1038                                              control_message != NULL ? &control_message : NULL,
1039                                              control_message != NULL ? 1 : 0,
1040                                              G_SOCKET_MSG_NONE,
1041                                              data->worker->cancellable,
1042                                              &error);
1043       if (control_message != NULL)
1044         g_object_unref (control_message);
1045
1046       if (bytes_written == -1)
1047         {
1048           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1049           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1050             {
1051               GSource *source;
1052               source = g_socket_create_source (data->worker->socket,
1053                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1054                                                data->worker->cancellable);
1055               g_source_set_callback (source,
1056                                      (GSourceFunc) on_socket_ready,
1057                                      data,
1058                                      NULL); /* GDestroyNotify */
1059               g_source_attach (source, g_main_context_get_thread_default ());
1060               g_source_unref (source);
1061               g_error_free (error);
1062               goto out;
1063             }
1064           g_simple_async_result_take_error (simple, error);
1065           g_simple_async_result_complete (simple);
1066           g_object_unref (simple);
1067           goto out;
1068         }
1069       g_assert (bytes_written > 0); /* zero is never returned */
1070
1071       write_message_print_transport_debug (bytes_written, data);
1072
1073       data->total_written += bytes_written;
1074       g_assert (data->total_written <= data->blob_size);
1075       if (data->total_written == data->blob_size)
1076         {
1077           g_simple_async_result_complete (simple);
1078           g_object_unref (simple);
1079           goto out;
1080         }
1081
1082       write_message_continue_writing (data);
1083     }
1084 #endif
1085   else
1086     {
1087 #ifdef G_OS_UNIX
1088       if (fd_list != NULL)
1089         {
1090           g_simple_async_result_set_error (simple,
1091                                            G_IO_ERROR,
1092                                            G_IO_ERROR_FAILED,
1093                                            "Tried sending a file descriptor on unsupported stream of type %s",
1094                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1095           g_simple_async_result_complete (simple);
1096           g_object_unref (simple);
1097           goto out;
1098         }
1099 #endif
1100
1101       g_output_stream_write_async (ostream,
1102                                    (const gchar *) data->blob + data->total_written,
1103                                    data->blob_size - data->total_written,
1104                                    G_PRIORITY_DEFAULT,
1105                                    data->worker->cancellable,
1106                                    write_message_async_cb,
1107                                    data);
1108     }
1109 #ifdef G_OS_UNIX
1110  out:
1111 #endif
1112   ;
1113 }
1114
1115 /* called in private thread shared by all GDBusConnection instances
1116  *
1117  * write-lock is not held on entry
1118  * output_pending is PENDING_WRITE on entry
1119  */
1120 static void
1121 write_message_async (GDBusWorker         *worker,
1122                      MessageToWriteData  *data,
1123                      GAsyncReadyCallback  callback,
1124                      gpointer             user_data)
1125 {
1126   data->simple = g_simple_async_result_new (NULL,
1127                                             callback,
1128                                             user_data,
1129                                             write_message_async);
1130   data->total_written = 0;
1131   write_message_continue_writing (data);
1132 }
1133
1134 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1135 static gboolean
1136 write_message_finish (GAsyncResult   *res,
1137                       GError        **error)
1138 {
1139   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1140   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1141     return FALSE;
1142   else
1143     return TRUE;
1144 }
1145 /* ---------------------------------------------------------------------------------------------------- */
1146
1147 static void continue_writing (GDBusWorker *worker);
1148
1149 typedef struct
1150 {
1151   GDBusWorker *worker;
1152   GList *flushers;
1153 } FlushAsyncData;
1154
1155 static void
1156 flush_data_list_complete (const GList  *flushers,
1157                           const GError *error)
1158 {
1159   const GList *l;
1160
1161   for (l = flushers; l != NULL; l = l->next)
1162     {
1163       FlushData *f = l->data;
1164
1165       f->error = error != NULL ? g_error_copy (error) : NULL;
1166
1167       g_mutex_lock (&f->mutex);
1168       g_cond_signal (&f->cond);
1169       g_mutex_unlock (&f->mutex);
1170     }
1171 }
1172
1173 /* called in private thread shared by all GDBusConnection instances
1174  *
1175  * write-lock is not held on entry
1176  * output_pending is PENDING_FLUSH on entry
1177  */
1178 static void
1179 ostream_flush_cb (GObject      *source_object,
1180                   GAsyncResult *res,
1181                   gpointer      user_data)
1182 {
1183   FlushAsyncData *data = user_data;
1184   GError *error;
1185
1186   error = NULL;
1187   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1188                                 res,
1189                                 &error);
1190
1191   if (error == NULL)
1192     {
1193       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1194         {
1195           _g_dbus_debug_print_lock ();
1196           g_print ("========================================================================\n"
1197                    "GDBus-debug:Transport:\n"
1198                    "  ---- FLUSHED stream of type %s\n",
1199                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1200           _g_dbus_debug_print_unlock ();
1201         }
1202     }
1203
1204   g_assert (data->flushers != NULL);
1205   flush_data_list_complete (data->flushers, error);
1206   g_list_free (data->flushers);
1207
1208   if (error != NULL)
1209     g_error_free (error);
1210
1211   /* Make sure we tell folks that we don't have additional
1212      flushes pending */
1213   g_mutex_lock (&data->worker->write_lock);
1214   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1215   g_assert (data->worker->output_pending == PENDING_FLUSH);
1216   data->worker->output_pending = PENDING_NONE;
1217   g_mutex_unlock (&data->worker->write_lock);
1218
1219   /* OK, cool, finally kick off the next write */
1220   continue_writing (data->worker);
1221
1222   _g_dbus_worker_unref (data->worker);
1223   g_free (data);
1224 }
1225
1226 /* called in private thread shared by all GDBusConnection instances
1227  *
1228  * write-lock is not held on entry
1229  * output_pending is PENDING_FLUSH on entry
1230  */
1231 static void
1232 start_flush (FlushAsyncData *data)
1233 {
1234   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1235                                G_PRIORITY_DEFAULT,
1236                                data->worker->cancellable,
1237                                ostream_flush_cb,
1238                                data);
1239 }
1240
1241 /* called in private thread shared by all GDBusConnection instances
1242  *
1243  * write-lock is held on entry
1244  * output_pending is PENDING_NONE on entry
1245  */
1246 static void
1247 message_written_unlocked (GDBusWorker *worker,
1248                           MessageToWriteData *message_data)
1249 {
1250   if (G_UNLIKELY (_g_dbus_debug_message ()))
1251     {
1252       gchar *s;
1253       _g_dbus_debug_print_lock ();
1254       g_print ("========================================================================\n"
1255                "GDBus-debug:Message:\n"
1256                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1257                message_data->blob_size);
1258       s = g_dbus_message_print (message_data->message, 2);
1259       g_print ("%s", s);
1260       g_free (s);
1261       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1262         {
1263           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1264           g_print ("%s\n", s);
1265           g_free (s);
1266         }
1267       _g_dbus_debug_print_unlock ();
1268     }
1269
1270   worker->write_num_messages_written += 1;
1271 }
1272
1273 /* called in private thread shared by all GDBusConnection instances
1274  *
1275  * write-lock is held on entry
1276  * output_pending is PENDING_NONE on entry
1277  *
1278  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1279  */
1280 static FlushAsyncData *
1281 prepare_flush_unlocked (GDBusWorker *worker)
1282 {
1283   GList *l;
1284   GList *ll;
1285   GList *flushers;
1286
1287   flushers = NULL;
1288   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1289     {
1290       FlushData *f = l->data;
1291       ll = l->next;
1292
1293       if (f->number_to_wait_for == worker->write_num_messages_written)
1294         {
1295           flushers = g_list_append (flushers, f);
1296           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1297         }
1298     }
1299   if (flushers != NULL)
1300     {
1301       g_assert (worker->output_pending == PENDING_NONE);
1302       worker->output_pending = PENDING_FLUSH;
1303     }
1304
1305   if (flushers != NULL)
1306     {
1307       FlushAsyncData *data;
1308
1309       data = g_new0 (FlushAsyncData, 1);
1310       data->worker = _g_dbus_worker_ref (worker);
1311       data->flushers = flushers;
1312       return data;
1313     }
1314
1315   return NULL;
1316 }
1317
1318 /* called in private thread shared by all GDBusConnection instances
1319  *
1320  * write-lock is not held on entry
1321  * output_pending is PENDING_WRITE on entry
1322  */
1323 static void
1324 write_message_cb (GObject       *source_object,
1325                   GAsyncResult  *res,
1326                   gpointer       user_data)
1327 {
1328   MessageToWriteData *data = user_data;
1329   GError *error;
1330
1331   g_mutex_lock (&data->worker->write_lock);
1332   g_assert (data->worker->output_pending == PENDING_WRITE);
1333   data->worker->output_pending = PENDING_NONE;
1334
1335   error = NULL;
1336   if (!write_message_finish (res, &error))
1337     {
1338       g_mutex_unlock (&data->worker->write_lock);
1339
1340       /* TODO: handle */
1341       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1342       g_error_free (error);
1343
1344       g_mutex_lock (&data->worker->write_lock);
1345     }
1346
1347   message_written_unlocked (data->worker, data);
1348
1349   g_mutex_unlock (&data->worker->write_lock);
1350
1351   continue_writing (data->worker);
1352
1353   message_to_write_data_free (data);
1354 }
1355
1356 /* called in private thread shared by all GDBusConnection instances
1357  *
1358  * write-lock is not held on entry
1359  * output_pending is PENDING_CLOSE on entry
1360  */
1361 static void
1362 iostream_close_cb (GObject      *source_object,
1363                    GAsyncResult *res,
1364                    gpointer      user_data)
1365 {
1366   GDBusWorker *worker = user_data;
1367   GError *error = NULL;
1368   GList *pending_close_attempts, *pending_flush_attempts;
1369   GQueue *send_queue;
1370
1371   g_io_stream_close_finish (worker->stream, res, &error);
1372
1373   g_mutex_lock (&worker->write_lock);
1374
1375   pending_close_attempts = worker->pending_close_attempts;
1376   worker->pending_close_attempts = NULL;
1377
1378   pending_flush_attempts = worker->write_pending_flushes;
1379   worker->write_pending_flushes = NULL;
1380
1381   send_queue = worker->write_queue;
1382   worker->write_queue = g_queue_new ();
1383
1384   g_assert (worker->output_pending == PENDING_CLOSE);
1385   worker->output_pending = PENDING_NONE;
1386
1387   g_mutex_unlock (&worker->write_lock);
1388
1389   while (pending_close_attempts != NULL)
1390     {
1391       CloseData *close_data = pending_close_attempts->data;
1392
1393       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1394                                                    pending_close_attempts);
1395
1396       if (close_data->result != NULL)
1397         {
1398           if (error != NULL)
1399             g_simple_async_result_set_from_error (close_data->result, error);
1400
1401           /* this must be in an idle because the result is likely to be
1402            * intended for another thread
1403            */
1404           g_simple_async_result_complete_in_idle (close_data->result);
1405         }
1406
1407       close_data_free (close_data);
1408     }
1409
1410   g_clear_error (&error);
1411
1412   /* all messages queued for sending are discarded */
1413   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1414   /* all queued flushes fail */
1415   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1416                        _("Operation was cancelled"));
1417   flush_data_list_complete (pending_flush_attempts, error);
1418   g_list_free (pending_flush_attempts);
1419   g_clear_error (&error);
1420
1421   _g_dbus_worker_unref (worker);
1422 }
1423
1424 /* called in private thread shared by all GDBusConnection instances
1425  *
1426  * write-lock is not held on entry
1427  * output_pending must be PENDING_NONE on entry
1428  */
1429 static void
1430 continue_writing (GDBusWorker *worker)
1431 {
1432   MessageToWriteData *data;
1433   FlushAsyncData *flush_async_data;
1434
1435  write_next:
1436   /* we mustn't try to write two things at once */
1437   g_assert (worker->output_pending == PENDING_NONE);
1438
1439   g_mutex_lock (&worker->write_lock);
1440
1441   data = NULL;
1442   flush_async_data = NULL;
1443
1444   /* if we want to close the connection, that takes precedence */
1445   if (worker->pending_close_attempts != NULL)
1446     {
1447       worker->close_expected = TRUE;
1448       worker->output_pending = PENDING_CLOSE;
1449
1450       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1451                                NULL, iostream_close_cb,
1452                                _g_dbus_worker_ref (worker));
1453     }
1454   else
1455     {
1456       flush_async_data = prepare_flush_unlocked (worker);
1457
1458       if (flush_async_data == NULL)
1459         {
1460           data = g_queue_pop_head (worker->write_queue);
1461
1462           if (data != NULL)
1463             worker->output_pending = PENDING_WRITE;
1464         }
1465     }
1466
1467   g_mutex_unlock (&worker->write_lock);
1468
1469   /* Note that write_lock is only used for protecting the @write_queue
1470    * and @output_pending fields of the GDBusWorker struct ... which we
1471    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1472    *
1473    * Therefore, it's fine to drop it here when calling back into user
1474    * code and then writing the message out onto the GIOStream since this
1475    * function only runs on the worker thread.
1476    */
1477
1478   if (flush_async_data != NULL)
1479     {
1480       start_flush (flush_async_data);
1481       g_assert (data == NULL);
1482     }
1483   else if (data != NULL)
1484     {
1485       GDBusMessage *old_message;
1486       guchar *new_blob;
1487       gsize new_blob_size;
1488       GError *error;
1489
1490       old_message = data->message;
1491       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1492       if (data->message == old_message)
1493         {
1494           /* filters had no effect - do nothing */
1495         }
1496       else if (data->message == NULL)
1497         {
1498           /* filters dropped message */
1499           g_mutex_lock (&worker->write_lock);
1500           worker->output_pending = PENDING_NONE;
1501           g_mutex_unlock (&worker->write_lock);
1502           message_to_write_data_free (data);
1503           goto write_next;
1504         }
1505       else
1506         {
1507           /* filters altered the message -> reencode */
1508           error = NULL;
1509           new_blob = g_dbus_message_to_blob (data->message,
1510                                              &new_blob_size,
1511                                              worker->capabilities,
1512                                              &error);
1513           if (new_blob == NULL)
1514             {
1515               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1516                * the old message instead
1517                */
1518               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1519                          g_dbus_message_get_serial (data->message),
1520                          error->message);
1521               g_error_free (error);
1522             }
1523           else
1524             {
1525               g_free (data->blob);
1526               data->blob = (gchar *) new_blob;
1527               data->blob_size = new_blob_size;
1528             }
1529         }
1530
1531       write_message_async (worker,
1532                            data,
1533                            write_message_cb,
1534                            data);
1535     }
1536 }
1537
1538 /* called in private thread shared by all GDBusConnection instances
1539  *
1540  * write-lock is not held on entry
1541  * output_pending may be anything
1542  */
1543 static gboolean
1544 continue_writing_in_idle_cb (gpointer user_data)
1545 {
1546   GDBusWorker *worker = user_data;
1547
1548   /* Because this is the worker thread, we can read this struct member
1549    * without holding the lock: no other thread ever modifies it.
1550    */
1551   if (worker->output_pending == PENDING_NONE)
1552     continue_writing (worker);
1553
1554   return FALSE;
1555 }
1556
1557 /*
1558  * @write_data: (transfer full) (allow-none):
1559  * @flush_data: (transfer full) (allow-none):
1560  * @close_data: (transfer full) (allow-none):
1561  *
1562  * Can be called from any thread
1563  *
1564  * write_lock is held on entry
1565  * output_pending may be anything
1566  */
1567 static void
1568 schedule_writing_unlocked (GDBusWorker        *worker,
1569                            MessageToWriteData *write_data,
1570                            FlushData          *flush_data,
1571                            CloseData          *close_data)
1572 {
1573   if (write_data != NULL)
1574     g_queue_push_tail (worker->write_queue, write_data);
1575
1576   if (flush_data != NULL)
1577     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1578
1579   if (close_data != NULL)
1580     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1581                                                      close_data);
1582
1583   /* If we had output pending, the next bit of output will happen
1584    * automatically when it finishes, so we only need to do this
1585    * if nothing was pending.
1586    *
1587    * The idle callback will re-check that output_pending is still
1588    * PENDING_NONE, to guard against output starting before the idle.
1589    */
1590   if (worker->output_pending == PENDING_NONE)
1591     {
1592       GSource *idle_source;
1593       idle_source = g_idle_source_new ();
1594       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1595       g_source_set_callback (idle_source,
1596                              continue_writing_in_idle_cb,
1597                              _g_dbus_worker_ref (worker),
1598                              (GDestroyNotify) _g_dbus_worker_unref);
1599       g_source_attach (idle_source, worker->shared_thread_data->context);
1600       g_source_unref (idle_source);
1601     }
1602 }
1603
1604 /* ---------------------------------------------------------------------------------------------------- */
1605
1606 /* can be called from any thread - steals blob
1607  *
1608  * write_lock is not held on entry
1609  * output_pending may be anything
1610  */
1611 void
1612 _g_dbus_worker_send_message (GDBusWorker    *worker,
1613                              GDBusMessage   *message,
1614                              gchar          *blob,
1615                              gsize           blob_len)
1616 {
1617   MessageToWriteData *data;
1618
1619   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1620   g_return_if_fail (blob != NULL);
1621   g_return_if_fail (blob_len > 16);
1622
1623   data = g_new0 (MessageToWriteData, 1);
1624   data->worker = _g_dbus_worker_ref (worker);
1625   data->message = g_object_ref (message);
1626   data->blob = blob; /* steal! */
1627   data->blob_size = blob_len;
1628
1629   g_mutex_lock (&worker->write_lock);
1630   schedule_writing_unlocked (worker, data, NULL, NULL);
1631   g_mutex_unlock (&worker->write_lock);
1632 }
1633
1634 /* ---------------------------------------------------------------------------------------------------- */
1635
1636 GDBusWorker *
1637 _g_dbus_worker_new (GIOStream                              *stream,
1638                     GDBusCapabilityFlags                    capabilities,
1639                     gboolean                                initially_frozen,
1640                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1641                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1642                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1643                     gpointer                                user_data)
1644 {
1645   GDBusWorker *worker;
1646   GSource *idle_source;
1647
1648   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1649   g_return_val_if_fail (message_received_callback != NULL, NULL);
1650   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1651   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1652
1653   worker = g_new0 (GDBusWorker, 1);
1654   worker->ref_count = 1;
1655
1656   g_mutex_init (&worker->read_lock);
1657   worker->message_received_callback = message_received_callback;
1658   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1659   worker->disconnected_callback = disconnected_callback;
1660   worker->user_data = user_data;
1661   worker->stream = g_object_ref (stream);
1662   worker->capabilities = capabilities;
1663   worker->cancellable = g_cancellable_new ();
1664   worker->output_pending = PENDING_NONE;
1665
1666   worker->frozen = initially_frozen;
1667   worker->received_messages_while_frozen = g_queue_new ();
1668
1669   g_mutex_init (&worker->write_lock);
1670   worker->write_queue = g_queue_new ();
1671
1672   if (G_IS_SOCKET_CONNECTION (worker->stream))
1673     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1674
1675   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1676
1677   /* begin reading */
1678   idle_source = g_idle_source_new ();
1679   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1680   g_source_set_callback (idle_source,
1681                          _g_dbus_worker_do_initial_read,
1682                          _g_dbus_worker_ref (worker),
1683                          (GDestroyNotify) _g_dbus_worker_unref);
1684   g_source_attach (idle_source, worker->shared_thread_data->context);
1685   g_source_unref (idle_source);
1686
1687   return worker;
1688 }
1689
1690 /* ---------------------------------------------------------------------------------------------------- */
1691
1692 /* can be called from any thread
1693  *
1694  * write_lock is not held on entry
1695  * output_pending may be anything
1696  */
1697 void
1698 _g_dbus_worker_close (GDBusWorker         *worker,
1699                       GCancellable        *cancellable,
1700                       GSimpleAsyncResult  *result)
1701 {
1702   CloseData *close_data;
1703
1704   close_data = g_slice_new0 (CloseData);
1705   close_data->worker = _g_dbus_worker_ref (worker);
1706   close_data->cancellable =
1707       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1708   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1709
1710   /* Don't set worker->close_expected here - we're in the wrong thread.
1711    * It'll be set before the actual close happens.
1712    */
1713   g_cancellable_cancel (worker->cancellable);
1714   g_mutex_lock (&worker->write_lock);
1715   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1716   g_mutex_unlock (&worker->write_lock);
1717 }
1718
1719 /* This can be called from any thread - frees worker. Note that
1720  * callbacks might still happen if called from another thread than the
1721  * worker - use your own synchronization primitive in the callbacks.
1722  *
1723  * write_lock is not held on entry
1724  * output_pending may be anything
1725  */
1726 void
1727 _g_dbus_worker_stop (GDBusWorker *worker)
1728 {
1729   g_atomic_int_set (&worker->stopped, TRUE);
1730
1731   /* Cancel any pending operations and schedule a close of the underlying I/O
1732    * stream in the worker thread
1733    */
1734   _g_dbus_worker_close (worker, NULL, NULL);
1735
1736   /* _g_dbus_worker_close holds a ref until after an idle in the worker
1737    * thread has run, so we no longer need to unref in an idle like in
1738    * commit 322e25b535
1739    */
1740   _g_dbus_worker_unref (worker);
1741 }
1742
1743 /* ---------------------------------------------------------------------------------------------------- */
1744
1745 /* can be called from any thread (except the worker thread) - blocks
1746  * calling thread until all queued outgoing messages are written and
1747  * the transport has been flushed
1748  *
1749  * write_lock is not held on entry
1750  * output_pending may be anything
1751  */
1752 gboolean
1753 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1754                            GCancellable   *cancellable,
1755                            GError        **error)
1756 {
1757   gboolean ret;
1758   FlushData *data;
1759   guint64 pending_writes;
1760
1761   data = NULL;
1762   ret = TRUE;
1763
1764   g_mutex_lock (&worker->write_lock);
1765
1766   /* if the queue is empty, no write is in-flight and we haven't written
1767    * anything since the last flush, then there's nothing to wait for
1768    */
1769   pending_writes = g_queue_get_length (worker->write_queue);
1770
1771   /* if a write is in-flight, we shouldn't be satisfied until the first
1772    * flush operation that follows it
1773    */
1774   if (worker->output_pending == PENDING_WRITE)
1775     pending_writes += 1;
1776
1777   if (pending_writes > 0 ||
1778       worker->write_num_messages_written != worker->write_num_messages_flushed)
1779     {
1780       data = g_new0 (FlushData, 1);
1781       g_mutex_init (&data->mutex);
1782       g_cond_init (&data->cond);
1783       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1784       g_mutex_lock (&data->mutex);
1785
1786       schedule_writing_unlocked (worker, NULL, data, NULL);
1787     }
1788   g_mutex_unlock (&worker->write_lock);
1789
1790   if (data != NULL)
1791     {
1792       g_cond_wait (&data->cond, &data->mutex);
1793       g_mutex_unlock (&data->mutex);
1794
1795       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1796       g_cond_clear (&data->cond);
1797       g_mutex_clear (&data->mutex);
1798       if (data->error != NULL)
1799         {
1800           ret = FALSE;
1801           g_propagate_error (error, data->error);
1802         }
1803       g_free (data);
1804     }
1805
1806   return ret;
1807 }
1808
1809 /* ---------------------------------------------------------------------------------------------------- */
1810
1811 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1812 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1813 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1814 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1815 #define G_DBUS_DEBUG_CALL           (1<<4)
1816 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1817 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1818 #define G_DBUS_DEBUG_RETURN         (1<<7)
1819 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1820 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1821
1822 static gint _gdbus_debug_flags = 0;
1823
1824 gboolean
1825 _g_dbus_debug_authentication (void)
1826 {
1827   _g_dbus_initialize ();
1828   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1829 }
1830
1831 gboolean
1832 _g_dbus_debug_transport (void)
1833 {
1834   _g_dbus_initialize ();
1835   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1836 }
1837
1838 gboolean
1839 _g_dbus_debug_message (void)
1840 {
1841   _g_dbus_initialize ();
1842   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1843 }
1844
1845 gboolean
1846 _g_dbus_debug_payload (void)
1847 {
1848   _g_dbus_initialize ();
1849   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1850 }
1851
1852 gboolean
1853 _g_dbus_debug_call (void)
1854 {
1855   _g_dbus_initialize ();
1856   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1857 }
1858
1859 gboolean
1860 _g_dbus_debug_signal (void)
1861 {
1862   _g_dbus_initialize ();
1863   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1864 }
1865
1866 gboolean
1867 _g_dbus_debug_incoming (void)
1868 {
1869   _g_dbus_initialize ();
1870   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1871 }
1872
1873 gboolean
1874 _g_dbus_debug_return (void)
1875 {
1876   _g_dbus_initialize ();
1877   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1878 }
1879
1880 gboolean
1881 _g_dbus_debug_emission (void)
1882 {
1883   _g_dbus_initialize ();
1884   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1885 }
1886
1887 gboolean
1888 _g_dbus_debug_address (void)
1889 {
1890   _g_dbus_initialize ();
1891   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1892 }
1893
1894 G_LOCK_DEFINE_STATIC (print_lock);
1895
1896 void
1897 _g_dbus_debug_print_lock (void)
1898 {
1899   G_LOCK (print_lock);
1900 }
1901
1902 void
1903 _g_dbus_debug_print_unlock (void)
1904 {
1905   G_UNLOCK (print_lock);
1906 }
1907
1908 /*
1909  * _g_dbus_initialize:
1910  *
1911  * Does various one-time init things such as
1912  *
1913  *  - registering the G_DBUS_ERROR error domain
1914  *  - parses the G_DBUS_DEBUG environment variable
1915  */
1916 void
1917 _g_dbus_initialize (void)
1918 {
1919   static volatile gsize initialized = 0;
1920
1921   if (g_once_init_enter (&initialized))
1922     {
1923       volatile GQuark g_dbus_error_domain;
1924       const gchar *debug;
1925
1926       g_dbus_error_domain = G_DBUS_ERROR;
1927       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1928
1929       debug = g_getenv ("G_DBUS_DEBUG");
1930       if (debug != NULL)
1931         {
1932           const GDebugKey keys[] = {
1933             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1934             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1935             { "message",        G_DBUS_DEBUG_MESSAGE        },
1936             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1937             { "call",           G_DBUS_DEBUG_CALL           },
1938             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1939             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1940             { "return",         G_DBUS_DEBUG_RETURN         },
1941             { "emission",       G_DBUS_DEBUG_EMISSION       },
1942             { "address",        G_DBUS_DEBUG_ADDRESS        }
1943           };
1944
1945           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1946           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1947             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1948         }
1949
1950       g_once_init_leave (&initialized, 1);
1951     }
1952 }
1953
1954 /* ---------------------------------------------------------------------------------------------------- */
1955
1956 GVariantType *
1957 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1958 {
1959   const GVariantType *arg_types[256];
1960   guint n;
1961
1962   if (args)
1963     for (n = 0; args[n] != NULL; n++)
1964       {
1965         /* DBus places a hard limit of 255 on signature length.
1966          * therefore number of args must be less than 256.
1967          */
1968         g_assert (n < 256);
1969
1970         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1971
1972         if G_UNLIKELY (arg_types[n] == NULL)
1973           return NULL;
1974       }
1975   else
1976     n = 0;
1977
1978   return g_variant_type_new_tuple (arg_types, n);
1979 }
1980
1981 /* ---------------------------------------------------------------------------------------------------- */
1982
1983 #ifdef G_OS_WIN32
1984
1985 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1986
1987 gchar *
1988 _g_dbus_win32_get_user_sid (void)
1989 {
1990   HANDLE h;
1991   TOKEN_USER *user;
1992   DWORD token_information_len;
1993   PSID psid;
1994   gchar *sid;
1995   gchar *ret;
1996
1997   ret = NULL;
1998   user = NULL;
1999   h = INVALID_HANDLE_VALUE;
2000
2001   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2002     {
2003       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2004       goto out;
2005     }
2006
2007   /* Get length of buffer */
2008   token_information_len = 0;
2009   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2010     {
2011       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2012         {
2013           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2014           goto out;
2015         }
2016     }
2017   user = g_malloc (token_information_len);
2018   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2019     {
2020       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2021       goto out;
2022     }
2023
2024   psid = user->User.Sid;
2025   if (!IsValidSid (psid))
2026     {
2027       g_warning ("Invalid SID");
2028       goto out;
2029     }
2030
2031   if (!ConvertSidToStringSidA (psid, &sid))
2032     {
2033       g_warning ("Invalid SID");
2034       goto out;
2035     }
2036
2037   ret = g_strdup (sid);
2038   LocalFree (sid);
2039
2040 out:
2041   g_free (user);
2042   if (h != INVALID_HANDLE_VALUE)
2043     CloseHandle (h);
2044   return ret;
2045 }
2046 #endif
2047
2048 /* ---------------------------------------------------------------------------------------------------- */
2049
2050 gchar *
2051 _g_dbus_get_machine_id (GError **error)
2052 {
2053 #ifdef G_OS_WIN32
2054   HW_PROFILE_INFOA info;
2055   char *src, *dest, *res;
2056   int i;
2057
2058   if (!GetCurrentHwProfileA (&info))
2059     {
2060       char *message = g_win32_error_message (GetLastError ());
2061       g_set_error (error,
2062                    G_IO_ERROR,
2063                    G_IO_ERROR_FAILED,
2064                    _("Unable to get Hardware profile: %s"), message);
2065       g_free (message);
2066       return NULL;
2067     }
2068
2069   /* Form: {12340001-4980-1920-6788-123456789012} */
2070   src = &info.szHwProfileGuid[0];
2071
2072   res = g_malloc (32+1);
2073   dest = res;
2074
2075   src++; /* Skip { */
2076   for (i = 0; i < 8; i++)
2077     *dest++ = *src++;
2078   src++; /* Skip - */
2079   for (i = 0; i < 4; i++)
2080     *dest++ = *src++;
2081   src++; /* Skip - */
2082   for (i = 0; i < 4; i++)
2083     *dest++ = *src++;
2084   src++; /* Skip - */
2085   for (i = 0; i < 4; i++)
2086     *dest++ = *src++;
2087   src++; /* Skip - */
2088   for (i = 0; i < 12; i++)
2089     *dest++ = *src++;
2090   *dest = 0;
2091
2092   return res;
2093 #else
2094   gchar *ret;
2095   GError *first_error;
2096   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2097   ret = NULL;
2098   first_error = NULL;
2099   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2100                             &ret,
2101                             NULL,
2102                             &first_error) &&
2103       !g_file_get_contents ("/etc/machine-id",
2104                             &ret,
2105                             NULL,
2106                             NULL))
2107     {
2108       g_propagate_prefixed_error (error, first_error,
2109                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2110     }
2111   else
2112     {
2113       /* ignore the error from the first try, if any */
2114       g_clear_error (&first_error);
2115       /* TODO: validate value */
2116       g_strstrip (ret);
2117     }
2118   return ret;
2119 #endif
2120 }
2121
2122 /* ---------------------------------------------------------------------------------------------------- */
2123
2124 gchar *
2125 _g_dbus_enum_to_string (GType enum_type, gint value)
2126 {
2127   gchar *ret;
2128   GEnumClass *klass;
2129   GEnumValue *enum_value;
2130
2131   klass = g_type_class_ref (enum_type);
2132   enum_value = g_enum_get_value (klass, value);
2133   if (enum_value != NULL)
2134     ret = g_strdup (enum_value->value_nick);
2135   else
2136     ret = g_strdup_printf ("unknown (value %d)", value);
2137   g_type_class_unref (klass);
2138   return ret;
2139 }
2140
2141 /* ---------------------------------------------------------------------------------------------------- */
2142
2143 static void
2144 write_message_print_transport_debug (gssize bytes_written,
2145                                      MessageToWriteData *data)
2146 {
2147   if (G_LIKELY (!_g_dbus_debug_transport ()))
2148     goto out;
2149
2150   _g_dbus_debug_print_lock ();
2151   g_print ("========================================================================\n"
2152            "GDBus-debug:Transport:\n"
2153            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2154            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2155            bytes_written,
2156            g_dbus_message_get_serial (data->message),
2157            data->blob_size,
2158            data->total_written,
2159            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2160   _g_dbus_debug_print_unlock ();
2161  out:
2162   ;
2163 }
2164
2165 /* ---------------------------------------------------------------------------------------------------- */
2166
2167 static void
2168 read_message_print_transport_debug (gssize bytes_read,
2169                                     GDBusWorker *worker)
2170 {
2171   gsize size;
2172   gint32 serial;
2173   gint32 message_length;
2174
2175   if (G_LIKELY (!_g_dbus_debug_transport ()))
2176     goto out;
2177
2178   size = bytes_read + worker->read_buffer_cur_size;
2179   serial = 0;
2180   message_length = 0;
2181   if (size >= 16)
2182     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2183   if (size >= 1)
2184     {
2185       switch (worker->read_buffer[0])
2186         {
2187         case 'l':
2188           if (size >= 12)
2189             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2190           break;
2191         case 'B':
2192           if (size >= 12)
2193             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2194           break;
2195         default:
2196           /* an error will be set elsewhere if this happens */
2197           goto out;
2198         }
2199     }
2200
2201     _g_dbus_debug_print_lock ();
2202   g_print ("========================================================================\n"
2203            "GDBus-debug:Transport:\n"
2204            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2205            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2206            bytes_read,
2207            serial,
2208            message_length,
2209            worker->read_buffer_cur_size,
2210            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2211   _g_dbus_debug_print_unlock ();
2212  out:
2213   ;
2214 }
2215
2216 /* ---------------------------------------------------------------------------------------------------- */
2217
2218 gboolean
2219 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2220                                      GValue                *return_accu,
2221                                      const GValue          *handler_return,
2222                                      gpointer               dummy)
2223 {
2224   gboolean continue_emission;
2225   gboolean signal_return;
2226
2227   signal_return = g_value_get_boolean (handler_return);
2228   g_value_set_boolean (return_accu, signal_return);
2229   continue_emission = signal_return;
2230
2231   return continue_emission;
2232 }