oops
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: David Zeuthen <davidz@redhat.com>
19  */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "giotypes.h"
27 #include "gsocket.h"
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbuserror.h"
31 #include "gdbusintrospection.h"
32 #include "gasyncresult.h"
33 #include "gsimpleasyncresult.h"
34 #include "ginputstream.h"
35 #include "gmemoryinputstream.h"
36 #include "giostream.h"
37 #include "glib/gstdio.h"
38 #include "gsocketcontrolmessage.h"
39 #include "gsocketconnection.h"
40 #include "gsocketoutputstream.h"
41
42 #ifdef G_OS_UNIX
43 #include "gkdbus.h"
44 #include "gkdbusconnection.h"
45 #include "gunixfdmessage.h"
46 #include "gunixconnection.h"
47 #include "gunixcredentialsmessage.h"
48 #endif
49
50 #ifdef G_OS_WIN32
51 #include <windows.h>
52 #endif
53
54 #include "glibintl.h"
55
56 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
57
58 /* ---------------------------------------------------------------------------------------------------- */
59
60 gchar *
61 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
62 {
63  guint n, m;
64  GString *ret;
65
66  ret = g_string_new (NULL);
67
68  for (n = 0; n < len; n += 16)
69    {
70      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
71
72      for (m = n; m < n + 16; m++)
73        {
74          if (m > n && (m%4) == 0)
75            g_string_append_c (ret, ' ');
76          if (m < len)
77            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
78          else
79            g_string_append (ret, "   ");
80        }
81
82      g_string_append (ret, "   ");
83
84      for (m = n; m < len && m < n + 16; m++)
85        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
86
87      g_string_append_c (ret, '\n');
88    }
89
90  return g_string_free (ret, FALSE);
91 }
92
93 /* ---------------------------------------------------------------------------------------------------- */
94
95 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
96 typedef struct
97 {
98   GKdbus *kdbus;
99   GCancellable *cancellable;
100
101   GSimpleAsyncResult *simple;
102
103   gboolean from_mainloop;
104 } ReadKdbusData;
105
106 static void
107 read_kdbus_data_free (ReadKdbusData  *data)
108 {
109   g_object_unref (data->kdbus);
110   if (data->cancellable != NULL)
111     g_object_unref (data->cancellable);
112   g_object_unref (data->simple);
113   g_free (data);
114 }
115
116 static gboolean
117 _g_kdbus_read_ready (GKdbus        *kdbus,
118                      GIOCondition   condition,
119                      gpointer       user_data)
120 {
121   ReadKdbusData *data = user_data;
122   GError *error = NULL;
123   gssize result;
124
125   result = _g_kdbus_receive (data->kdbus,
126                              data->cancellable,
127                              &error);
128
129   if (result >= 0)
130     {
131       g_simple_async_result_set_op_res_gssize (data->simple, result);
132     }
133   else
134     {
135       g_assert (error != NULL);
136       g_simple_async_result_take_error (data->simple, error);
137     }
138
139   if (data->from_mainloop)
140     g_simple_async_result_complete (data->simple);
141   else
142     g_simple_async_result_complete_in_idle (data->simple);
143
144   return FALSE;
145 }
146
147 static void
148 _g_kdbus_read (GKdbus               *kdbus,
149                GCancellable         *cancellable,
150                GAsyncReadyCallback   callback,
151                gpointer              user_data)
152 {
153   ReadKdbusData *data;
154   GSource *source;
155
156   data = g_new0 (ReadKdbusData, 1);
157   data->kdbus = g_object_ref (kdbus);
158   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
159
160   data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
161                                             callback,
162                                             user_data,
163                                             _g_kdbus_read);
164   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
165
166   data->from_mainloop = TRUE;
167   source = _g_kdbus_create_source (data->kdbus,
168                                    G_IO_IN,
169                                    cancellable);
170   g_source_set_callback (source,
171                          (GSourceFunc) _g_kdbus_read_ready,
172                          data,
173                          (GDestroyNotify) read_kdbus_data_free);
174   g_source_attach (source, g_main_context_get_thread_default ());
175   g_source_unref (source);
176 }
177
178 static gssize
179 _g_kdbus_read_finish (GKdbus        *kdbus,
180                       GAsyncResult  *result,
181                       GError       **error)
182 {
183   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
184
185   g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
186   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
187
188   if (g_simple_async_result_propagate_error (simple, error))
189     return -1;
190   else
191     return g_simple_async_result_get_op_res_gssize (simple);
192 }
193
194 #endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */
195
196 /* Unfortunately ancillary messages are discarded when reading from a
197  * socket using the GSocketInputStream abstraction. So we provide a
198  * very GInputStream-ish API that uses GSocket in this case (very
199  * similar to GSocketInputStream).
200  */
201
202 typedef struct
203 {
204   GSocket *socket;
205   GCancellable *cancellable;
206
207   void *buffer;
208   gsize count;
209
210   GSocketControlMessage ***messages;
211   gint *num_messages;
212
213   GSimpleAsyncResult *simple;
214
215   gboolean from_mainloop;
216 } ReadWithControlData;
217
218 static void
219 read_with_control_data_free (ReadWithControlData *data)
220 {
221   g_object_unref (data->socket);
222   if (data->cancellable != NULL)
223     g_object_unref (data->cancellable);
224   g_object_unref (data->simple);
225   g_free (data);
226 }
227
228 static gboolean
229 _g_socket_read_with_control_messages_ready (GSocket      *socket,
230                                             GIOCondition  condition,
231                                             gpointer      user_data)
232 {
233   ReadWithControlData *data = user_data;
234   GError *error;
235   gssize result;
236   GInputVector vector;
237
238   error = NULL;
239   vector.buffer = data->buffer;
240   vector.size = data->count;
241   result = g_socket_receive_message (data->socket,
242                                      NULL, /* address */
243                                      &vector,
244                                      1,
245                                      data->messages,
246                                      data->num_messages,
247                                      NULL,
248                                      data->cancellable,
249                                      &error);
250   if (result >= 0)
251     {
252       g_simple_async_result_set_op_res_gssize (data->simple, result);
253     }
254   else
255     {
256       g_assert (error != NULL);
257       g_simple_async_result_take_error (data->simple, error);
258     }
259
260   if (data->from_mainloop)
261     g_simple_async_result_complete (data->simple);
262   else
263     g_simple_async_result_complete_in_idle (data->simple);
264
265   return FALSE;
266 }
267
268 static void
269 _g_socket_read_with_control_messages (GSocket                 *socket,
270                                       void                    *buffer,
271                                       gsize                    count,
272                                       GSocketControlMessage ***messages,
273                                       gint                    *num_messages,
274                                       gint                     io_priority,
275                                       GCancellable            *cancellable,
276                                       GAsyncReadyCallback      callback,
277                                       gpointer                 user_data)
278 {
279   ReadWithControlData *data;
280
281   data = g_new0 (ReadWithControlData, 1);
282   data->socket = g_object_ref (socket);
283   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
284   data->buffer = buffer;
285   data->count = count;
286   data->messages = messages;
287   data->num_messages = num_messages;
288
289   data->simple = g_simple_async_result_new (G_OBJECT (socket),
290                                             callback,
291                                             user_data,
292                                             _g_socket_read_with_control_messages);
293   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
294
295   if (!g_socket_condition_check (socket, G_IO_IN))
296     {
297       GSource *source;
298       data->from_mainloop = TRUE;
299       source = g_socket_create_source (data->socket,
300                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
301                                        cancellable);
302       g_source_set_callback (source,
303                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
304                              data,
305                              (GDestroyNotify) read_with_control_data_free);
306       g_source_attach (source, g_main_context_get_thread_default ());
307       g_source_unref (source);
308     }
309   else
310     {
311       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
312       read_with_control_data_free (data);
313     }
314 }
315
316 static gssize
317 _g_socket_read_with_control_messages_finish (GSocket       *socket,
318                                              GAsyncResult  *result,
319                                              GError       **error)
320 {
321   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
322
323   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
324   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
325
326   if (g_simple_async_result_propagate_error (simple, error))
327       return -1;
328   else
329     return g_simple_async_result_get_op_res_gssize (simple);
330 }
331
332 /* ---------------------------------------------------------------------------------------------------- */
333
334 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
335
336 static GPtrArray *ensured_classes = NULL;
337
338 static void
339 ensure_type (GType gtype)
340 {
341   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
342 }
343
344 static void
345 release_required_types (void)
346 {
347   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
348   g_ptr_array_unref (ensured_classes);
349   ensured_classes = NULL;
350 }
351
352 static void
353 ensure_required_types (void)
354 {
355   g_assert (ensured_classes == NULL);
356   ensured_classes = g_ptr_array_new ();
357   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
358   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
359 }
360 /* ---------------------------------------------------------------------------------------------------- */
361
362 typedef struct
363 {
364   volatile gint refcount;
365   GThread *thread;
366   GMainContext *context;
367   GMainLoop *loop;
368 } SharedThreadData;
369
370 static gpointer
371 gdbus_shared_thread_func (gpointer user_data)
372 {
373   SharedThreadData *data = user_data;
374
375   g_main_context_push_thread_default (data->context);
376   g_main_loop_run (data->loop);
377   g_main_context_pop_thread_default (data->context);
378
379   release_required_types ();
380
381   return NULL;
382 }
383
384 /* ---------------------------------------------------------------------------------------------------- */
385
386 static SharedThreadData *
387 _g_dbus_shared_thread_ref (void)
388 {
389   static gsize shared_thread_data = 0;
390   SharedThreadData *ret;
391
392   if (g_once_init_enter (&shared_thread_data))
393     {
394       SharedThreadData *data;
395
396       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
397       ensure_required_types ();
398
399       data = g_new0 (SharedThreadData, 1);
400       data->refcount = 0;
401       
402       data->context = g_main_context_new ();
403       data->loop = g_main_loop_new (data->context, FALSE);
404       data->thread = g_thread_new ("gdbus",
405                                    gdbus_shared_thread_func,
406                                    data);
407       /* We can cast between gsize and gpointer safely */
408       g_once_init_leave (&shared_thread_data, (gsize) data);
409     }
410
411   ret = (SharedThreadData*) shared_thread_data;
412   g_atomic_int_inc (&ret->refcount);
413   return ret;
414 }
415
416 static void
417 _g_dbus_shared_thread_unref (SharedThreadData *data)
418 {
419   /* TODO: actually destroy the shared thread here */
420 #if 0
421   g_assert (data != NULL);
422   if (g_atomic_int_dec_and_test (&data->refcount))
423     {
424       g_main_loop_quit (data->loop);
425       //g_thread_join (data->thread);
426       g_main_loop_unref (data->loop);
427       g_main_context_unref (data->context);
428     }
429 #endif
430 }
431
432 /* ---------------------------------------------------------------------------------------------------- */
433
/* Which asynchronous output operation, if any, is pending on a worker
 * (see GDBusWorker.output_pending).
 */
typedef enum {
    PENDING_NONE = 0,  /* no output operation is pending */
    PENDING_WRITE,     /* an async write is pending */
    PENDING_FLUSH,     /* an async flush is pending */
    PENDING_CLOSE      /* an async close is pending */
} OutputPending;
440
441 struct GDBusWorker
442 {
443   volatile gint                       ref_count;
444
445   SharedThreadData                   *shared_thread_data;
446
447   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
448   volatile gint                       stopped;
449
450   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
451    * only affects messages received from the other peer (since GDBusServer is the
452    * only user) - we might want it to affect messages sent to the other peer too?
453    */
454   gboolean                            frozen;
455   GDBusCapabilityFlags                capabilities;
456   GQueue                             *received_messages_while_frozen;
457
458   GIOStream                          *stream;
459   GCancellable                       *cancellable;
460   GDBusWorkerMessageReceivedCallback  message_received_callback;
461   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
462   GDBusWorkerDisconnectedCallback     disconnected_callback;
463   gpointer                            user_data;
464
465   /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
466   GSocket *socket;
467 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
468   GKdbus  *kdbus;
469 #endif
470
471   /* used for reading */
472   GMutex                              read_lock;
473   gchar                              *read_buffer;
474   gsize                               read_buffer_allocated_size;
475   gsize                               read_buffer_cur_size;
476   gsize                               read_buffer_bytes_wanted;
477   GUnixFDList                        *read_fd_list;
478   GSocketControlMessage             **read_ancillary_messages;
479   gint                                read_num_ancillary_messages;
480 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
481   GSList                             *read_kdbus_msg_items;
482 #endif
483
484
485   /* Whether an async write, flush or close, or none of those, is pending.
486    * Only the worker thread may change its value, and only with the write_lock.
487    * Other threads may read its value when holding the write_lock.
488    * The worker thread may read its value at any time.
489    */
490   OutputPending                       output_pending;
491   /* used for writing */
492   GMutex                              write_lock;
493   /* queue of MessageToWriteData, protected by write_lock */
494   GQueue                             *write_queue;
495   /* protected by write_lock */
496   guint64                             write_num_messages_written;
497   /* number of messages we'd written out last time we flushed;
498    * protected by write_lock
499    */
500   guint64                             write_num_messages_flushed;
501   /* list of FlushData, protected by write_lock */
502   GList                              *write_pending_flushes;
503   /* list of CloseData, protected by write_lock */
504   GList                              *pending_close_attempts;
505   /* no lock - only used from the worker thread */
506   gboolean                            close_expected;
507 };
508
509 static void _g_dbus_worker_unref (GDBusWorker *worker);
510
511 /* ---------------------------------------------------------------------------------------------------- */
512
513 typedef struct
514 {
515   GMutex  mutex;
516   GCond   cond;
517   guint64 number_to_wait_for;
518   GError *error;
519 } FlushData;
520
521 struct _MessageToWriteData ;
522 typedef struct _MessageToWriteData MessageToWriteData;
523
524 static void message_to_write_data_free (MessageToWriteData *data);
525
526 static void read_message_print_transport_debug (gssize bytes_read,
527                                                 GDBusWorker *worker);
528
529 static void write_message_print_transport_debug (gssize bytes_written,
530                                                  MessageToWriteData *data);
531
532 typedef struct {
533     GDBusWorker *worker;
534     GCancellable *cancellable;
535     GSimpleAsyncResult *result;
536 } CloseData;
537
538 static void close_data_free (CloseData *close_data)
539 {
540   if (close_data->cancellable != NULL)
541     g_object_unref (close_data->cancellable);
542
543   if (close_data->result != NULL)
544     g_object_unref (close_data->result);
545
546   _g_dbus_worker_unref (close_data->worker);
547   g_slice_free (CloseData, close_data);
548 }
549
550 /* ---------------------------------------------------------------------------------------------------- */
551
552 static GDBusWorker *
553 _g_dbus_worker_ref (GDBusWorker *worker)
554 {
555   g_atomic_int_inc (&worker->ref_count);
556   return worker;
557 }
558
559 static void
560 _g_dbus_worker_unref (GDBusWorker *worker)
561 {
562   if (g_atomic_int_dec_and_test (&worker->ref_count))
563     {
564       g_assert (worker->write_pending_flushes == NULL);
565
566       _g_dbus_shared_thread_unref (worker->shared_thread_data);
567
568       g_object_unref (worker->stream);
569
570       g_mutex_clear (&worker->read_lock);
571       g_object_unref (worker->cancellable);
572       if (worker->read_fd_list != NULL)
573         g_object_unref (worker->read_fd_list);
574
575       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
576       g_mutex_clear (&worker->write_lock);
577       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
578       g_free (worker->read_buffer);
579
580       g_free (worker);
581     }
582 }
583
584 static void
585 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
586                                   gboolean      remote_peer_vanished,
587                                   GError       *error)
588 {
589   if (!g_atomic_int_get (&worker->stopped))
590     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
591 }
592
593 static void
594 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
595                                       GDBusMessage *message)
596 {
597   if (!g_atomic_int_get (&worker->stopped))
598     worker->message_received_callback (worker, message, worker->user_data);
599 }
600
601 static GDBusMessage *
602 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
603                                               GDBusMessage *message)
604 {
605   GDBusMessage *ret;
606   if (!g_atomic_int_get (&worker->stopped))
607     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
608   else
609     ret = message;
610   return ret;
611 }
612
613 /* can only be called from private thread with read-lock held - takes ownership of @message */
614 static void
615 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
616                                                   GDBusMessage *message)
617 {
618   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
619     {
620       /* queue up */
621       g_queue_push_tail (worker->received_messages_while_frozen, message);
622     }
623   else
624     {
625       /* not frozen, nor anything in queue */
626       _g_dbus_worker_emit_message_received (worker, message);
627       g_object_unref (message);
628     }
629 }
630
631 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
632 static gboolean
633 unfreeze_in_idle_cb (gpointer user_data)
634 {
635   GDBusWorker *worker = user_data;
636   GDBusMessage *message;
637
638   g_mutex_lock (&worker->read_lock);
639   if (worker->frozen)
640     {
641       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
642         {
643           _g_dbus_worker_emit_message_received (worker, message);
644           g_object_unref (message);
645         }
646       worker->frozen = FALSE;
647     }
648   else
649     {
650       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
651     }
652   g_mutex_unlock (&worker->read_lock);
653   return FALSE;
654 }
655
656 /* can be called from any thread */
657 void
658 _g_dbus_worker_unfreeze (GDBusWorker *worker)
659 {
660   GSource *idle_source;
661   idle_source = g_idle_source_new ();
662   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
663   g_source_set_callback (idle_source,
664                          unfreeze_in_idle_cb,
665                          _g_dbus_worker_ref (worker),
666                          (GDestroyNotify) _g_dbus_worker_unref);
667   g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
668   g_source_attach (idle_source, worker->shared_thread_data->context);
669   g_source_unref (idle_source);
670 }
671
672 /* ---------------------------------------------------------------------------------------------------- */
673
674 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
675
676 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
677 static void
678 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
679                            GAsyncResult  *res,
680                            gpointer       user_data)
681 {
682   GDBusWorker *worker = user_data;
683   GError *error;
684   gssize bytes_read;
685
686   g_mutex_lock (&worker->read_lock);
687
688   /* If already stopped, don't even process the reply */
689   if (g_atomic_int_get (&worker->stopped))
690     goto out;
691
692   error = NULL;
693   bytes_read = 0;
694
695   if (FALSE)
696     {
697     }
698 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
699   else if (G_IS_KDBUS_CONNECTION (worker->stream))
700     {
701       bytes_read = _g_kdbus_read_finish (worker->kdbus,
702                                          res,
703                                          &error);
704
705       /* [KDBUS]  Get all received items*/
706       worker->read_kdbus_msg_items = _g_kdbus_get_last_msg_items (worker->kdbus);
707
708       /* [KDBUS] Attach fds (if any) to worker->read_fd_list */
709       _g_kdbus_attach_fds_to_msg (worker->kdbus, &worker->read_fd_list);
710
711       /* [KDBUS] For KDBUS transport we read whole message at once*/
712       worker->read_buffer_bytes_wanted = bytes_read;
713     }
714 #endif
715   else if (worker->socket == NULL)
716     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
717                                              res,
718                                              &error);
719   else
720     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
721                                                               res,
722                                                               &error);
723   if (worker->read_num_ancillary_messages > 0)
724     {
725       gint n;
726       for (n = 0; n < worker->read_num_ancillary_messages; n++)
727         {
728           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
729
730           if (FALSE)
731             {
732             }
733 #ifdef G_OS_UNIX
734           else if (G_IS_UNIX_FD_MESSAGE (control_message))
735             {
736               GUnixFDMessage *fd_message;
737               gint *fds;
738               gint num_fds;
739
740               fd_message = G_UNIX_FD_MESSAGE (control_message);
741               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
742               if (worker->read_fd_list == NULL)
743                 {
744                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
745                 }
746               else
747                 {
748                   gint n;
749                   for (n = 0; n < num_fds; n++)
750                     {
751                       /* TODO: really want a append_steal() */
752                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
753                       (void) g_close (fds[n], NULL);
754                     }
755                 }
756               g_free (fds);
757             }
758           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
759             {
760               /* do nothing */
761             }
762 #endif
763           else
764             {
765               if (error == NULL)
766                 {
767                   g_set_error (&error,
768                                G_IO_ERROR,
769                                G_IO_ERROR_FAILED,
770                                "Unexpected ancillary message of type %s received from peer",
771                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
772                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
773                   g_error_free (error);
774                   g_object_unref (control_message);
775                   n++;
776                   while (n < worker->read_num_ancillary_messages)
777                     g_object_unref (worker->read_ancillary_messages[n++]);
778                   g_free (worker->read_ancillary_messages);
779                   goto out;
780                 }
781             }
782           g_object_unref (control_message);
783         }
784       g_free (worker->read_ancillary_messages);
785     }
786
787   if (bytes_read == -1)
788     {
789       if (G_UNLIKELY (_g_dbus_debug_transport ()))
790         {
791           _g_dbus_debug_print_lock ();
792           g_print ("========================================================================\n"
793                    "GDBus-debug:Transport:\n"
794                    "  ---- READ ERROR:\n"
795                    "  ---- %s %d: %s\n",
796                    g_quark_to_string (error->domain), error->code,
797                    error->message);
798           _g_dbus_debug_print_unlock ();
799         }
800
801       /* Every async read that uses this callback uses worker->cancellable
802        * as its GCancellable. worker->cancellable gets cancelled if and only
803        * if the GDBusConnection tells us to close (either via
804        * _g_dbus_worker_stop, which is called on last-unref, or directly),
805        * so a cancelled read must mean our connection was closed locally.
806        *
807        * If we're closing, other errors are possible - notably,
808        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
809        * read in-flight. It seems sensible to treat all read errors during
810        * closing as an expected thing that doesn't trip exit-on-close.
811        *
812        * Because close_expected can't be set until we get into the worker
813        * thread, but the cancellable is signalled sooner (from another
814        * thread), we do still need to check the error.
815        */
816       if (worker->close_expected ||
817           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
818         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
819       else
820         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
821
822       g_error_free (error);
823       goto out;
824     }
825
826 #if 0
827   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
828            (gint) bytes_read,
829            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
830            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
831            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
832                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
833            worker->stream,
834            worker);
835 #endif
836
837   /* TODO: hmm, hmm... */
838   if (bytes_read == 0)
839     {
840       g_set_error (&error,
841                    G_IO_ERROR,
842                    G_IO_ERROR_FAILED,
843                    "Underlying GIOStream returned 0 bytes on an async read");
844       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
845       g_error_free (error);
846       goto out;
847     }
848
849   /* [KDBUS] don't print transport dbus debug for kdbus connection */
850   if (!G_IS_KDBUS_CONNECTION (worker->stream))
851     read_message_print_transport_debug (bytes_read, worker);
852
853   worker->read_buffer_cur_size += bytes_read;
854   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
855     {
856       /* OK, got what we asked for! */
857       if (worker->read_buffer_bytes_wanted == 16)
858         {
859           gssize message_len;
860           /* OK, got the header - determine how many more bytes are needed */
861           error = NULL;
862           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
863                                                      16,
864                                                      &error);
865           if (message_len == -1)
866             {
867               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
868               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
869               g_error_free (error);
870               goto out;
871             }
872
873           worker->read_buffer_bytes_wanted = message_len;
874           _g_dbus_worker_do_read_unlocked (worker);
875         }
876       else
877         {
878           GDBusMessage *message;
879           error = NULL;
880
881           /* TODO: use connection->priv->auth to decode the message */
882
883           if (FALSE)
884             {
885             }
886 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
887           else if (G_IS_KDBUS_CONNECTION (worker->stream))
888             {
889               GDBusMessageType message_type;
890               gchar *sender;
891               gchar *destination;
892
893               message = _g_dbus_message_new_from_kdbus_items (worker->read_kdbus_msg_items,
894                                                               &error);
895
896               /* [KDBUS] override informations from the user header with kernel msg header */
897               sender = _g_kdbus_get_last_msg_sender (worker->kdbus);
898               g_dbus_message_set_sender (message, sender);
899
900               message_type = g_dbus_message_get_message_type (message);
901               if (message_type == G_DBUS_MESSAGE_TYPE_SIGNAL)
902                 {
903                   destination = _g_kdbus_get_last_msg_destination (worker->kdbus);
904                   g_dbus_message_set_destination (message, destination);
905                 }
906
907               if (message == NULL)
908                 {
909                    g_warning ("Error decoding D-Bus (kdbus) message\n");
910                    g_error_free (error);
911                    goto out;
912                 }
913             }
914 #endif
915           else
916             {
917               message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
918                                                       worker->read_buffer_cur_size,
919                                                       worker->capabilities,
920                                                       &error);
921
922               if (message == NULL)
923                 {
924                   gchar *s;
925                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
926                   g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
927                              "The error is: %s\n"
928                              "The payload is as follows:\n"
929                              "%s\n",
930                              worker->read_buffer_cur_size,
931                              error->message,
932                              s);
933                   g_free (s);
934                   _g_dbus_worker_emit_disconnected (worker, FALSE, error);
935                   g_error_free (error);
936                   goto out;
937                 }
938             }
939
940 #ifdef G_OS_UNIX
941           if (worker->read_fd_list != NULL)
942             {
943               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
944               g_object_unref (worker->read_fd_list);
945               worker->read_fd_list = NULL;
946             }
947 #endif
948
949           if (G_UNLIKELY (_g_dbus_debug_message ()))
950             {
951               gchar *s;
952               _g_dbus_debug_print_lock ();
953               g_print ("========================================================================\n"
954                        "GDBus-debug:Message:\n"
955                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
956                        worker->read_buffer_cur_size);
957               s = g_dbus_message_print (message, 2);
958               g_print ("%s", s);
959               g_free (s);
960               if (G_UNLIKELY (_g_dbus_debug_payload ()))
961                 {
962                   if (FALSE)
963                     {
964                     }
965 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
966                   else if (G_IS_KDBUS_CONNECTION (worker->stream))
967                     s = _g_kdbus_hexdump_all_items (worker->read_kdbus_msg_items);
968 #endif
969                   else
970                     s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
971                   g_print ("%s\n", s);
972                   g_free (s);
973                 }
974               _g_dbus_debug_print_unlock ();
975             }
976
977           /* yay, got a message, go deliver it */
978           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
979
980           /* start reading another message! */
981           worker->read_buffer_bytes_wanted = 0;
982           worker->read_buffer_cur_size = 0;
983           _g_dbus_worker_do_read_unlocked (worker);
984         }
985     }
986   else
987     {
988       /* didn't get all the bytes we requested - so repeat the request... */
989       _g_dbus_worker_do_read_unlocked (worker);
990     }
991
992  out:
993
994 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
995   /* [KDBUS] release memory occupied by kdbus message */
996   if (G_IS_KDBUS_CONNECTION (worker->stream))
997     {
998       if (!_g_kdbus_is_closed (worker->kdbus))
999         {
1000           _g_kdbus_release_kmsg (worker->kdbus);
1001           worker->read_kdbus_msg_items = NULL;
1002         }
1003       worker->read_buffer = NULL;
1004     }
1005 #endif
1006
1007   g_mutex_unlock (&worker->read_lock);
1008
1009   /* gives up the reference acquired when calling g_input_stream_read_async() */
1010   _g_dbus_worker_unref (worker);
1011 }
1012
1013 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
1014 static void
1015 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
1016 {
1017   /* Note that we do need to keep trying to read even if close_expected is
1018    * true, because only failing a read causes us to signal 'closed'.
1019    */
1020
1021   /* [KDBUS]
1022    * For KDBUS transport we don't  have to alloc buffer (worker->read_buffer)
1023    * instead of it we use kdbus memory pool. On connection stage KDBUS client
1024    * have to register a memory pool, large enough to  carry all backlog of
1025    * data enqueued for the connection.
1026    */
1027
1028 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
1029   if (G_IS_KDBUS_CONNECTION (worker->stream))
1030     {
1031       _g_kdbus_read(worker->kdbus,
1032                     worker->cancellable,
1033                     (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1034                     _g_dbus_worker_ref (worker));
1035       return;
1036     }
1037 #endif
1038
1039   /* if bytes_wanted is zero, it means start reading a message */
1040   if (worker->read_buffer_bytes_wanted == 0)
1041     {
1042       worker->read_buffer_cur_size = 0;
1043       worker->read_buffer_bytes_wanted = 16;
1044     }
1045
1046   /* ensure we have a (big enough) buffer */
1047   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
1048     {
1049       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
1050       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
1051       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
1052     }
1053
1054   if (worker->socket == NULL)
1055     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
1056                                worker->read_buffer + worker->read_buffer_cur_size,
1057                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
1058                                G_PRIORITY_DEFAULT,
1059                                worker->cancellable,
1060                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1061                                _g_dbus_worker_ref (worker));
1062   else
1063     {
1064       worker->read_ancillary_messages = NULL;
1065       worker->read_num_ancillary_messages = 0;
1066       _g_socket_read_with_control_messages (worker->socket,
1067                                             worker->read_buffer + worker->read_buffer_cur_size,
1068                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
1069                                             &worker->read_ancillary_messages,
1070                                             &worker->read_num_ancillary_messages,
1071                                             G_PRIORITY_DEFAULT,
1072                                             worker->cancellable,
1073                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1074                                             _g_dbus_worker_ref (worker));
1075     }
1076 }
1077
1078 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
1079 static gboolean
1080 _g_dbus_worker_do_initial_read (gpointer data)
1081 {
1082   GDBusWorker *worker = data;
1083   g_mutex_lock (&worker->read_lock);
1084   _g_dbus_worker_do_read_unlocked (worker);
1085   g_mutex_unlock (&worker->read_lock);
1086   return FALSE;
1087 }
1088
1089 /* ---------------------------------------------------------------------------------------------------- */
1090
1091 struct _MessageToWriteData
1092 {
1093   GDBusWorker  *worker;
1094   GDBusMessage *message;
1095   gchar        *blob;
1096   gsize         blob_size;
1097
1098   gsize               total_written;
1099   GSimpleAsyncResult *simple;
1100
1101 };
1102
1103 static void
1104 message_to_write_data_free (MessageToWriteData *data)
1105 {
1106   _g_dbus_worker_unref (data->worker);
1107   if (data->message)
1108     g_object_unref (data->message);
1109   g_free (data->blob);
1110   g_free (data);
1111 }
1112
1113 /* ---------------------------------------------------------------------------------------------------- */
1114
1115 static void write_message_continue_writing (MessageToWriteData *data);
1116
1117 /* called in private thread shared by all GDBusConnection instances
1118  *
1119  * write-lock is not held on entry
1120  * output_pending is PENDING_WRITE on entry
1121  */
1122 static void
1123 write_message_async_cb (GObject      *source_object,
1124                         GAsyncResult *res,
1125                         gpointer      user_data)
1126 {
1127   MessageToWriteData *data = user_data;
1128   GSimpleAsyncResult *simple;
1129   gssize bytes_written;
1130   GError *error;
1131
1132   /* Note: we can't access data->simple after calling g_async_result_complete () because the
1133    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
1134    */
1135   simple = data->simple;
1136
1137   error = NULL;
1138   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
1139                                                 res,
1140                                                 &error);
1141   if (bytes_written == -1)
1142     {
1143       g_simple_async_result_take_error (simple, error);
1144       g_simple_async_result_complete (simple);
1145       g_object_unref (simple);
1146       goto out;
1147     }
1148   g_assert (bytes_written > 0); /* zero is never returned */
1149
1150   write_message_print_transport_debug (bytes_written, data);
1151
1152   data->total_written += bytes_written;
1153   g_assert (data->total_written <= data->blob_size);
1154   if (data->total_written == data->blob_size)
1155     {
1156       g_simple_async_result_complete (simple);
1157       g_object_unref (simple);
1158       goto out;
1159     }
1160
1161   write_message_continue_writing (data);
1162
1163  out:
1164   ;
1165 }
1166
/* called in private thread shared by all GDBusConnection instances
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 *
 * Source callback installed by write_message_continue_writing() after a
 * send reported G_IO_ERROR_WOULD_BLOCK: retries writing the pending
 * message (@user_data).  @socket and @condition are not looked at.
 */
#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  MessageToWriteData *pending = user_data;

  write_message_continue_writing (pending);

  /* FALSE removes the source; a new one is attached if needed */
  return FALSE;
}
#endif
1183
1184 /* called in private thread shared by all GDBusConnection instances
1185  *
1186  * write-lock is not held on entry
1187  * output_pending is PENDING_WRITE on entry
1188  */
1189 static void
1190 write_message_continue_writing (MessageToWriteData *data)
1191 {
1192   GOutputStream *ostream;
1193
1194 #ifdef G_OS_UNIX
1195   GSimpleAsyncResult *simple;
1196   GUnixFDList *fd_list;
1197
1198   /* Note: we can't access data->simple after calling g_async_result_complete () because the
1199    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
1200    */
1201   simple = data->simple;
1202
1203   fd_list = g_dbus_message_get_unix_fd_list (data->message);
1204
1205 #ifdef KDBUS_TRANSPORT
1206   if (G_IS_KDBUS_CONNECTION (data->worker->stream))
1207     {
1208       GError *error;
1209       error = NULL;
1210       data->total_written = _g_kdbus_send (data->worker,
1211                                            data->worker->kdbus,
1212                                            data->message,
1213                                            data->blob,
1214                                            data->blob_size,
1215                                            fd_list,
1216                                            data->worker->cancellable,
1217                                            &error);
1218
1219       g_simple_async_result_complete (simple);
1220       g_object_unref (simple);
1221       goto out;
1222     }
1223 #endif /* KDBUS_TRANSPORT */
1224
1225 #endif /* G_OS_UNIX */
1226
1227   ostream = g_io_stream_get_output_stream (data->worker->stream);
1228
1229   g_assert (!g_output_stream_has_pending (ostream));
1230   g_assert_cmpint (data->total_written, <, data->blob_size);
1231
1232   if (FALSE)
1233     {
1234     }
1235 #ifdef G_OS_UNIX
1236   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1237     {
1238       GOutputVector vector;
1239       GSocketControlMessage *control_message;
1240       gssize bytes_written;
1241       GError *error;
1242
1243       vector.buffer = data->blob;
1244       vector.size = data->blob_size;
1245
1246       control_message = NULL;
1247       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1248         {
1249           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1250             {
1251               g_simple_async_result_set_error (simple,
1252                                                G_IO_ERROR,
1253                                                G_IO_ERROR_FAILED,
1254                                                "Tried sending a file descriptor but remote peer does not support this capability");
1255               g_simple_async_result_complete (simple);
1256               g_object_unref (simple);
1257               goto out;
1258             }
1259           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1260         }
1261
1262       error = NULL;
1263       bytes_written = g_socket_send_message (data->worker->socket,
1264                                              NULL, /* address */
1265                                              &vector,
1266                                              1,
1267                                              control_message != NULL ? &control_message : NULL,
1268                                              control_message != NULL ? 1 : 0,
1269                                              G_SOCKET_MSG_NONE,
1270                                              data->worker->cancellable,
1271                                              &error);
1272       if (control_message != NULL)
1273         g_object_unref (control_message);
1274
1275       if (bytes_written == -1)
1276         {
1277           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1278           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1279             {
1280               GSource *source;
1281               source = g_socket_create_source (data->worker->socket,
1282                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1283                                                data->worker->cancellable);
1284               g_source_set_callback (source,
1285                                      (GSourceFunc) on_socket_ready,
1286                                      data,
1287                                      NULL); /* GDestroyNotify */
1288               g_source_attach (source, g_main_context_get_thread_default ());
1289               g_source_unref (source);
1290               g_error_free (error);
1291               goto out;
1292             }
1293           g_simple_async_result_take_error (simple, error);
1294           g_simple_async_result_complete (simple);
1295           g_object_unref (simple);
1296           goto out;
1297         }
1298       g_assert (bytes_written > 0); /* zero is never returned */
1299
1300       write_message_print_transport_debug (bytes_written, data);
1301
1302       data->total_written += bytes_written;
1303       g_assert (data->total_written <= data->blob_size);
1304       if (data->total_written == data->blob_size)
1305         {
1306           g_simple_async_result_complete (simple);
1307           g_object_unref (simple);
1308           goto out;
1309         }
1310
1311       write_message_continue_writing (data);
1312     }
1313 #endif
1314   else
1315     {
1316 #ifdef G_OS_UNIX
1317       if (fd_list != NULL)
1318         {
1319           g_simple_async_result_set_error (simple,
1320                                            G_IO_ERROR,
1321                                            G_IO_ERROR_FAILED,
1322                                            "Tried sending a file descriptor on unsupported stream of type %s",
1323                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1324           g_simple_async_result_complete (simple);
1325           g_object_unref (simple);
1326           goto out;
1327         }
1328 #endif
1329
1330       g_output_stream_write_async (ostream,
1331                                    (const gchar *) data->blob + data->total_written,
1332                                    data->blob_size - data->total_written,
1333                                    G_PRIORITY_DEFAULT,
1334                                    data->worker->cancellable,
1335                                    write_message_async_cb,
1336                                    data);
1337     }
1338 #ifdef G_OS_UNIX
1339  out:
1340 #endif
1341   ;
1342 }
1343
1344 /* called in private thread shared by all GDBusConnection instances
1345  *
1346  * write-lock is not held on entry
1347  * output_pending is PENDING_WRITE on entry
1348  */
1349 static void
1350 write_message_async (GDBusWorker         *worker,
1351                      MessageToWriteData  *data,
1352                      GAsyncReadyCallback  callback,
1353                      gpointer             user_data)
1354 {
1355   data->simple = g_simple_async_result_new (NULL,
1356                                             callback,
1357                                             user_data,
1358                                             write_message_async);
1359   data->total_written = 0;
1360   write_message_continue_writing (data);
1361 }
1362
1363 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1364 static gboolean
1365 write_message_finish (GAsyncResult   *res,
1366                       GError        **error)
1367 {
1368   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1369   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1370     return FALSE;
1371   else
1372     return TRUE;
1373 }
1374 /* ---------------------------------------------------------------------------------------------------- */
1375
1376 static void continue_writing (GDBusWorker *worker);
1377
1378 typedef struct
1379 {
1380   GDBusWorker *worker;
1381   GList *flushers;
1382 } FlushAsyncData;
1383
1384 static void
1385 flush_data_list_complete (const GList  *flushers,
1386                           const GError *error)
1387 {
1388   const GList *l;
1389
1390   for (l = flushers; l != NULL; l = l->next)
1391     {
1392       FlushData *f = l->data;
1393
1394       f->error = error != NULL ? g_error_copy (error) : NULL;
1395
1396       g_mutex_lock (&f->mutex);
1397       g_cond_signal (&f->cond);
1398       g_mutex_unlock (&f->mutex);
1399     }
1400 }
1401
1402 /* called in private thread shared by all GDBusConnection instances
1403  *
1404  * write-lock is not held on entry
1405  * output_pending is PENDING_FLUSH on entry
1406  */
1407 static void
1408 ostream_flush_cb (GObject      *source_object,
1409                   GAsyncResult *res,
1410                   gpointer      user_data)
1411 {
1412   FlushAsyncData *data = user_data;
1413   GError *error;
1414
1415   error = NULL;
1416   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1417                                 res,
1418                                 &error);
1419
1420   if (error == NULL)
1421     {
1422       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1423         {
1424           _g_dbus_debug_print_lock ();
1425           g_print ("========================================================================\n"
1426                    "GDBus-debug:Transport:\n"
1427                    "  ---- FLUSHED stream of type %s\n",
1428                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1429           _g_dbus_debug_print_unlock ();
1430         }
1431     }
1432
1433   g_assert (data->flushers != NULL);
1434   flush_data_list_complete (data->flushers, error);
1435   g_list_free (data->flushers);
1436
1437   if (error != NULL)
1438     g_error_free (error);
1439
1440   /* Make sure we tell folks that we don't have additional
1441      flushes pending */
1442   g_mutex_lock (&data->worker->write_lock);
1443   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1444   g_assert (data->worker->output_pending == PENDING_FLUSH);
1445   data->worker->output_pending = PENDING_NONE;
1446   g_mutex_unlock (&data->worker->write_lock);
1447
1448   /* OK, cool, finally kick off the next write */
1449   continue_writing (data->worker);
1450
1451   _g_dbus_worker_unref (data->worker);
1452   g_free (data);
1453 }
1454
1455 /* called in private thread shared by all GDBusConnection instances
1456  *
1457  * write-lock is not held on entry
1458  * output_pending is PENDING_FLUSH on entry
1459  */
1460 static void
1461 start_flush (FlushAsyncData *data)
1462 {
1463   /*[KDBUS]: TODO: to investigate */
1464   if (G_IS_KDBUS_CONNECTION (data->worker->stream))
1465     {
1466       g_assert (data->flushers != NULL);
1467       flush_data_list_complete (data->flushers, NULL);
1468       g_list_free (data->flushers);
1469
1470       g_mutex_lock (&data->worker->write_lock);
1471       data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1472       g_assert (data->worker->output_pending == PENDING_FLUSH);
1473       data->worker->output_pending = PENDING_NONE;
1474       g_mutex_unlock (&data->worker->write_lock);
1475
1476       /* OK, cool, finally kick off the next write */
1477       continue_writing (data->worker);
1478
1479       _g_dbus_worker_unref (data->worker);
1480       g_free (data);
1481     }
1482   else
1483     {
1484       g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1485                                    G_PRIORITY_DEFAULT,
1486                                    data->worker->cancellable,
1487                                    ostream_flush_cb,
1488                                    data);
1489     }
1490 }
1491
1492 /* called in private thread shared by all GDBusConnection instances
1493  *
1494  * write-lock is held on entry
1495  * output_pending is PENDING_NONE on entry
1496  */
1497 static void
1498 message_written_unlocked (GDBusWorker *worker,
1499                           MessageToWriteData *message_data)
1500 {
1501   if (G_UNLIKELY (_g_dbus_debug_message ()))
1502     {
1503       gchar *s;
1504       _g_dbus_debug_print_lock ();
1505       g_print ("========================================================================\n"
1506                "GDBus-debug:Message:\n"
1507                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1508                message_data->blob_size);
1509       s = g_dbus_message_print (message_data->message, 2);
1510       g_print ("%s", s);
1511       g_free (s);
1512       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1513         {
1514           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1515           g_print ("%s\n", s);
1516           g_free (s);
1517         }
1518       _g_dbus_debug_print_unlock ();
1519     }
1520
1521   worker->write_num_messages_written += 1;
1522 }
1523
1524 /* called in private thread shared by all GDBusConnection instances
1525  *
1526  * write-lock is held on entry
1527  * output_pending is PENDING_NONE on entry
1528  *
1529  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1530  */
1531 static FlushAsyncData *
1532 prepare_flush_unlocked (GDBusWorker *worker)
1533 {
1534   GList *l;
1535   GList *ll;
1536   GList *flushers;
1537
1538   flushers = NULL;
1539   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1540     {
1541       FlushData *f = l->data;
1542       ll = l->next;
1543
1544       if (f->number_to_wait_for == worker->write_num_messages_written)
1545         {
1546           flushers = g_list_append (flushers, f);
1547           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1548         }
1549     }
1550   if (flushers != NULL)
1551     {
1552       g_assert (worker->output_pending == PENDING_NONE);
1553       worker->output_pending = PENDING_FLUSH;
1554     }
1555
1556   if (flushers != NULL)
1557     {
1558       FlushAsyncData *data;
1559
1560       data = g_new0 (FlushAsyncData, 1);
1561       data->worker = _g_dbus_worker_ref (worker);
1562       data->flushers = flushers;
1563       return data;
1564     }
1565
1566   return NULL;
1567 }
1568
1569 /* called in private thread shared by all GDBusConnection instances
1570  *
1571  * write-lock is not held on entry
1572  * output_pending is PENDING_WRITE on entry
1573  */
1574 static void
1575 write_message_cb (GObject       *source_object,
1576                   GAsyncResult  *res,
1577                   gpointer       user_data)
1578 {
1579   MessageToWriteData *data = user_data;
1580   GError *error;
1581
1582   g_mutex_lock (&data->worker->write_lock);
1583   g_assert (data->worker->output_pending == PENDING_WRITE);
1584   data->worker->output_pending = PENDING_NONE;
1585
1586   error = NULL;
1587   if (!write_message_finish (res, &error))
1588     {
1589       g_mutex_unlock (&data->worker->write_lock);
1590
1591       /* TODO: handle */
1592       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1593       g_error_free (error);
1594
1595       g_mutex_lock (&data->worker->write_lock);
1596     }
1597
1598   message_written_unlocked (data->worker, data);
1599
1600   g_mutex_unlock (&data->worker->write_lock);
1601
1602   continue_writing (data->worker);
1603
1604   message_to_write_data_free (data);
1605 }
1606
1607 /* called in private thread shared by all GDBusConnection instances
1608  *
1609  * write-lock is not held on entry
1610  * output_pending is PENDING_CLOSE on entry
1611  */
1612 static void
1613 iostream_close_cb (GObject      *source_object,
1614                    GAsyncResult *res,
1615                    gpointer      user_data)
1616 {
1617   GDBusWorker *worker = user_data;
1618   GError *error = NULL;
1619   GList *pending_close_attempts, *pending_flush_attempts;
1620   GQueue *send_queue;
1621
1622   g_io_stream_close_finish (worker->stream, res, &error);
1623
1624   g_mutex_lock (&worker->write_lock);
1625
1626   pending_close_attempts = worker->pending_close_attempts;
1627   worker->pending_close_attempts = NULL;
1628
1629   pending_flush_attempts = worker->write_pending_flushes;
1630   worker->write_pending_flushes = NULL;
1631
1632   send_queue = worker->write_queue;
1633   worker->write_queue = g_queue_new ();
1634
1635   g_assert (worker->output_pending == PENDING_CLOSE);
1636   worker->output_pending = PENDING_NONE;
1637
1638   g_mutex_unlock (&worker->write_lock);
1639
1640   while (pending_close_attempts != NULL)
1641     {
1642       CloseData *close_data = pending_close_attempts->data;
1643
1644       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1645                                                    pending_close_attempts);
1646
1647       if (close_data->result != NULL)
1648         {
1649           if (error != NULL)
1650             g_simple_async_result_set_from_error (close_data->result, error);
1651
1652           /* this must be in an idle because the result is likely to be
1653            * intended for another thread
1654            */
1655           g_simple_async_result_complete_in_idle (close_data->result);
1656         }
1657
1658       close_data_free (close_data);
1659     }
1660
1661   g_clear_error (&error);
1662
1663   /* all messages queued for sending are discarded */
1664   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1665   /* all queued flushes fail */
1666   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1667                        _("Operation was cancelled"));
1668   flush_data_list_complete (pending_flush_attempts, error);
1669   g_list_free (pending_flush_attempts);
1670   g_clear_error (&error);
1671
1672   _g_dbus_worker_unref (worker);
1673 }
1674
1675 /* called in private thread shared by all GDBusConnection instances
1676  *
1677  * write-lock is not held on entry
1678  * output_pending must be PENDING_NONE on entry
1679  */
1680 static void
1681 continue_writing (GDBusWorker *worker)
1682 {
1683   MessageToWriteData *data;
1684   FlushAsyncData *flush_async_data;
1685
1686  write_next:
1687   /* we mustn't try to write two things at once */
1688   g_assert (worker->output_pending == PENDING_NONE);
1689
1690   g_mutex_lock (&worker->write_lock);
1691
1692   data = NULL;
1693   flush_async_data = NULL;
1694
1695   /* if we want to close the connection, that takes precedence */
1696   if (worker->pending_close_attempts != NULL)
1697     {
1698       worker->close_expected = TRUE;
1699       worker->output_pending = PENDING_CLOSE;
1700
1701       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1702                                NULL, iostream_close_cb,
1703                                _g_dbus_worker_ref (worker));
1704     }
1705   else
1706     {
1707       flush_async_data = prepare_flush_unlocked (worker);
1708
1709       if (flush_async_data == NULL)
1710         {
1711           data = g_queue_pop_head (worker->write_queue);
1712
1713           if (data != NULL)
1714             worker->output_pending = PENDING_WRITE;
1715         }
1716     }
1717
1718   g_mutex_unlock (&worker->write_lock);
1719
1720   /* Note that write_lock is only used for protecting the @write_queue
1721    * and @output_pending fields of the GDBusWorker struct ... which we
1722    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1723    *
1724    * Therefore, it's fine to drop it here when calling back into user
1725    * code and then writing the message out onto the GIOStream since this
1726    * function only runs on the worker thread.
1727    */
1728
1729   if (flush_async_data != NULL)
1730     {
1731       start_flush (flush_async_data);
1732       g_assert (data == NULL);
1733     }
1734   else if (data != NULL)
1735     {
1736       GDBusMessage *old_message;
1737       guchar *new_blob;
1738       gsize new_blob_size;
1739       GError *error;
1740
1741       old_message = data->message;
1742       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1743       if (data->message == old_message)
1744         {
1745           /* filters had no effect - do nothing */
1746         }
1747       else if (data->message == NULL)
1748         {
1749           /* filters dropped message */
1750           g_mutex_lock (&worker->write_lock);
1751           worker->output_pending = PENDING_NONE;
1752           g_mutex_unlock (&worker->write_lock);
1753           message_to_write_data_free (data);
1754           goto write_next;
1755         }
1756       else
1757         {
1758           /* filters altered the message -> reencode */
1759           error = NULL;
1760
1761          /* [KDBUS]
1762           * Setting protocol version, before invoking g_dbus_message_to_blob() will
1763           * be removed after preparing new function only for kdbus transport purposes
1764           * (this function will be able to create blob directly/unconditionally in memfd
1765           * object, without making copy):
1766           *
1767           * [1] https://code.google.com/p/d-bus/source/browse/TODO
1768           */
1769
1770           if (G_IS_KDBUS_CONNECTION (worker->stream))
1771             _g_dbus_message_set_protocol_ver (data->message,2);
1772           else
1773             _g_dbus_message_set_protocol_ver (data->message,1);
1774
1775           new_blob = g_dbus_message_to_blob (data->message,
1776                                              &new_blob_size,
1777                                              worker->capabilities,
1778                                              &error);
1779           if (new_blob == NULL)
1780             {
1781               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1782                * the old message instead
1783                */
1784               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1785                          g_dbus_message_get_serial (data->message),
1786                          error->message);
1787               g_error_free (error);
1788             }
1789           else
1790             {
1791               g_free (data->blob);
1792               data->blob = (gchar *) new_blob;
1793               data->blob_size = new_blob_size;
1794             }
1795         }
1796
1797       write_message_async (worker,
1798                            data,
1799                            write_message_cb,
1800                            data);
1801     }
1802 }
1803
1804 /* called in private thread shared by all GDBusConnection instances
1805  *
1806  * write-lock is not held on entry
1807  * output_pending may be anything
1808  */
1809 static gboolean
1810 continue_writing_in_idle_cb (gpointer user_data)
1811 {
1812   GDBusWorker *worker = user_data;
1813
1814   /* Because this is the worker thread, we can read this struct member
1815    * without holding the lock: no other thread ever modifies it.
1816    */
1817   if (worker->output_pending == PENDING_NONE)
1818     continue_writing (worker);
1819
1820   return FALSE;
1821 }
1822
1823 /**
1824  * @write_data: (transfer full) (allow-none):
1825  * @flush_data: (transfer full) (allow-none):
1826  * @close_data: (transfer full) (allow-none):
1827  *
1828  * Can be called from any thread
1829  *
1830  * write_lock is held on entry
1831  * output_pending may be anything
1832  */
1833 static void
1834 schedule_writing_unlocked (GDBusWorker        *worker,
1835                            MessageToWriteData *write_data,
1836                            FlushData          *flush_data,
1837                            CloseData          *close_data)
1838 {
1839   if (write_data != NULL)
1840     g_queue_push_tail (worker->write_queue, write_data);
1841
1842   if (flush_data != NULL)
1843     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1844
1845   if (close_data != NULL)
1846     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1847                                                      close_data);
1848
1849   /* If we had output pending, the next bit of output will happen
1850    * automatically when it finishes, so we only need to do this
1851    * if nothing was pending.
1852    *
1853    * The idle callback will re-check that output_pending is still
1854    * PENDING_NONE, to guard against output starting before the idle.
1855    */
1856   if (worker->output_pending == PENDING_NONE)
1857     {
1858       GSource *idle_source;
1859       idle_source = g_idle_source_new ();
1860       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1861       g_source_set_callback (idle_source,
1862                              continue_writing_in_idle_cb,
1863                              _g_dbus_worker_ref (worker),
1864                              (GDestroyNotify) _g_dbus_worker_unref);
1865       g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1866       g_source_attach (idle_source, worker->shared_thread_data->context);
1867       g_source_unref (idle_source);
1868     }
1869 }
1870
1871 /* ---------------------------------------------------------------------------------------------------- */
1872
1873 /* can be called from any thread - steals blob
1874  *
1875  * write_lock is not held on entry
1876  * output_pending may be anything
1877  */
1878 void
1879 _g_dbus_worker_send_message (GDBusWorker    *worker,
1880                              GDBusMessage   *message,
1881                              gchar          *blob,
1882                              gsize           blob_len)
1883 {
1884   MessageToWriteData *data;
1885
1886   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1887   g_return_if_fail (blob != NULL);
1888   g_return_if_fail (blob_len > 16);
1889
1890   data = g_new0 (MessageToWriteData, 1);
1891   data->worker = _g_dbus_worker_ref (worker);
1892   data->message = g_object_ref (message);
1893   data->blob = blob; /* steal! */
1894   data->blob_size = blob_len;
1895
1896   g_mutex_lock (&worker->write_lock);
1897   schedule_writing_unlocked (worker, data, NULL, NULL);
1898   g_mutex_unlock (&worker->write_lock);
1899 }
1900
1901 /* ---------------------------------------------------------------------------------------------------- */
1902
1903 GDBusWorker *
1904 _g_dbus_worker_new (GIOStream                              *stream,
1905                     GDBusCapabilityFlags                    capabilities,
1906                     gboolean                                initially_frozen,
1907                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1908                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1909                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1910                     gpointer                                user_data)
1911 {
1912   GDBusWorker *worker;
1913   GSource *idle_source;
1914
1915   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1916   g_return_val_if_fail (message_received_callback != NULL, NULL);
1917   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1918   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1919
1920   worker = g_new0 (GDBusWorker, 1);
1921   worker->ref_count = 1;
1922
1923   g_mutex_init (&worker->read_lock);
1924   worker->message_received_callback = message_received_callback;
1925   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1926   worker->disconnected_callback = disconnected_callback;
1927   worker->user_data = user_data;
1928   worker->stream = g_object_ref (stream);
1929   worker->capabilities = capabilities;
1930   worker->cancellable = g_cancellable_new ();
1931   worker->output_pending = PENDING_NONE;
1932
1933   worker->frozen = initially_frozen;
1934   worker->received_messages_while_frozen = g_queue_new ();
1935
1936   g_mutex_init (&worker->write_lock);
1937   worker->write_queue = g_queue_new ();
1938
1939   if (G_IS_SOCKET_CONNECTION (worker->stream))
1940     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1941
1942 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
1943   if (G_IS_KDBUS_CONNECTION (worker->stream))
1944     worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
1945 #endif
1946
1947   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1948
1949   /* begin reading */
1950   idle_source = g_idle_source_new ();
1951   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1952   g_source_set_callback (idle_source,
1953                          _g_dbus_worker_do_initial_read,
1954                          _g_dbus_worker_ref (worker),
1955                          (GDestroyNotify) _g_dbus_worker_unref);
1956   g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1957   g_source_attach (idle_source, worker->shared_thread_data->context);
1958   g_source_unref (idle_source);
1959
1960   return worker;
1961 }
1962
1963 /* ---------------------------------------------------------------------------------------------------- */
1964
1965 /* can be called from any thread
1966  *
1967  * write_lock is not held on entry
1968  * output_pending may be anything
1969  */
1970 void
1971 _g_dbus_worker_close (GDBusWorker         *worker,
1972                       GCancellable        *cancellable,
1973                       GSimpleAsyncResult  *result)
1974 {
1975   CloseData *close_data;
1976
1977   close_data = g_slice_new0 (CloseData);
1978   close_data->worker = _g_dbus_worker_ref (worker);
1979   close_data->cancellable =
1980       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1981   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1982
1983   /* Don't set worker->close_expected here - we're in the wrong thread.
1984    * It'll be set before the actual close happens.
1985    */
1986   g_cancellable_cancel (worker->cancellable);
1987   g_mutex_lock (&worker->write_lock);
1988   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1989   g_mutex_unlock (&worker->write_lock);
1990 }
1991
1992 /* This can be called from any thread - frees worker. Note that
1993  * callbacks might still happen if called from another thread than the
1994  * worker - use your own synchronization primitive in the callbacks.
1995  *
1996  * write_lock is not held on entry
1997  * output_pending may be anything
1998  */
1999 void
2000 _g_dbus_worker_stop (GDBusWorker *worker)
2001 {
2002   g_atomic_int_set (&worker->stopped, TRUE);
2003
2004   /* Cancel any pending operations and schedule a close of the underlying I/O
2005    * stream in the worker thread
2006    */
2007   _g_dbus_worker_close (worker, NULL, NULL);
2008
2009   /* _g_dbus_worker_close holds a ref until after an idle in the worker
2010    * thread has run, so we no longer need to unref in an idle like in
2011    * commit 322e25b535
2012    */
2013   _g_dbus_worker_unref (worker);
2014 }
2015
2016 /* ---------------------------------------------------------------------------------------------------- */
2017
2018 /* can be called from any thread (except the worker thread) - blocks
2019  * calling thread until all queued outgoing messages are written and
2020  * the transport has been flushed
2021  *
2022  * write_lock is not held on entry
2023  * output_pending may be anything
2024  */
2025 gboolean
2026 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
2027                            GCancellable   *cancellable,
2028                            GError        **error)
2029 {
2030   gboolean ret;
2031   FlushData *data;
2032   guint64 pending_writes;
2033
2034   data = NULL;
2035   ret = TRUE;
2036
2037   g_mutex_lock (&worker->write_lock);
2038
2039   /* if the queue is empty, no write is in-flight and we haven't written
2040    * anything since the last flush, then there's nothing to wait for
2041    */
2042   pending_writes = g_queue_get_length (worker->write_queue);
2043
2044   /* if a write is in-flight, we shouldn't be satisfied until the first
2045    * flush operation that follows it
2046    */
2047   if (worker->output_pending == PENDING_WRITE)
2048     pending_writes += 1;
2049
2050   if (pending_writes > 0 ||
2051       worker->write_num_messages_written != worker->write_num_messages_flushed)
2052     {
2053       data = g_new0 (FlushData, 1);
2054       g_mutex_init (&data->mutex);
2055       g_cond_init (&data->cond);
2056       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
2057       g_mutex_lock (&data->mutex);
2058
2059       schedule_writing_unlocked (worker, NULL, data, NULL);
2060     }
2061   g_mutex_unlock (&worker->write_lock);
2062
2063   if (data != NULL)
2064     {
2065       g_cond_wait (&data->cond, &data->mutex);
2066       g_mutex_unlock (&data->mutex);
2067
2068       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
2069       g_cond_clear (&data->cond);
2070       g_mutex_clear (&data->mutex);
2071       if (data->error != NULL)
2072         {
2073           ret = FALSE;
2074           g_propagate_error (error, data->error);
2075         }
2076       g_free (data);
2077     }
2078
2079   return ret;
2080 }
2081
2082 /* ---------------------------------------------------------------------------------------------------- */
2083
2084 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
2085 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
2086 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
2087 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
2088 #define G_DBUS_DEBUG_CALL           (1<<4)
2089 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
2090 #define G_DBUS_DEBUG_INCOMING       (1<<6)
2091 #define G_DBUS_DEBUG_RETURN         (1<<7)
2092 #define G_DBUS_DEBUG_EMISSION       (1<<8)
2093 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
2094
2095 static gint _gdbus_debug_flags = 0;
2096
2097 gboolean
2098 _g_dbus_debug_authentication (void)
2099 {
2100   _g_dbus_initialize ();
2101   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
2102 }
2103
2104 gboolean
2105 _g_dbus_debug_transport (void)
2106 {
2107   _g_dbus_initialize ();
2108   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
2109 }
2110
2111 gboolean
2112 _g_dbus_debug_message (void)
2113 {
2114   _g_dbus_initialize ();
2115   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
2116 }
2117
2118 gboolean
2119 _g_dbus_debug_payload (void)
2120 {
2121   _g_dbus_initialize ();
2122   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
2123 }
2124
2125 gboolean
2126 _g_dbus_debug_call (void)
2127 {
2128   _g_dbus_initialize ();
2129   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
2130 }
2131
2132 gboolean
2133 _g_dbus_debug_signal (void)
2134 {
2135   _g_dbus_initialize ();
2136   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
2137 }
2138
2139 gboolean
2140 _g_dbus_debug_incoming (void)
2141 {
2142   _g_dbus_initialize ();
2143   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
2144 }
2145
2146 gboolean
2147 _g_dbus_debug_return (void)
2148 {
2149   _g_dbus_initialize ();
2150   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
2151 }
2152
2153 gboolean
2154 _g_dbus_debug_emission (void)
2155 {
2156   _g_dbus_initialize ();
2157   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
2158 }
2159
2160 gboolean
2161 _g_dbus_debug_address (void)
2162 {
2163   _g_dbus_initialize ();
2164   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
2165 }
2166
2167 G_LOCK_DEFINE_STATIC (print_lock);
2168
2169 void
2170 _g_dbus_debug_print_lock (void)
2171 {
2172   G_LOCK (print_lock);
2173 }
2174
2175 void
2176 _g_dbus_debug_print_unlock (void)
2177 {
2178   G_UNLOCK (print_lock);
2179 }
2180
2181 /**
2182  * _g_dbus_initialize:
2183  *
2184  * Does various one-time init things such as
2185  *
2186  *  - registering the G_DBUS_ERROR error domain
2187  *  - parses the G_DBUS_DEBUG environment variable
2188  */
2189 void
2190 _g_dbus_initialize (void)
2191 {
2192   static volatile gsize initialized = 0;
2193
2194   if (g_once_init_enter (&initialized))
2195     {
2196       volatile GQuark g_dbus_error_domain;
2197       const gchar *debug;
2198
2199       g_dbus_error_domain = G_DBUS_ERROR;
2200       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
2201
2202       debug = g_getenv ("G_DBUS_DEBUG");
2203       if (debug != NULL)
2204         {
2205           const GDebugKey keys[] = {
2206             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
2207             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
2208             { "message",        G_DBUS_DEBUG_MESSAGE        },
2209             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
2210             { "call",           G_DBUS_DEBUG_CALL           },
2211             { "signal",         G_DBUS_DEBUG_SIGNAL         },
2212             { "incoming",       G_DBUS_DEBUG_INCOMING       },
2213             { "return",         G_DBUS_DEBUG_RETURN         },
2214             { "emission",       G_DBUS_DEBUG_EMISSION       },
2215             { "address",        G_DBUS_DEBUG_ADDRESS        }
2216           };
2217
2218           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
2219           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
2220             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
2221         }
2222
2223       g_once_init_leave (&initialized, 1);
2224     }
2225 }
2226
2227 /* ---------------------------------------------------------------------------------------------------- */
2228
2229 GVariantType *
2230 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
2231 {
2232   const GVariantType *arg_types[256];
2233   guint n;
2234
2235   if (args)
2236     for (n = 0; args[n] != NULL; n++)
2237       {
2238         /* DBus places a hard limit of 255 on signature length.
2239          * therefore number of args must be less than 256.
2240          */
2241         g_assert (n < 256);
2242
2243         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
2244
2245         if G_UNLIKELY (arg_types[n] == NULL)
2246           return NULL;
2247       }
2248   else
2249     n = 0;
2250
2251   return g_variant_type_new_tuple (arg_types, n);
2252 }
2253
2254 /* ---------------------------------------------------------------------------------------------------- */
2255
#ifdef G_OS_WIN32

extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);

/* Returns the SID of the user owning the current process, in string
 * form, as a newly allocated string (free with g_free()).
 *
 * Returns NULL, after logging a warning, if the process token cannot be
 * opened or queried, or if the SID is invalid or cannot be converted.
 */
gchar *
_g_dbus_win32_get_user_sid (void)
{
  HANDLE h;
  TOKEN_USER *user;
  DWORD token_information_len;
  PSID psid;
  gchar *sid;
  gchar *ret;

  ret = NULL;
  user = NULL;
  sid = NULL;
  h = INVALID_HANDLE_VALUE;

  if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
    {
      g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* Get length of buffer: this probe is expected to fail with
   * ERROR_INSUFFICIENT_BUFFER while filling in the required size.
   */
  token_information_len = 0;
  if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
    {
      if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
        {
          g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
          goto out;
        }
    }
  user = g_malloc (token_information_len);
  if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  psid = user->User.Sid;
  if (!IsValidSid (psid))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  /* Report the call that actually failed rather than repeating "Invalid SID". */
  if (!ConvertSidToStringSidA (psid, &sid))
    {
      g_warning ("ConvertSidToStringSidA failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* copy into GLib-owned memory; the system string is released with LocalFree() */
  ret = g_strdup (sid);
  LocalFree (sid);

out:
  g_free (user);
  if (h != INVALID_HANDLE_VALUE)
    CloseHandle (h);
  return ret;
}
#endif
2320
2321 /* ---------------------------------------------------------------------------------------------------- */
2322
2323 gchar *
2324 _g_dbus_get_machine_id (GError **error)
2325 {
2326 #ifdef G_OS_WIN32
2327   HW_PROFILE_INFOA info;
2328   char *src, *dest, *res;
2329   int i;
2330
2331   if (!GetCurrentHwProfileA (&info))
2332     {
2333       char *message = g_win32_error_message (GetLastError ());
2334       g_set_error (error,
2335                    G_IO_ERROR,
2336                    G_IO_ERROR_FAILED,
2337                    _("Unable to get Hardware profile: %s"), message);
2338       g_free (message);
2339       return NULL;
2340     }
2341
2342   /* Form: {12340001-4980-1920-6788-123456789012} */
2343   src = &info.szHwProfileGuid[0];
2344
2345   res = g_malloc (32+1);
2346   dest = res;
2347
2348   src++; /* Skip { */
2349   for (i = 0; i < 8; i++)
2350     *dest++ = *src++;
2351   src++; /* Skip - */
2352   for (i = 0; i < 4; i++)
2353     *dest++ = *src++;
2354   src++; /* Skip - */
2355   for (i = 0; i < 4; i++)
2356     *dest++ = *src++;
2357   src++; /* Skip - */
2358   for (i = 0; i < 4; i++)
2359     *dest++ = *src++;
2360   src++; /* Skip - */
2361   for (i = 0; i < 12; i++)
2362     *dest++ = *src++;
2363   *dest = 0;
2364
2365   return res;
2366 #else
2367   gchar *ret;
2368   GError *first_error;
2369   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2370   ret = NULL;
2371   first_error = NULL;
2372   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2373                             &ret,
2374                             NULL,
2375                             &first_error) &&
2376       !g_file_get_contents ("/etc/machine-id",
2377                             &ret,
2378                             NULL,
2379                             NULL))
2380     {
2381       g_propagate_prefixed_error (error, first_error,
2382                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2383     }
2384   else
2385     {
2386       /* ignore the error from the first try, if any */
2387       g_clear_error (&first_error);
2388       /* TODO: validate value */
2389       g_strstrip (ret);
2390     }
2391   return ret;
2392 #endif
2393 }
2394
2395 /* ---------------------------------------------------------------------------------------------------- */
2396
2397 gchar *
2398 _g_dbus_enum_to_string (GType enum_type, gint value)
2399 {
2400   gchar *ret;
2401   GEnumClass *klass;
2402   GEnumValue *enum_value;
2403
2404   klass = g_type_class_ref (enum_type);
2405   enum_value = g_enum_get_value (klass, value);
2406   if (enum_value != NULL)
2407     ret = g_strdup (enum_value->value_nick);
2408   else
2409     ret = g_strdup_printf ("unknown (value %d)", value);
2410   g_type_class_unref (klass);
2411   return ret;
2412 }
2413
2414 /* ---------------------------------------------------------------------------------------------------- */
2415
2416 static void
2417 write_message_print_transport_debug (gssize bytes_written,
2418                                      MessageToWriteData *data)
2419 {
2420   if (G_LIKELY (!_g_dbus_debug_transport ()))
2421     goto out;
2422
2423   _g_dbus_debug_print_lock ();
2424   g_print ("========================================================================\n"
2425            "GDBus-debug:Transport:\n"
2426            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2427            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2428            bytes_written,
2429            g_dbus_message_get_serial (data->message),
2430            data->blob_size,
2431            data->total_written,
2432            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2433   _g_dbus_debug_print_unlock ();
2434  out:
2435   ;
2436 }
2437
2438 /* ---------------------------------------------------------------------------------------------------- */
2439
2440 static void
2441 read_message_print_transport_debug (gssize bytes_read,
2442                                     GDBusWorker *worker)
2443 {
2444   gsize size;
2445   gint32 serial;
2446   gint32 message_length;
2447
2448   if (G_LIKELY (!_g_dbus_debug_transport ()))
2449     goto out;
2450
2451   size = bytes_read + worker->read_buffer_cur_size;
2452   serial = 0;
2453   message_length = 0;
2454   if (size >= 16)
2455     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2456   if (size >= 1)
2457     {
2458       switch (worker->read_buffer[0])
2459         {
2460         case 'l':
2461           if (size >= 12)
2462             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2463           break;
2464         case 'B':
2465           if (size >= 12)
2466             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2467           break;
2468         default:
2469           /* an error will be set elsewhere if this happens */
2470           goto out;
2471         }
2472     }
2473
2474     _g_dbus_debug_print_lock ();
2475   g_print ("========================================================================\n"
2476            "GDBus-debug:Transport:\n"
2477            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2478            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2479            bytes_read,
2480            serial,
2481            message_length,
2482            worker->read_buffer_cur_size,
2483            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2484   _g_dbus_debug_print_unlock ();
2485  out:
2486   ;
2487 }
2488
2489 /* ---------------------------------------------------------------------------------------------------- */
2490
2491 gboolean
2492 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2493                                      GValue                *return_accu,
2494                                      const GValue          *handler_return,
2495                                      gpointer               dummy)
2496 {
2497   gboolean continue_emission;
2498   gboolean signal_return;
2499
2500   signal_return = g_value_get_boolean (handler_return);
2501   g_value_set_boolean (return_accu, signal_return);
2502   continue_emission = signal_return;
2503
2504   return continue_emission;
2505 }