[kdbus] Integrate kdbus core with new API.
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: David Zeuthen <davidz@redhat.com>
19  */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "giotypes.h"
27 #include "gsocket.h"
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbuserror.h"
31 #include "gdbusintrospection.h"
32 #include "gasyncresult.h"
33 #include "gsimpleasyncresult.h"
34 #include "ginputstream.h"
35 #include "gmemoryinputstream.h"
36 #include "giostream.h"
37 #include "glib/gstdio.h"
38 #include "gsocketcontrolmessage.h"
39 #include "gsocketconnection.h"
40 #include "gsocketoutputstream.h"
41
42 #ifdef G_OS_UNIX
43 #include "gkdbus.h"
44 #include "gkdbusconnection.h"
45 #include "gunixfdmessage.h"
46 #include "gunixconnection.h"
47 #include "gunixcredentialsmessage.h"
48 #endif
49
50 #ifdef G_OS_WIN32
51 #include <windows.h>
52 #endif
53
54 #include "glibintl.h"
55
56 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
57
58 /* ---------------------------------------------------------------------------------------------------- */
59
60 gchar *
61 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
62 {
63  guint n, m;
64  GString *ret;
65
66  ret = g_string_new (NULL);
67
68  for (n = 0; n < len; n += 16)
69    {
70      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
71
72      for (m = n; m < n + 16; m++)
73        {
74          if (m > n && (m%4) == 0)
75            g_string_append_c (ret, ' ');
76          if (m < len)
77            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
78          else
79            g_string_append (ret, "   ");
80        }
81
82      g_string_append (ret, "   ");
83
84      for (m = n; m < len && m < n + 16; m++)
85        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
86
87      g_string_append_c (ret, '\n');
88    }
89
90  return g_string_free (ret, FALSE);
91 }
92
93 /* ---------------------------------------------------------------------------------------------------- */
94
95 /* Unfortunately ancillary messages are discarded when reading from a
96  * socket using the GSocketInputStream abstraction. So we provide a
97  * very GInputStream-ish API that uses GSocket in this case (very
98  * similar to GSocketInputStream).
99  */
100
101 typedef struct
102 {
103   GSocket *socket;
104   GCancellable *cancellable;
105
106   void *buffer;
107   gsize count;
108
109   GSocketControlMessage ***messages;
110   gint *num_messages;
111
112   GSimpleAsyncResult *simple;
113
114   gboolean from_mainloop;
115 } ReadWithControlData;
116
117 static void
118 read_with_control_data_free (ReadWithControlData *data)
119 {
120   g_object_unref (data->socket);
121   if (data->cancellable != NULL)
122     g_object_unref (data->cancellable);
123   g_object_unref (data->simple);
124   g_free (data);
125 }
126
127 static gboolean
128 _g_socket_read_with_control_messages_ready (GSocket      *socket,
129                                             GIOCondition  condition,
130                                             gpointer      user_data)
131 {
132   ReadWithControlData *data = user_data;
133   GError *error;
134   gssize result;
135   GInputVector vector;
136
137   error = NULL;
138   vector.buffer = data->buffer;
139   vector.size = data->count;
140   result = g_socket_receive_message (data->socket,
141                                      NULL, /* address */
142                                      &vector,
143                                      1,
144                                      data->messages,
145                                      data->num_messages,
146                                      NULL,
147                                      data->cancellable,
148                                      &error);
149   if (result >= 0)
150     {
151       g_simple_async_result_set_op_res_gssize (data->simple, result);
152     }
153   else
154     {
155       g_assert (error != NULL);
156       g_simple_async_result_take_error (data->simple, error);
157     }
158
159   if (data->from_mainloop)
160     g_simple_async_result_complete (data->simple);
161   else
162     g_simple_async_result_complete_in_idle (data->simple);
163
164   return FALSE;
165 }
166
167 static void
168 _g_socket_read_with_control_messages (GSocket                 *socket,
169                                       void                    *buffer,
170                                       gsize                    count,
171                                       GSocketControlMessage ***messages,
172                                       gint                    *num_messages,
173                                       gint                     io_priority,
174                                       GCancellable            *cancellable,
175                                       GAsyncReadyCallback      callback,
176                                       gpointer                 user_data)
177 {
178   ReadWithControlData *data;
179
180   data = g_new0 (ReadWithControlData, 1);
181   data->socket = g_object_ref (socket);
182   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
183   data->buffer = buffer;
184   data->count = count;
185   data->messages = messages;
186   data->num_messages = num_messages;
187
188   data->simple = g_simple_async_result_new (G_OBJECT (socket),
189                                             callback,
190                                             user_data,
191                                             _g_socket_read_with_control_messages);
192   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
193
194   if (!g_socket_condition_check (socket, G_IO_IN))
195     {
196       GSource *source;
197       data->from_mainloop = TRUE;
198       source = g_socket_create_source (data->socket,
199                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
200                                        cancellable);
201       g_source_set_callback (source,
202                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
203                              data,
204                              (GDestroyNotify) read_with_control_data_free);
205       g_source_attach (source, g_main_context_get_thread_default ());
206       g_source_unref (source);
207     }
208   else
209     {
210       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
211       read_with_control_data_free (data);
212     }
213 }
214
215 static gssize
216 _g_socket_read_with_control_messages_finish (GSocket       *socket,
217                                              GAsyncResult  *result,
218                                              GError       **error)
219 {
220   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
221
222   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
223   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
224
225   if (g_simple_async_result_propagate_error (simple, error))
226       return -1;
227   else
228     return g_simple_async_result_get_op_res_gssize (simple);
229 }
230
231 /* ---------------------------------------------------------------------------------------------------- */
232
233 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
234
235 static GPtrArray *ensured_classes = NULL;
236
237 static void
238 ensure_type (GType gtype)
239 {
240   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
241 }
242
243 static void
244 release_required_types (void)
245 {
246   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
247   g_ptr_array_unref (ensured_classes);
248   ensured_classes = NULL;
249 }
250
251 static void
252 ensure_required_types (void)
253 {
254   g_assert (ensured_classes == NULL);
255   ensured_classes = g_ptr_array_new ();
256   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
257   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
258 }
259 /* ---------------------------------------------------------------------------------------------------- */
260
261 typedef struct
262 {
263   volatile gint refcount;
264   GThread *thread;
265   GMainContext *context;
266   GMainLoop *loop;
267 } SharedThreadData;
268
269 static gpointer
270 gdbus_shared_thread_func (gpointer user_data)
271 {
272   SharedThreadData *data = user_data;
273
274   g_main_context_push_thread_default (data->context);
275   g_main_loop_run (data->loop);
276   g_main_context_pop_thread_default (data->context);
277
278   release_required_types ();
279
280   return NULL;
281 }
282
283 /* ---------------------------------------------------------------------------------------------------- */
284
285 static SharedThreadData *
286 _g_dbus_shared_thread_ref (void)
287 {
288   static gsize shared_thread_data = 0;
289   SharedThreadData *ret;
290
291   if (g_once_init_enter (&shared_thread_data))
292     {
293       SharedThreadData *data;
294
295       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
296       ensure_required_types ();
297
298       data = g_new0 (SharedThreadData, 1);
299       data->refcount = 0;
300       
301       data->context = g_main_context_new ();
302       data->loop = g_main_loop_new (data->context, FALSE);
303       data->thread = g_thread_new ("gdbus",
304                                    gdbus_shared_thread_func,
305                                    data);
306       /* We can cast between gsize and gpointer safely */
307       g_once_init_leave (&shared_thread_data, (gsize) data);
308     }
309
310   ret = (SharedThreadData*) shared_thread_data;
311   g_atomic_int_inc (&ret->refcount);
312   return ret;
313 }
314
315 static void
316 _g_dbus_shared_thread_unref (SharedThreadData *data)
317 {
318   /* TODO: actually destroy the shared thread here */
319 #if 0
320   g_assert (data != NULL);
321   if (g_atomic_int_dec_and_test (&data->refcount))
322     {
323       g_main_loop_quit (data->loop);
324       //g_thread_join (data->thread);
325       g_main_loop_unref (data->loop);
326       g_main_context_unref (data->context);
327     }
328 #endif
329 }
330
331 /* ---------------------------------------------------------------------------------------------------- */
332
333 typedef enum {
334     PENDING_NONE = 0,
335     PENDING_WRITE,
336     PENDING_FLUSH,
337     PENDING_CLOSE
338 } OutputPending;
339
340 struct GDBusWorker
341 {
342   volatile gint                       ref_count;
343
344   SharedThreadData                   *shared_thread_data;
345
346   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
347   volatile gint                       stopped;
348
349   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
350    * only affects messages received from the other peer (since GDBusServer is the
351    * only user) - we might want it to affect messages sent to the other peer too?
352    */
353   gboolean                            frozen;
354   GDBusCapabilityFlags                capabilities;
355   GQueue                             *received_messages_while_frozen;
356
357   GIOStream                          *stream;
358   GCancellable                       *cancellable;
359   GDBusWorkerMessageReceivedCallback  message_received_callback;
360   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
361   GDBusWorkerDisconnectedCallback     disconnected_callback;
362   gpointer                            user_data;
363
364   /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
365   GSocket *socket;
366 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
367   GKdbus  *kdbus;
368 #endif
369
370   /* used for reading */
371   GMutex                              read_lock;
372   gchar                              *read_buffer;
373   gsize                               read_buffer_allocated_size;
374   gsize                               read_buffer_cur_size;
375   gsize                               read_buffer_bytes_wanted;
376   GUnixFDList                        *read_fd_list;
377   GSocketControlMessage             **read_ancillary_messages;
378   gint                                read_num_ancillary_messages;
379
380   /* Whether an async write, flush or close, or none of those, is pending.
381    * Only the worker thread may change its value, and only with the write_lock.
382    * Other threads may read its value when holding the write_lock.
383    * The worker thread may read its value at any time.
384    */
385   OutputPending                       output_pending;
386   /* used for writing */
387   GMutex                              write_lock;
388   /* queue of MessageToWriteData, protected by write_lock */
389   GQueue                             *write_queue;
390   /* protected by write_lock */
391   guint64                             write_num_messages_written;
392   /* number of messages we'd written out last time we flushed;
393    * protected by write_lock
394    */
395   guint64                             write_num_messages_flushed;
396   /* list of FlushData, protected by write_lock */
397   GList                              *write_pending_flushes;
398   /* list of CloseData, protected by write_lock */
399   GList                              *pending_close_attempts;
400   /* no lock - only used from the worker thread */
401   gboolean                            close_expected;
402 };
403
404 static void _g_dbus_worker_unref (GDBusWorker *worker);
405
406 /* ---------------------------------------------------------------------------------------------------- */
407
408 typedef struct
409 {
410   GMutex  mutex;
411   GCond   cond;
412   guint64 number_to_wait_for;
413   GError *error;
414 } FlushData;
415
416 struct _MessageToWriteData ;
417 typedef struct _MessageToWriteData MessageToWriteData;
418
419 static void message_to_write_data_free (MessageToWriteData *data);
420
421 static void read_message_print_transport_debug (gssize bytes_read,
422                                                 GDBusWorker *worker);
423
424 static void write_message_print_transport_debug (gssize bytes_written,
425                                                  MessageToWriteData *data);
426
427 typedef struct {
428     GDBusWorker *worker;
429     GCancellable *cancellable;
430     GSimpleAsyncResult *result;
431 } CloseData;
432
433 static void close_data_free (CloseData *close_data)
434 {
435   if (close_data->cancellable != NULL)
436     g_object_unref (close_data->cancellable);
437
438   if (close_data->result != NULL)
439     g_object_unref (close_data->result);
440
441   _g_dbus_worker_unref (close_data->worker);
442   g_slice_free (CloseData, close_data);
443 }
444
445 /* ---------------------------------------------------------------------------------------------------- */
446
447 static GDBusWorker *
448 _g_dbus_worker_ref (GDBusWorker *worker)
449 {
450   g_atomic_int_inc (&worker->ref_count);
451   return worker;
452 }
453
454 static void
455 _g_dbus_worker_unref (GDBusWorker *worker)
456 {
457   if (g_atomic_int_dec_and_test (&worker->ref_count))
458     {
459       g_assert (worker->write_pending_flushes == NULL);
460
461       _g_dbus_shared_thread_unref (worker->shared_thread_data);
462
463       g_object_unref (worker->stream);
464
465       g_mutex_clear (&worker->read_lock);
466       g_object_unref (worker->cancellable);
467       if (worker->read_fd_list != NULL)
468         g_object_unref (worker->read_fd_list);
469
470       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
471       g_mutex_clear (&worker->write_lock);
472       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
473       g_free (worker->read_buffer);
474
475       g_free (worker);
476     }
477 }
478
479 static void
480 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
481                                   gboolean      remote_peer_vanished,
482                                   GError       *error)
483 {
484   if (!g_atomic_int_get (&worker->stopped))
485     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
486 }
487
488 static void
489 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
490                                       GDBusMessage *message)
491 {
492   if (!g_atomic_int_get (&worker->stopped))
493     worker->message_received_callback (worker, message, worker->user_data);
494 }
495
496 static GDBusMessage *
497 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
498                                               GDBusMessage *message)
499 {
500   GDBusMessage *ret;
501   if (!g_atomic_int_get (&worker->stopped))
502     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
503   else
504     ret = message;
505   return ret;
506 }
507
508 /* can only be called from private thread with read-lock held - takes ownership of @message */
509 static void
510 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
511                                                   GDBusMessage *message)
512 {
513   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
514     {
515       /* queue up */
516       g_queue_push_tail (worker->received_messages_while_frozen, message);
517     }
518   else
519     {
520       /* not frozen, nor anything in queue */
521       _g_dbus_worker_emit_message_received (worker, message);
522       g_object_unref (message);
523     }
524 }
525
526 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
527 static gboolean
528 unfreeze_in_idle_cb (gpointer user_data)
529 {
530   GDBusWorker *worker = user_data;
531   GDBusMessage *message;
532
533   g_mutex_lock (&worker->read_lock);
534   if (worker->frozen)
535     {
536       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
537         {
538           _g_dbus_worker_emit_message_received (worker, message);
539           g_object_unref (message);
540         }
541       worker->frozen = FALSE;
542     }
543   else
544     {
545       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
546     }
547   g_mutex_unlock (&worker->read_lock);
548   return FALSE;
549 }
550
551 /* can be called from any thread */
552 void
553 _g_dbus_worker_unfreeze (GDBusWorker *worker)
554 {
555   GSource *idle_source;
556   idle_source = g_idle_source_new ();
557   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
558   g_source_set_callback (idle_source,
559                          unfreeze_in_idle_cb,
560                          _g_dbus_worker_ref (worker),
561                          (GDestroyNotify) _g_dbus_worker_unref);
562   g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
563   g_source_attach (idle_source, worker->shared_thread_data->context);
564   g_source_unref (idle_source);
565 }
566
567 /* ---------------------------------------------------------------------------------------------------- */
568
569 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
570
571 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
572 static void
573 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
574                            GAsyncResult  *res,
575                            gpointer       user_data)
576 {
577   GDBusWorker *worker = user_data;
578   GError *error;
579   gssize bytes_read;
580
581   g_mutex_lock (&worker->read_lock);
582
583   /* If already stopped, don't even process the reply */
584   if (g_atomic_int_get (&worker->stopped))
585     goto out;
586
587   error = NULL;
588   if (worker->socket == NULL)
589     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
590                                              res,
591                                              &error);
592   else
593     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
594                                                               res,
595                                                               &error);
596   if (worker->read_num_ancillary_messages > 0)
597     {
598       gint n;
599       for (n = 0; n < worker->read_num_ancillary_messages; n++)
600         {
601           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
602
603           if (FALSE)
604             {
605             }
606 #ifdef G_OS_UNIX
607           else if (G_IS_UNIX_FD_MESSAGE (control_message))
608             {
609               GUnixFDMessage *fd_message;
610               gint *fds;
611               gint num_fds;
612
613               fd_message = G_UNIX_FD_MESSAGE (control_message);
614               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
615               if (worker->read_fd_list == NULL)
616                 {
617                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
618                 }
619               else
620                 {
621                   gint n;
622                   for (n = 0; n < num_fds; n++)
623                     {
624                       /* TODO: really want a append_steal() */
625                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
626                       (void) g_close (fds[n], NULL);
627                     }
628                 }
629               g_free (fds);
630             }
631           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
632             {
633               /* do nothing */
634             }
635 #endif
636           else
637             {
638               if (error == NULL)
639                 {
640                   g_set_error (&error,
641                                G_IO_ERROR,
642                                G_IO_ERROR_FAILED,
643                                "Unexpected ancillary message of type %s received from peer",
644                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
645                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
646                   g_error_free (error);
647                   g_object_unref (control_message);
648                   n++;
649                   while (n < worker->read_num_ancillary_messages)
650                     g_object_unref (worker->read_ancillary_messages[n++]);
651                   g_free (worker->read_ancillary_messages);
652                   goto out;
653                 }
654             }
655           g_object_unref (control_message);
656         }
657       g_free (worker->read_ancillary_messages);
658     }
659
660   if (bytes_read == -1)
661     {
662       if (G_UNLIKELY (_g_dbus_debug_transport ()))
663         {
664           _g_dbus_debug_print_lock ();
665           g_print ("========================================================================\n"
666                    "GDBus-debug:Transport:\n"
667                    "  ---- READ ERROR on stream of type %s:\n"
668                    "  ---- %s %d: %s\n",
669                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
670                    g_quark_to_string (error->domain), error->code,
671                    error->message);
672           _g_dbus_debug_print_unlock ();
673         }
674
675       /* Every async read that uses this callback uses worker->cancellable
676        * as its GCancellable. worker->cancellable gets cancelled if and only
677        * if the GDBusConnection tells us to close (either via
678        * _g_dbus_worker_stop, which is called on last-unref, or directly),
679        * so a cancelled read must mean our connection was closed locally.
680        *
681        * If we're closing, other errors are possible - notably,
682        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
683        * read in-flight. It seems sensible to treat all read errors during
684        * closing as an expected thing that doesn't trip exit-on-close.
685        *
686        * Because close_expected can't be set until we get into the worker
687        * thread, but the cancellable is signalled sooner (from another
688        * thread), we do still need to check the error.
689        */
690       if (worker->close_expected ||
691           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
692         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
693       else
694         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
695
696       g_error_free (error);
697       goto out;
698     }
699
700 #if 0
701   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
702            (gint) bytes_read,
703            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
704            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
705            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
706                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
707            worker->stream,
708            worker);
709 #endif
710
711   /* TODO: hmm, hmm... */
712   if (bytes_read == 0)
713     {
714       g_set_error (&error,
715                    G_IO_ERROR,
716                    G_IO_ERROR_FAILED,
717                    "Underlying GIOStream returned 0 bytes on an async read");
718       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
719       g_error_free (error);
720       goto out;
721     }
722
723   read_message_print_transport_debug (bytes_read, worker);
724
725   worker->read_buffer_cur_size += bytes_read;
726   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
727     {
728       /* OK, got what we asked for! */
729       if (worker->read_buffer_bytes_wanted == 16)
730         {
731           gssize message_len;
732           /* OK, got the header - determine how many more bytes are needed */
733           error = NULL;
734           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
735                                                      16,
736                                                      &error);
737           if (message_len == -1)
738             {
739               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
740               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
741               g_error_free (error);
742               goto out;
743             }
744
745           worker->read_buffer_bytes_wanted = message_len;
746           _g_dbus_worker_do_read_unlocked (worker);
747         }
748       else
749         {
750           GDBusMessage *message;
751           error = NULL;
752
753           /* TODO: use connection->priv->auth to decode the message */
754
755           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
756                                                   worker->read_buffer_cur_size,
757                                                   worker->capabilities,
758                                                   &error);
759           if (message == NULL)
760             {
761               gchar *s;
762               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
763               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
764                          "The error is: %s\n"
765                          "The payload is as follows:\n"
766                          "%s\n",
767                          worker->read_buffer_cur_size,
768                          error->message,
769                          s);
770               g_free (s);
771               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
772               g_error_free (error);
773               goto out;
774             }
775
776 #ifdef G_OS_UNIX
777           if (worker->read_fd_list != NULL)
778             {
779               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
780               g_object_unref (worker->read_fd_list);
781               worker->read_fd_list = NULL;
782             }
783 #endif
784
785           if (G_UNLIKELY (_g_dbus_debug_message ()))
786             {
787               gchar *s;
788               _g_dbus_debug_print_lock ();
789               g_print ("========================================================================\n"
790                        "GDBus-debug:Message:\n"
791                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
792                        worker->read_buffer_cur_size);
793               s = g_dbus_message_print (message, 2);
794               g_print ("%s", s);
795               g_free (s);
796               if (G_UNLIKELY (_g_dbus_debug_payload ()))
797                 {
798                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
799                   g_print ("%s\n", s);
800                   g_free (s);
801                 }
802               _g_dbus_debug_print_unlock ();
803             }
804
805           /* yay, got a message, go deliver it */
806           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
807
808           /* start reading another message! */
809           worker->read_buffer_bytes_wanted = 0;
810           worker->read_buffer_cur_size = 0;
811           _g_dbus_worker_do_read_unlocked (worker);
812         }
813     }
814   else
815     {
816       /* didn't get all the bytes we requested - so repeat the request... */
817       _g_dbus_worker_do_read_unlocked (worker);
818     }
819
820  out:
821   g_mutex_unlock (&worker->read_lock);
822
823   /* gives up the reference acquired when calling g_input_stream_read_async() */
824   _g_dbus_worker_unref (worker);
825 }
826
827 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
828 static void
829 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
830 {
831   /* Note that we do need to keep trying to read even if close_expected is
832    * true, because only failing a read causes us to signal 'closed'.
833    */
834
835   /* if bytes_wanted is zero, it means start reading a message */
836   if (worker->read_buffer_bytes_wanted == 0)
837     {
838       worker->read_buffer_cur_size = 0;
839       worker->read_buffer_bytes_wanted = 16;
840     }
841
842   /* ensure we have a (big enough) buffer */
843   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
844     {
845       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
846       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
847       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
848     }
849
850   if (worker->socket == NULL)
851     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
852                                worker->read_buffer + worker->read_buffer_cur_size,
853                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
854                                G_PRIORITY_DEFAULT,
855                                worker->cancellable,
856                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
857                                _g_dbus_worker_ref (worker));
858   else
859     {
860       worker->read_ancillary_messages = NULL;
861       worker->read_num_ancillary_messages = 0;
862       _g_socket_read_with_control_messages (worker->socket,
863                                             worker->read_buffer + worker->read_buffer_cur_size,
864                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
865                                             &worker->read_ancillary_messages,
866                                             &worker->read_num_ancillary_messages,
867                                             G_PRIORITY_DEFAULT,
868                                             worker->cancellable,
869                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
870                                             _g_dbus_worker_ref (worker));
871     }
872 }
873
874 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
875 static gboolean
876 _g_dbus_worker_do_initial_read (gpointer data)
877 {
878   GDBusWorker *worker = data;
879   g_mutex_lock (&worker->read_lock);
880   _g_dbus_worker_do_read_unlocked (worker);
881   g_mutex_unlock (&worker->read_lock);
882   return FALSE;
883 }
884
885 /* ---------------------------------------------------------------------------------------------------- */
886
887 struct _MessageToWriteData
888 {
889   GDBusWorker  *worker;
890   GDBusMessage *message;
891   gchar        *blob;
892   gsize         blob_size;
893
894   gsize               total_written;
895   GSimpleAsyncResult *simple;
896
897 };
898
899 static void
900 message_to_write_data_free (MessageToWriteData *data)
901 {
902   _g_dbus_worker_unref (data->worker);
903   if (data->message)
904     g_object_unref (data->message);
905   g_free (data->blob);
906   g_free (data);
907 }
908
909 /* ---------------------------------------------------------------------------------------------------- */
910
911 static void write_message_continue_writing (MessageToWriteData *data);
912
913 /* called in private thread shared by all GDBusConnection instances
914  *
915  * write-lock is not held on entry
916  * output_pending is PENDING_WRITE on entry
917  */
918 static void
919 write_message_async_cb (GObject      *source_object,
920                         GAsyncResult *res,
921                         gpointer      user_data)
922 {
923   MessageToWriteData *data = user_data;
924   GSimpleAsyncResult *simple;
925   gssize bytes_written;
926   GError *error;
927
928   /* Note: we can't access data->simple after calling g_async_result_complete () because the
929    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
930    */
931   simple = data->simple;
932
933   error = NULL;
934   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
935                                                 res,
936                                                 &error);
937   if (bytes_written == -1)
938     {
939       g_simple_async_result_take_error (simple, error);
940       g_simple_async_result_complete (simple);
941       g_object_unref (simple);
942       goto out;
943     }
944   g_assert (bytes_written > 0); /* zero is never returned */
945
946   write_message_print_transport_debug (bytes_written, data);
947
948   data->total_written += bytes_written;
949   g_assert (data->total_written <= data->blob_size);
950   if (data->total_written == data->blob_size)
951     {
952       g_simple_async_result_complete (simple);
953       g_object_unref (simple);
954       goto out;
955     }
956
957   write_message_continue_writing (data);
958
959  out:
960   ;
961 }
962
963 /* called in private thread shared by all GDBusConnection instances
964  *
965  * write-lock is not held on entry
966  * output_pending is PENDING_WRITE on entry
967  */
968 #ifdef G_OS_UNIX
969 static gboolean
970 on_socket_ready (GSocket      *socket,
971                  GIOCondition  condition,
972                  gpointer      user_data)
973 {
974   MessageToWriteData *data = user_data;
975   write_message_continue_writing (data);
976   return FALSE; /* remove source */
977 }
978 #endif
979
980 /* called in private thread shared by all GDBusConnection instances
981  *
982  * write-lock is not held on entry
983  * output_pending is PENDING_WRITE on entry
984  */
985 static void
986 write_message_continue_writing (MessageToWriteData *data)
987 {
988   GOutputStream *ostream;
989 #ifdef G_OS_UNIX
990   GSimpleAsyncResult *simple;
991   GUnixFDList *fd_list;
992 #endif
993
994 #ifdef G_OS_UNIX
995   /* Note: we can't access data->simple after calling g_async_result_complete () because the
996    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
997    */
998   simple = data->simple;
999 #endif
1000
1001   ostream = g_io_stream_get_output_stream (data->worker->stream);
1002 #ifdef G_OS_UNIX
1003   fd_list = g_dbus_message_get_unix_fd_list (data->message);
1004 #endif
1005
1006   g_assert (!g_output_stream_has_pending (ostream));
1007   g_assert_cmpint (data->total_written, <, data->blob_size);
1008
1009   if (FALSE)
1010     {
1011     }
1012 #ifdef G_OS_UNIX
1013   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1014     {
1015       GOutputVector vector;
1016       GSocketControlMessage *control_message;
1017       gssize bytes_written;
1018       GError *error;
1019
1020       vector.buffer = data->blob;
1021       vector.size = data->blob_size;
1022
1023       control_message = NULL;
1024       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1025         {
1026           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1027             {
1028               g_simple_async_result_set_error (simple,
1029                                                G_IO_ERROR,
1030                                                G_IO_ERROR_FAILED,
1031                                                "Tried sending a file descriptor but remote peer does not support this capability");
1032               g_simple_async_result_complete (simple);
1033               g_object_unref (simple);
1034               goto out;
1035             }
1036           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1037         }
1038
1039       error = NULL;
1040       bytes_written = g_socket_send_message (data->worker->socket,
1041                                              NULL, /* address */
1042                                              &vector,
1043                                              1,
1044                                              control_message != NULL ? &control_message : NULL,
1045                                              control_message != NULL ? 1 : 0,
1046                                              G_SOCKET_MSG_NONE,
1047                                              data->worker->cancellable,
1048                                              &error);
1049       if (control_message != NULL)
1050         g_object_unref (control_message);
1051
1052       if (bytes_written == -1)
1053         {
1054           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1055           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1056             {
1057               GSource *source;
1058               source = g_socket_create_source (data->worker->socket,
1059                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1060                                                data->worker->cancellable);
1061               g_source_set_callback (source,
1062                                      (GSourceFunc) on_socket_ready,
1063                                      data,
1064                                      NULL); /* GDestroyNotify */
1065               g_source_attach (source, g_main_context_get_thread_default ());
1066               g_source_unref (source);
1067               g_error_free (error);
1068               goto out;
1069             }
1070           g_simple_async_result_take_error (simple, error);
1071           g_simple_async_result_complete (simple);
1072           g_object_unref (simple);
1073           goto out;
1074         }
1075       g_assert (bytes_written > 0); /* zero is never returned */
1076
1077       write_message_print_transport_debug (bytes_written, data);
1078
1079       data->total_written += bytes_written;
1080       g_assert (data->total_written <= data->blob_size);
1081       if (data->total_written == data->blob_size)
1082         {
1083           g_simple_async_result_complete (simple);
1084           g_object_unref (simple);
1085           goto out;
1086         }
1087
1088       write_message_continue_writing (data);
1089     }
1090 #endif
1091   else
1092     {
1093 #ifdef G_OS_UNIX
1094       if (fd_list != NULL)
1095         {
1096           g_simple_async_result_set_error (simple,
1097                                            G_IO_ERROR,
1098                                            G_IO_ERROR_FAILED,
1099                                            "Tried sending a file descriptor on unsupported stream of type %s",
1100                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1101           g_simple_async_result_complete (simple);
1102           g_object_unref (simple);
1103           goto out;
1104         }
1105 #endif
1106
1107       g_output_stream_write_async (ostream,
1108                                    (const gchar *) data->blob + data->total_written,
1109                                    data->blob_size - data->total_written,
1110                                    G_PRIORITY_DEFAULT,
1111                                    data->worker->cancellable,
1112                                    write_message_async_cb,
1113                                    data);
1114     }
1115 #ifdef G_OS_UNIX
1116  out:
1117 #endif
1118   ;
1119 }
1120
1121 /* called in private thread shared by all GDBusConnection instances
1122  *
1123  * write-lock is not held on entry
1124  * output_pending is PENDING_WRITE on entry
1125  */
1126 static void
1127 write_message_async (GDBusWorker         *worker,
1128                      MessageToWriteData  *data,
1129                      GAsyncReadyCallback  callback,
1130                      gpointer             user_data)
1131 {
1132   data->simple = g_simple_async_result_new (NULL,
1133                                             callback,
1134                                             user_data,
1135                                             write_message_async);
1136   data->total_written = 0;
1137   write_message_continue_writing (data);
1138 }
1139
1140 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1141 static gboolean
1142 write_message_finish (GAsyncResult   *res,
1143                       GError        **error)
1144 {
1145   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1146   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1147     return FALSE;
1148   else
1149     return TRUE;
1150 }
1151 /* ---------------------------------------------------------------------------------------------------- */
1152
1153 static void continue_writing (GDBusWorker *worker);
1154
1155 typedef struct
1156 {
1157   GDBusWorker *worker;
1158   GList *flushers;
1159 } FlushAsyncData;
1160
1161 static void
1162 flush_data_list_complete (const GList  *flushers,
1163                           const GError *error)
1164 {
1165   const GList *l;
1166
1167   for (l = flushers; l != NULL; l = l->next)
1168     {
1169       FlushData *f = l->data;
1170
1171       f->error = error != NULL ? g_error_copy (error) : NULL;
1172
1173       g_mutex_lock (&f->mutex);
1174       g_cond_signal (&f->cond);
1175       g_mutex_unlock (&f->mutex);
1176     }
1177 }
1178
1179 /* called in private thread shared by all GDBusConnection instances
1180  *
1181  * write-lock is not held on entry
1182  * output_pending is PENDING_FLUSH on entry
1183  */
1184 static void
1185 ostream_flush_cb (GObject      *source_object,
1186                   GAsyncResult *res,
1187                   gpointer      user_data)
1188 {
1189   FlushAsyncData *data = user_data;
1190   GError *error;
1191
1192   error = NULL;
1193   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1194                                 res,
1195                                 &error);
1196
1197   if (error == NULL)
1198     {
1199       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1200         {
1201           _g_dbus_debug_print_lock ();
1202           g_print ("========================================================================\n"
1203                    "GDBus-debug:Transport:\n"
1204                    "  ---- FLUSHED stream of type %s\n",
1205                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1206           _g_dbus_debug_print_unlock ();
1207         }
1208     }
1209
1210   g_assert (data->flushers != NULL);
1211   flush_data_list_complete (data->flushers, error);
1212   g_list_free (data->flushers);
1213
1214   if (error != NULL)
1215     g_error_free (error);
1216
1217   /* Make sure we tell folks that we don't have additional
1218      flushes pending */
1219   g_mutex_lock (&data->worker->write_lock);
1220   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1221   g_assert (data->worker->output_pending == PENDING_FLUSH);
1222   data->worker->output_pending = PENDING_NONE;
1223   g_mutex_unlock (&data->worker->write_lock);
1224
1225   /* OK, cool, finally kick off the next write */
1226   continue_writing (data->worker);
1227
1228   _g_dbus_worker_unref (data->worker);
1229   g_free (data);
1230 }
1231
1232 /* called in private thread shared by all GDBusConnection instances
1233  *
1234  * write-lock is not held on entry
1235  * output_pending is PENDING_FLUSH on entry
1236  */
1237 static void
1238 start_flush (FlushAsyncData *data)
1239 {
1240   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1241                                G_PRIORITY_DEFAULT,
1242                                data->worker->cancellable,
1243                                ostream_flush_cb,
1244                                data);
1245 }
1246
1247 /* called in private thread shared by all GDBusConnection instances
1248  *
1249  * write-lock is held on entry
1250  * output_pending is PENDING_NONE on entry
1251  */
1252 static void
1253 message_written_unlocked (GDBusWorker *worker,
1254                           MessageToWriteData *message_data)
1255 {
1256   if (G_UNLIKELY (_g_dbus_debug_message ()))
1257     {
1258       gchar *s;
1259       _g_dbus_debug_print_lock ();
1260       g_print ("========================================================================\n"
1261                "GDBus-debug:Message:\n"
1262                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1263                message_data->blob_size);
1264       s = g_dbus_message_print (message_data->message, 2);
1265       g_print ("%s", s);
1266       g_free (s);
1267       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1268         {
1269           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1270           g_print ("%s\n", s);
1271           g_free (s);
1272         }
1273       _g_dbus_debug_print_unlock ();
1274     }
1275
1276   worker->write_num_messages_written += 1;
1277 }
1278
1279 /* called in private thread shared by all GDBusConnection instances
1280  *
1281  * write-lock is held on entry
1282  * output_pending is PENDING_NONE on entry
1283  *
1284  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1285  */
1286 static FlushAsyncData *
1287 prepare_flush_unlocked (GDBusWorker *worker)
1288 {
1289   GList *l;
1290   GList *ll;
1291   GList *flushers;
1292
1293   flushers = NULL;
1294   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1295     {
1296       FlushData *f = l->data;
1297       ll = l->next;
1298
1299       if (f->number_to_wait_for == worker->write_num_messages_written)
1300         {
1301           flushers = g_list_append (flushers, f);
1302           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1303         }
1304     }
1305   if (flushers != NULL)
1306     {
1307       g_assert (worker->output_pending == PENDING_NONE);
1308       worker->output_pending = PENDING_FLUSH;
1309     }
1310
1311   if (flushers != NULL)
1312     {
1313       FlushAsyncData *data;
1314
1315       data = g_new0 (FlushAsyncData, 1);
1316       data->worker = _g_dbus_worker_ref (worker);
1317       data->flushers = flushers;
1318       return data;
1319     }
1320
1321   return NULL;
1322 }
1323
1324 /* called in private thread shared by all GDBusConnection instances
1325  *
1326  * write-lock is not held on entry
1327  * output_pending is PENDING_WRITE on entry
1328  */
1329 static void
1330 write_message_cb (GObject       *source_object,
1331                   GAsyncResult  *res,
1332                   gpointer       user_data)
1333 {
1334   MessageToWriteData *data = user_data;
1335   GError *error;
1336
1337   g_mutex_lock (&data->worker->write_lock);
1338   g_assert (data->worker->output_pending == PENDING_WRITE);
1339   data->worker->output_pending = PENDING_NONE;
1340
1341   error = NULL;
1342   if (!write_message_finish (res, &error))
1343     {
1344       g_mutex_unlock (&data->worker->write_lock);
1345
1346       /* TODO: handle */
1347       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1348       g_error_free (error);
1349
1350       g_mutex_lock (&data->worker->write_lock);
1351     }
1352
1353   message_written_unlocked (data->worker, data);
1354
1355   g_mutex_unlock (&data->worker->write_lock);
1356
1357   continue_writing (data->worker);
1358
1359   message_to_write_data_free (data);
1360 }
1361
1362 /* called in private thread shared by all GDBusConnection instances
1363  *
1364  * write-lock is not held on entry
1365  * output_pending is PENDING_CLOSE on entry
1366  */
1367 static void
1368 iostream_close_cb (GObject      *source_object,
1369                    GAsyncResult *res,
1370                    gpointer      user_data)
1371 {
1372   GDBusWorker *worker = user_data;
1373   GError *error = NULL;
1374   GList *pending_close_attempts, *pending_flush_attempts;
1375   GQueue *send_queue;
1376
1377   g_io_stream_close_finish (worker->stream, res, &error);
1378
1379   g_mutex_lock (&worker->write_lock);
1380
1381   pending_close_attempts = worker->pending_close_attempts;
1382   worker->pending_close_attempts = NULL;
1383
1384   pending_flush_attempts = worker->write_pending_flushes;
1385   worker->write_pending_flushes = NULL;
1386
1387   send_queue = worker->write_queue;
1388   worker->write_queue = g_queue_new ();
1389
1390   g_assert (worker->output_pending == PENDING_CLOSE);
1391   worker->output_pending = PENDING_NONE;
1392
1393   g_mutex_unlock (&worker->write_lock);
1394
1395   while (pending_close_attempts != NULL)
1396     {
1397       CloseData *close_data = pending_close_attempts->data;
1398
1399       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1400                                                    pending_close_attempts);
1401
1402       if (close_data->result != NULL)
1403         {
1404           if (error != NULL)
1405             g_simple_async_result_set_from_error (close_data->result, error);
1406
1407           /* this must be in an idle because the result is likely to be
1408            * intended for another thread
1409            */
1410           g_simple_async_result_complete_in_idle (close_data->result);
1411         }
1412
1413       close_data_free (close_data);
1414     }
1415
1416   g_clear_error (&error);
1417
1418   /* all messages queued for sending are discarded */
1419   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1420   /* all queued flushes fail */
1421   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1422                        _("Operation was cancelled"));
1423   flush_data_list_complete (pending_flush_attempts, error);
1424   g_list_free (pending_flush_attempts);
1425   g_clear_error (&error);
1426
1427   _g_dbus_worker_unref (worker);
1428 }
1429
1430 /* called in private thread shared by all GDBusConnection instances
1431  *
1432  * write-lock is not held on entry
1433  * output_pending must be PENDING_NONE on entry
1434  */
1435 static void
1436 continue_writing (GDBusWorker *worker)
1437 {
1438   MessageToWriteData *data;
1439   FlushAsyncData *flush_async_data;
1440
1441  write_next:
1442   /* we mustn't try to write two things at once */
1443   g_assert (worker->output_pending == PENDING_NONE);
1444
1445   g_mutex_lock (&worker->write_lock);
1446
1447   data = NULL;
1448   flush_async_data = NULL;
1449
1450   /* if we want to close the connection, that takes precedence */
1451   if (worker->pending_close_attempts != NULL)
1452     {
1453       worker->close_expected = TRUE;
1454       worker->output_pending = PENDING_CLOSE;
1455
1456       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1457                                NULL, iostream_close_cb,
1458                                _g_dbus_worker_ref (worker));
1459     }
1460   else
1461     {
1462       flush_async_data = prepare_flush_unlocked (worker);
1463
1464       if (flush_async_data == NULL)
1465         {
1466           data = g_queue_pop_head (worker->write_queue);
1467
1468           if (data != NULL)
1469             worker->output_pending = PENDING_WRITE;
1470         }
1471     }
1472
1473   g_mutex_unlock (&worker->write_lock);
1474
1475   /* Note that write_lock is only used for protecting the @write_queue
1476    * and @output_pending fields of the GDBusWorker struct ... which we
1477    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1478    *
1479    * Therefore, it's fine to drop it here when calling back into user
1480    * code and then writing the message out onto the GIOStream since this
1481    * function only runs on the worker thread.
1482    */
1483
1484   if (flush_async_data != NULL)
1485     {
1486       start_flush (flush_async_data);
1487       g_assert (data == NULL);
1488     }
1489   else if (data != NULL)
1490     {
1491       GDBusMessage *old_message;
1492       guchar *new_blob;
1493       gsize new_blob_size;
1494       GError *error;
1495
1496       old_message = data->message;
1497       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1498       if (data->message == old_message)
1499         {
1500           /* filters had no effect - do nothing */
1501         }
1502       else if (data->message == NULL)
1503         {
1504           /* filters dropped message */
1505           g_mutex_lock (&worker->write_lock);
1506           worker->output_pending = PENDING_NONE;
1507           g_mutex_unlock (&worker->write_lock);
1508           message_to_write_data_free (data);
1509           goto write_next;
1510         }
1511       else
1512         {
1513           /* filters altered the message -> reencode */
1514           error = NULL;
1515           new_blob = g_dbus_message_to_blob (data->message,
1516                                              &new_blob_size,
1517                                              worker->capabilities,
1518                                              &error);
1519           if (new_blob == NULL)
1520             {
1521               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1522                * the old message instead
1523                */
1524               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1525                          g_dbus_message_get_serial (data->message),
1526                          error->message);
1527               g_error_free (error);
1528             }
1529           else
1530             {
1531               g_free (data->blob);
1532               data->blob = (gchar *) new_blob;
1533               data->blob_size = new_blob_size;
1534             }
1535         }
1536
1537       write_message_async (worker,
1538                            data,
1539                            write_message_cb,
1540                            data);
1541     }
1542 }
1543
1544 /* called in private thread shared by all GDBusConnection instances
1545  *
1546  * write-lock is not held on entry
1547  * output_pending may be anything
1548  */
1549 static gboolean
1550 continue_writing_in_idle_cb (gpointer user_data)
1551 {
1552   GDBusWorker *worker = user_data;
1553
1554   /* Because this is the worker thread, we can read this struct member
1555    * without holding the lock: no other thread ever modifies it.
1556    */
1557   if (worker->output_pending == PENDING_NONE)
1558     continue_writing (worker);
1559
1560   return FALSE;
1561 }
1562
1563 /**
1564  * @write_data: (transfer full) (allow-none):
1565  * @flush_data: (transfer full) (allow-none):
1566  * @close_data: (transfer full) (allow-none):
1567  *
1568  * Can be called from any thread
1569  *
1570  * write_lock is held on entry
1571  * output_pending may be anything
1572  */
1573 static void
1574 schedule_writing_unlocked (GDBusWorker        *worker,
1575                            MessageToWriteData *write_data,
1576                            FlushData          *flush_data,
1577                            CloseData          *close_data)
1578 {
1579   if (write_data != NULL)
1580     g_queue_push_tail (worker->write_queue, write_data);
1581
1582   if (flush_data != NULL)
1583     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1584
1585   if (close_data != NULL)
1586     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1587                                                      close_data);
1588
1589   /* If we had output pending, the next bit of output will happen
1590    * automatically when it finishes, so we only need to do this
1591    * if nothing was pending.
1592    *
1593    * The idle callback will re-check that output_pending is still
1594    * PENDING_NONE, to guard against output starting before the idle.
1595    */
1596   if (worker->output_pending == PENDING_NONE)
1597     {
1598       GSource *idle_source;
1599       idle_source = g_idle_source_new ();
1600       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1601       g_source_set_callback (idle_source,
1602                              continue_writing_in_idle_cb,
1603                              _g_dbus_worker_ref (worker),
1604                              (GDestroyNotify) _g_dbus_worker_unref);
1605       g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1606       g_source_attach (idle_source, worker->shared_thread_data->context);
1607       g_source_unref (idle_source);
1608     }
1609 }
1610
1611 /* ---------------------------------------------------------------------------------------------------- */
1612
1613 /* can be called from any thread - steals blob
1614  *
1615  * write_lock is not held on entry
1616  * output_pending may be anything
1617  */
1618 void
1619 _g_dbus_worker_send_message (GDBusWorker    *worker,
1620                              GDBusMessage   *message,
1621                              gchar          *blob,
1622                              gsize           blob_len)
1623 {
1624   MessageToWriteData *data;
1625
1626   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1627   g_return_if_fail (blob != NULL);
1628   g_return_if_fail (blob_len > 16);
1629
1630   data = g_new0 (MessageToWriteData, 1);
1631   data->worker = _g_dbus_worker_ref (worker);
1632   data->message = g_object_ref (message);
1633   data->blob = blob; /* steal! */
1634   data->blob_size = blob_len;
1635
1636   g_mutex_lock (&worker->write_lock);
1637   schedule_writing_unlocked (worker, data, NULL, NULL);
1638   g_mutex_unlock (&worker->write_lock);
1639 }
1640
1641 /* ---------------------------------------------------------------------------------------------------- */
1642
1643 GDBusWorker *
1644 _g_dbus_worker_new (GIOStream                              *stream,
1645                     GDBusCapabilityFlags                    capabilities,
1646                     gboolean                                initially_frozen,
1647                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1648                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1649                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1650                     gpointer                                user_data)
1651 {
1652   GDBusWorker *worker;
1653   GSource *idle_source;
1654
1655   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1656   g_return_val_if_fail (message_received_callback != NULL, NULL);
1657   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1658   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1659
1660   worker = g_new0 (GDBusWorker, 1);
1661   worker->ref_count = 1;
1662
1663   g_mutex_init (&worker->read_lock);
1664   worker->message_received_callback = message_received_callback;
1665   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1666   worker->disconnected_callback = disconnected_callback;
1667   worker->user_data = user_data;
1668   worker->stream = g_object_ref (stream);
1669   worker->capabilities = capabilities;
1670   worker->cancellable = g_cancellable_new ();
1671   worker->output_pending = PENDING_NONE;
1672
1673   worker->frozen = initially_frozen;
1674   worker->received_messages_while_frozen = g_queue_new ();
1675
1676   g_mutex_init (&worker->write_lock);
1677   worker->write_queue = g_queue_new ();
1678
1679   if (G_IS_SOCKET_CONNECTION (worker->stream))
1680     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1681
1682 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
1683   if (G_IS_KDBUS_CONNECTION (worker->stream))
1684     worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
1685 #endif
1686
1687   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1688
1689   /* begin reading */
1690   idle_source = g_idle_source_new ();
1691   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1692   g_source_set_callback (idle_source,
1693                          _g_dbus_worker_do_initial_read,
1694                          _g_dbus_worker_ref (worker),
1695                          (GDestroyNotify) _g_dbus_worker_unref);
1696   g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1697   g_source_attach (idle_source, worker->shared_thread_data->context);
1698   g_source_unref (idle_source);
1699
1700   return worker;
1701 }
1702
1703 /* ---------------------------------------------------------------------------------------------------- */
1704
1705 /* can be called from any thread
1706  *
1707  * write_lock is not held on entry
1708  * output_pending may be anything
1709  */
1710 void
1711 _g_dbus_worker_close (GDBusWorker         *worker,
1712                       GCancellable        *cancellable,
1713                       GSimpleAsyncResult  *result)
1714 {
1715   CloseData *close_data;
1716
1717   close_data = g_slice_new0 (CloseData);
1718   close_data->worker = _g_dbus_worker_ref (worker);
1719   close_data->cancellable =
1720       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1721   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1722
1723   /* Don't set worker->close_expected here - we're in the wrong thread.
1724    * It'll be set before the actual close happens.
1725    */
1726   g_cancellable_cancel (worker->cancellable);
1727   g_mutex_lock (&worker->write_lock);
1728   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1729   g_mutex_unlock (&worker->write_lock);
1730 }
1731
1732 /* This can be called from any thread - frees worker. Note that
1733  * callbacks might still happen if called from another thread than the
1734  * worker - use your own synchronization primitive in the callbacks.
1735  *
1736  * write_lock is not held on entry
1737  * output_pending may be anything
1738  */
1739 void
1740 _g_dbus_worker_stop (GDBusWorker *worker)
1741 {
1742   g_atomic_int_set (&worker->stopped, TRUE);
1743
1744   /* Cancel any pending operations and schedule a close of the underlying I/O
1745    * stream in the worker thread
1746    */
1747   _g_dbus_worker_close (worker, NULL, NULL);
1748
1749   /* _g_dbus_worker_close holds a ref until after an idle in the worker
1750    * thread has run, so we no longer need to unref in an idle like in
1751    * commit 322e25b535
1752    */
1753   _g_dbus_worker_unref (worker);
1754 }
1755
1756 /* ---------------------------------------------------------------------------------------------------- */
1757
1758 /* can be called from any thread (except the worker thread) - blocks
1759  * calling thread until all queued outgoing messages are written and
1760  * the transport has been flushed
1761  *
1762  * write_lock is not held on entry
1763  * output_pending may be anything
1764  */
1765 gboolean
1766 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1767                            GCancellable   *cancellable,
1768                            GError        **error)
1769 {
1770   gboolean ret;
1771   FlushData *data;
1772   guint64 pending_writes;
1773
1774   data = NULL;
1775   ret = TRUE;
1776
1777   g_mutex_lock (&worker->write_lock);
1778
1779   /* if the queue is empty, no write is in-flight and we haven't written
1780    * anything since the last flush, then there's nothing to wait for
1781    */
1782   pending_writes = g_queue_get_length (worker->write_queue);
1783
1784   /* if a write is in-flight, we shouldn't be satisfied until the first
1785    * flush operation that follows it
1786    */
1787   if (worker->output_pending == PENDING_WRITE)
1788     pending_writes += 1;
1789
1790   if (pending_writes > 0 ||
1791       worker->write_num_messages_written != worker->write_num_messages_flushed)
1792     {
1793       data = g_new0 (FlushData, 1);
1794       g_mutex_init (&data->mutex);
1795       g_cond_init (&data->cond);
1796       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1797       g_mutex_lock (&data->mutex);
1798
1799       schedule_writing_unlocked (worker, NULL, data, NULL);
1800     }
1801   g_mutex_unlock (&worker->write_lock);
1802
1803   if (data != NULL)
1804     {
1805       g_cond_wait (&data->cond, &data->mutex);
1806       g_mutex_unlock (&data->mutex);
1807
1808       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1809       g_cond_clear (&data->cond);
1810       g_mutex_clear (&data->mutex);
1811       if (data->error != NULL)
1812         {
1813           ret = FALSE;
1814           g_propagate_error (error, data->error);
1815         }
1816       g_free (data);
1817     }
1818
1819   return ret;
1820 }
1821
1822 /* ---------------------------------------------------------------------------------------------------- */
1823
1824 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1825 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1826 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1827 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1828 #define G_DBUS_DEBUG_CALL           (1<<4)
1829 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1830 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1831 #define G_DBUS_DEBUG_RETURN         (1<<7)
1832 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1833 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1834
1835 static gint _gdbus_debug_flags = 0;
1836
1837 gboolean
1838 _g_dbus_debug_authentication (void)
1839 {
1840   _g_dbus_initialize ();
1841   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1842 }
1843
1844 gboolean
1845 _g_dbus_debug_transport (void)
1846 {
1847   _g_dbus_initialize ();
1848   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1849 }
1850
1851 gboolean
1852 _g_dbus_debug_message (void)
1853 {
1854   _g_dbus_initialize ();
1855   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1856 }
1857
1858 gboolean
1859 _g_dbus_debug_payload (void)
1860 {
1861   _g_dbus_initialize ();
1862   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1863 }
1864
1865 gboolean
1866 _g_dbus_debug_call (void)
1867 {
1868   _g_dbus_initialize ();
1869   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1870 }
1871
1872 gboolean
1873 _g_dbus_debug_signal (void)
1874 {
1875   _g_dbus_initialize ();
1876   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1877 }
1878
1879 gboolean
1880 _g_dbus_debug_incoming (void)
1881 {
1882   _g_dbus_initialize ();
1883   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1884 }
1885
1886 gboolean
1887 _g_dbus_debug_return (void)
1888 {
1889   _g_dbus_initialize ();
1890   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1891 }
1892
1893 gboolean
1894 _g_dbus_debug_emission (void)
1895 {
1896   _g_dbus_initialize ();
1897   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1898 }
1899
1900 gboolean
1901 _g_dbus_debug_address (void)
1902 {
1903   _g_dbus_initialize ();
1904   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1905 }
1906
1907 G_LOCK_DEFINE_STATIC (print_lock);
1908
1909 void
1910 _g_dbus_debug_print_lock (void)
1911 {
1912   G_LOCK (print_lock);
1913 }
1914
1915 void
1916 _g_dbus_debug_print_unlock (void)
1917 {
1918   G_UNLOCK (print_lock);
1919 }
1920
1921 /**
1922  * _g_dbus_initialize:
1923  *
1924  * Does various one-time init things such as
1925  *
1926  *  - registering the G_DBUS_ERROR error domain
1927  *  - parses the G_DBUS_DEBUG environment variable
1928  */
1929 void
1930 _g_dbus_initialize (void)
1931 {
1932   static volatile gsize initialized = 0;
1933
1934   if (g_once_init_enter (&initialized))
1935     {
1936       volatile GQuark g_dbus_error_domain;
1937       const gchar *debug;
1938
1939       g_dbus_error_domain = G_DBUS_ERROR;
1940       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1941
1942       debug = g_getenv ("G_DBUS_DEBUG");
1943       if (debug != NULL)
1944         {
1945           const GDebugKey keys[] = {
1946             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1947             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1948             { "message",        G_DBUS_DEBUG_MESSAGE        },
1949             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1950             { "call",           G_DBUS_DEBUG_CALL           },
1951             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1952             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1953             { "return",         G_DBUS_DEBUG_RETURN         },
1954             { "emission",       G_DBUS_DEBUG_EMISSION       },
1955             { "address",        G_DBUS_DEBUG_ADDRESS        }
1956           };
1957
1958           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1959           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1960             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1961         }
1962
1963       g_once_init_leave (&initialized, 1);
1964     }
1965 }
1966
1967 /* ---------------------------------------------------------------------------------------------------- */
1968
1969 GVariantType *
1970 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1971 {
1972   const GVariantType *arg_types[256];
1973   guint n;
1974
1975   if (args)
1976     for (n = 0; args[n] != NULL; n++)
1977       {
1978         /* DBus places a hard limit of 255 on signature length.
1979          * therefore number of args must be less than 256.
1980          */
1981         g_assert (n < 256);
1982
1983         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1984
1985         if G_UNLIKELY (arg_types[n] == NULL)
1986           return NULL;
1987       }
1988   else
1989     n = 0;
1990
1991   return g_variant_type_new_tuple (arg_types, n);
1992 }
1993
1994 /* ---------------------------------------------------------------------------------------------------- */
1995
1996 #ifdef G_OS_WIN32
1997
1998 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1999
2000 gchar *
2001 _g_dbus_win32_get_user_sid (void)
2002 {
2003   HANDLE h;
2004   TOKEN_USER *user;
2005   DWORD token_information_len;
2006   PSID psid;
2007   gchar *sid;
2008   gchar *ret;
2009
2010   ret = NULL;
2011   user = NULL;
2012   h = INVALID_HANDLE_VALUE;
2013
2014   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2015     {
2016       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2017       goto out;
2018     }
2019
2020   /* Get length of buffer */
2021   token_information_len = 0;
2022   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2023     {
2024       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2025         {
2026           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2027           goto out;
2028         }
2029     }
2030   user = g_malloc (token_information_len);
2031   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2032     {
2033       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2034       goto out;
2035     }
2036
2037   psid = user->User.Sid;
2038   if (!IsValidSid (psid))
2039     {
2040       g_warning ("Invalid SID");
2041       goto out;
2042     }
2043
2044   if (!ConvertSidToStringSidA (psid, &sid))
2045     {
2046       g_warning ("Invalid SID");
2047       goto out;
2048     }
2049
2050   ret = g_strdup (sid);
2051   LocalFree (sid);
2052
2053 out:
2054   g_free (user);
2055   if (h != INVALID_HANDLE_VALUE)
2056     CloseHandle (h);
2057   return ret;
2058 }
2059 #endif
2060
2061 /* ---------------------------------------------------------------------------------------------------- */
2062
2063 gchar *
2064 _g_dbus_get_machine_id (GError **error)
2065 {
2066 #ifdef G_OS_WIN32
2067   HW_PROFILE_INFOA info;
2068   char *src, *dest, *res;
2069   int i;
2070
2071   if (!GetCurrentHwProfileA (&info))
2072     {
2073       char *message = g_win32_error_message (GetLastError ());
2074       g_set_error (error,
2075                    G_IO_ERROR,
2076                    G_IO_ERROR_FAILED,
2077                    _("Unable to get Hardware profile: %s"), message);
2078       g_free (message);
2079       return NULL;
2080     }
2081
2082   /* Form: {12340001-4980-1920-6788-123456789012} */
2083   src = &info.szHwProfileGuid[0];
2084
2085   res = g_malloc (32+1);
2086   dest = res;
2087
2088   src++; /* Skip { */
2089   for (i = 0; i < 8; i++)
2090     *dest++ = *src++;
2091   src++; /* Skip - */
2092   for (i = 0; i < 4; i++)
2093     *dest++ = *src++;
2094   src++; /* Skip - */
2095   for (i = 0; i < 4; i++)
2096     *dest++ = *src++;
2097   src++; /* Skip - */
2098   for (i = 0; i < 4; i++)
2099     *dest++ = *src++;
2100   src++; /* Skip - */
2101   for (i = 0; i < 12; i++)
2102     *dest++ = *src++;
2103   *dest = 0;
2104
2105   return res;
2106 #else
2107   gchar *ret;
2108   GError *first_error;
2109   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2110   ret = NULL;
2111   first_error = NULL;
2112   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2113                             &ret,
2114                             NULL,
2115                             &first_error) &&
2116       !g_file_get_contents ("/etc/machine-id",
2117                             &ret,
2118                             NULL,
2119                             NULL))
2120     {
2121       g_propagate_prefixed_error (error, first_error,
2122                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2123     }
2124   else
2125     {
2126       /* ignore the error from the first try, if any */
2127       g_clear_error (&first_error);
2128       /* TODO: validate value */
2129       g_strstrip (ret);
2130     }
2131   return ret;
2132 #endif
2133 }
2134
2135 /* ---------------------------------------------------------------------------------------------------- */
2136
2137 gchar *
2138 _g_dbus_enum_to_string (GType enum_type, gint value)
2139 {
2140   gchar *ret;
2141   GEnumClass *klass;
2142   GEnumValue *enum_value;
2143
2144   klass = g_type_class_ref (enum_type);
2145   enum_value = g_enum_get_value (klass, value);
2146   if (enum_value != NULL)
2147     ret = g_strdup (enum_value->value_nick);
2148   else
2149     ret = g_strdup_printf ("unknown (value %d)", value);
2150   g_type_class_unref (klass);
2151   return ret;
2152 }
2153
2154 /* ---------------------------------------------------------------------------------------------------- */
2155
2156 static void
2157 write_message_print_transport_debug (gssize bytes_written,
2158                                      MessageToWriteData *data)
2159 {
2160   if (G_LIKELY (!_g_dbus_debug_transport ()))
2161     goto out;
2162
2163   _g_dbus_debug_print_lock ();
2164   g_print ("========================================================================\n"
2165            "GDBus-debug:Transport:\n"
2166            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2167            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2168            bytes_written,
2169            g_dbus_message_get_serial (data->message),
2170            data->blob_size,
2171            data->total_written,
2172            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2173   _g_dbus_debug_print_unlock ();
2174  out:
2175   ;
2176 }
2177
2178 /* ---------------------------------------------------------------------------------------------------- */
2179
2180 static void
2181 read_message_print_transport_debug (gssize bytes_read,
2182                                     GDBusWorker *worker)
2183 {
2184   gsize size;
2185   gint32 serial;
2186   gint32 message_length;
2187
2188   if (G_LIKELY (!_g_dbus_debug_transport ()))
2189     goto out;
2190
2191   size = bytes_read + worker->read_buffer_cur_size;
2192   serial = 0;
2193   message_length = 0;
2194   if (size >= 16)
2195     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2196   if (size >= 1)
2197     {
2198       switch (worker->read_buffer[0])
2199         {
2200         case 'l':
2201           if (size >= 12)
2202             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2203           break;
2204         case 'B':
2205           if (size >= 12)
2206             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2207           break;
2208         default:
2209           /* an error will be set elsewhere if this happens */
2210           goto out;
2211         }
2212     }
2213
2214     _g_dbus_debug_print_lock ();
2215   g_print ("========================================================================\n"
2216            "GDBus-debug:Transport:\n"
2217            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2218            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2219            bytes_read,
2220            serial,
2221            message_length,
2222            worker->read_buffer_cur_size,
2223            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2224   _g_dbus_debug_print_unlock ();
2225  out:
2226   ;
2227 }
2228
2229 /* ---------------------------------------------------------------------------------------------------- */
2230
2231 gboolean
2232 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2233                                      GValue                *return_accu,
2234                                      const GValue          *handler_return,
2235                                      gpointer               dummy)
2236 {
2237   gboolean continue_emission;
2238   gboolean signal_return;
2239
2240   signal_return = g_value_get_boolean (handler_return);
2241   g_value_set_boolean (return_accu, signal_return);
2242   continue_emission = signal_return;
2243
2244   return continue_emission;
2245 }