hook gvariant vectors up to kdbus
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: David Zeuthen <davidz@redhat.com>
19  */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "giotypes.h"
27 #include "gsocket.h"
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbuserror.h"
31 #include "gdbusintrospection.h"
32 #include "gasyncresult.h"
33 #include "gsimpleasyncresult.h"
34 #include "ginputstream.h"
35 #include "gmemoryinputstream.h"
36 #include "giostream.h"
37 #include "glib/gstdio.h"
38 #include "gsocketcontrolmessage.h"
39 #include "gsocketconnection.h"
40 #include "gsocketoutputstream.h"
41
42 #ifdef G_OS_UNIX
43 #include "gkdbus.h"
44 #include "gkdbusconnection.h"
45 #include "gunixfdmessage.h"
46 #include "gunixconnection.h"
47 #include "gunixcredentialsmessage.h"
48 #endif
49
50 #ifdef G_OS_WIN32
51 #include <windows.h>
52 #endif
53
54 #include "glibintl.h"
55
56 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
57
58 /* ---------------------------------------------------------------------------------------------------- */
59
60 gchar *
61 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
62 {
63  guint n, m;
64  GString *ret;
65
66  ret = g_string_new (NULL);
67
68  for (n = 0; n < len; n += 16)
69    {
70      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
71
72      for (m = n; m < n + 16; m++)
73        {
74          if (m > n && (m%4) == 0)
75            g_string_append_c (ret, ' ');
76          if (m < len)
77            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
78          else
79            g_string_append (ret, "   ");
80        }
81
82      g_string_append (ret, "   ");
83
84      for (m = n; m < len && m < n + 16; m++)
85        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
86
87      g_string_append_c (ret, '\n');
88    }
89
90  return g_string_free (ret, FALSE);
91 }
92
93 /* ---------------------------------------------------------------------------------------------------- */
94
95 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
96 typedef struct
97 {
98   GKdbus *kdbus;
99   GCancellable *cancellable;
100
101   GSimpleAsyncResult *simple;
102
103   gboolean from_mainloop;
104 } ReadKdbusData;
105
106 static void
107 read_kdbus_data_free (ReadKdbusData  *data)
108 {
109   g_object_unref (data->kdbus);
110   if (data->cancellable != NULL)
111     g_object_unref (data->cancellable);
112   g_object_unref (data->simple);
113   g_free (data);
114 }
115
116 static gboolean
117 _g_kdbus_read_ready (GKdbus        *kdbus,
118                      GIOCondition   condition,
119                      gpointer       user_data)
120 {
121   ReadKdbusData *data = user_data;
122   GError *error = NULL;
123   gssize result;
124
125   result = _g_kdbus_receive (data->kdbus,
126                              data->cancellable,
127                              &error);
128
129   if (result >= 0)
130     {
131       g_simple_async_result_set_op_res_gssize (data->simple, result);
132     }
133   else
134     {
135       g_assert (error != NULL);
136       g_simple_async_result_take_error (data->simple, error);
137     }
138
139   if (data->from_mainloop)
140     g_simple_async_result_complete (data->simple);
141   else
142     g_simple_async_result_complete_in_idle (data->simple);
143
144   return FALSE;
145 }
146
147 static void
148 _g_kdbus_read (GKdbus               *kdbus,
149                GCancellable         *cancellable,
150                GAsyncReadyCallback   callback,
151                gpointer              user_data)
152 {
153   ReadKdbusData *data;
154   GSource *source;
155
156   data = g_new0 (ReadKdbusData, 1);
157   data->kdbus = g_object_ref (kdbus);
158   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
159
160   data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
161                                             callback,
162                                             user_data,
163                                             _g_kdbus_read);
164   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
165
166   data->from_mainloop = TRUE;
167   source = _g_kdbus_create_source (data->kdbus,
168                                    G_IO_IN,
169                                    cancellable);
170   g_source_set_callback (source,
171                          (GSourceFunc) _g_kdbus_read_ready,
172                          data,
173                          (GDestroyNotify) read_kdbus_data_free);
174   g_source_attach (source, g_main_context_get_thread_default ());
175   g_source_unref (source);
176 }
177
178 static gssize
179 _g_kdbus_read_finish (GKdbus        *kdbus,
180                       GAsyncResult  *result,
181                       GError       **error)
182 {
183   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
184
185   g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
186   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
187
188   if (g_simple_async_result_propagate_error (simple, error))
189     return -1;
190   else
191     return g_simple_async_result_get_op_res_gssize (simple);
192 }
193
194 #endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */
195
196 /* Unfortunately ancillary messages are discarded when reading from a
197  * socket using the GSocketInputStream abstraction. So we provide a
198  * very GInputStream-ish API that uses GSocket in this case (very
199  * similar to GSocketInputStream).
200  */
201
202 typedef struct
203 {
204   GSocket *socket;
205   GCancellable *cancellable;
206
207   void *buffer;
208   gsize count;
209
210   GSocketControlMessage ***messages;
211   gint *num_messages;
212
213   GSimpleAsyncResult *simple;
214
215   gboolean from_mainloop;
216 } ReadWithControlData;
217
218 static void
219 read_with_control_data_free (ReadWithControlData *data)
220 {
221   g_object_unref (data->socket);
222   if (data->cancellable != NULL)
223     g_object_unref (data->cancellable);
224   g_object_unref (data->simple);
225   g_free (data);
226 }
227
228 static gboolean
229 _g_socket_read_with_control_messages_ready (GSocket      *socket,
230                                             GIOCondition  condition,
231                                             gpointer      user_data)
232 {
233   ReadWithControlData *data = user_data;
234   GError *error;
235   gssize result;
236   GInputVector vector;
237
238   error = NULL;
239   vector.buffer = data->buffer;
240   vector.size = data->count;
241   result = g_socket_receive_message (data->socket,
242                                      NULL, /* address */
243                                      &vector,
244                                      1,
245                                      data->messages,
246                                      data->num_messages,
247                                      NULL,
248                                      data->cancellable,
249                                      &error);
250   if (result >= 0)
251     {
252       g_simple_async_result_set_op_res_gssize (data->simple, result);
253     }
254   else
255     {
256       g_assert (error != NULL);
257       g_simple_async_result_take_error (data->simple, error);
258     }
259
260   if (data->from_mainloop)
261     g_simple_async_result_complete (data->simple);
262   else
263     g_simple_async_result_complete_in_idle (data->simple);
264
265   return FALSE;
266 }
267
268 static void
269 _g_socket_read_with_control_messages (GSocket                 *socket,
270                                       void                    *buffer,
271                                       gsize                    count,
272                                       GSocketControlMessage ***messages,
273                                       gint                    *num_messages,
274                                       gint                     io_priority,
275                                       GCancellable            *cancellable,
276                                       GAsyncReadyCallback      callback,
277                                       gpointer                 user_data)
278 {
279   ReadWithControlData *data;
280
281   data = g_new0 (ReadWithControlData, 1);
282   data->socket = g_object_ref (socket);
283   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
284   data->buffer = buffer;
285   data->count = count;
286   data->messages = messages;
287   data->num_messages = num_messages;
288
289   data->simple = g_simple_async_result_new (G_OBJECT (socket),
290                                             callback,
291                                             user_data,
292                                             _g_socket_read_with_control_messages);
293   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
294
295   if (!g_socket_condition_check (socket, G_IO_IN))
296     {
297       GSource *source;
298       data->from_mainloop = TRUE;
299       source = g_socket_create_source (data->socket,
300                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
301                                        cancellable);
302       g_source_set_callback (source,
303                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
304                              data,
305                              (GDestroyNotify) read_with_control_data_free);
306       g_source_attach (source, g_main_context_get_thread_default ());
307       g_source_unref (source);
308     }
309   else
310     {
311       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
312       read_with_control_data_free (data);
313     }
314 }
315
316 static gssize
317 _g_socket_read_with_control_messages_finish (GSocket       *socket,
318                                              GAsyncResult  *result,
319                                              GError       **error)
320 {
321   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
322
323   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
324   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
325
326   if (g_simple_async_result_propagate_error (simple, error))
327       return -1;
328   else
329     return g_simple_async_result_get_op_res_gssize (simple);
330 }
331
332 /* ---------------------------------------------------------------------------------------------------- */
333
334 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
335
336 static GPtrArray *ensured_classes = NULL;
337
338 static void
339 ensure_type (GType gtype)
340 {
341   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
342 }
343
344 static void
345 release_required_types (void)
346 {
347   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
348   g_ptr_array_unref (ensured_classes);
349   ensured_classes = NULL;
350 }
351
352 static void
353 ensure_required_types (void)
354 {
355   g_assert (ensured_classes == NULL);
356   ensured_classes = g_ptr_array_new ();
357   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
358   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
359 }
360 /* ---------------------------------------------------------------------------------------------------- */
361
362 typedef struct
363 {
364   volatile gint refcount;
365   GThread *thread;
366   GMainContext *context;
367   GMainLoop *loop;
368 } SharedThreadData;
369
370 static gpointer
371 gdbus_shared_thread_func (gpointer user_data)
372 {
373   SharedThreadData *data = user_data;
374
375   g_main_context_push_thread_default (data->context);
376   g_main_loop_run (data->loop);
377   g_main_context_pop_thread_default (data->context);
378
379   release_required_types ();
380
381   return NULL;
382 }
383
384 /* ---------------------------------------------------------------------------------------------------- */
385
386 static SharedThreadData *
387 _g_dbus_shared_thread_ref (void)
388 {
389   static gsize shared_thread_data = 0;
390   SharedThreadData *ret;
391
392   if (g_once_init_enter (&shared_thread_data))
393     {
394       SharedThreadData *data;
395
396       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
397       ensure_required_types ();
398
399       data = g_new0 (SharedThreadData, 1);
400       data->refcount = 0;
401       
402       data->context = g_main_context_new ();
403       data->loop = g_main_loop_new (data->context, FALSE);
404       data->thread = g_thread_new ("gdbus",
405                                    gdbus_shared_thread_func,
406                                    data);
407       /* We can cast between gsize and gpointer safely */
408       g_once_init_leave (&shared_thread_data, (gsize) data);
409     }
410
411   ret = (SharedThreadData*) shared_thread_data;
412   g_atomic_int_inc (&ret->refcount);
413   return ret;
414 }
415
416 static void
417 _g_dbus_shared_thread_unref (SharedThreadData *data)
418 {
419   /* TODO: actually destroy the shared thread here */
420 #if 0
421   g_assert (data != NULL);
422   if (g_atomic_int_dec_and_test (&data->refcount))
423     {
424       g_main_loop_quit (data->loop);
425       //g_thread_join (data->thread);
426       g_main_loop_unref (data->loop);
427       g_main_context_unref (data->context);
428     }
429 #endif
430 }
431
432 /* ---------------------------------------------------------------------------------------------------- */
433
/* Which asynchronous output operation, if any, is currently pending on a
 * worker (see GDBusWorker.output_pending). */
typedef enum
{
  PENDING_NONE  = 0, /* nothing pending */
  PENDING_WRITE = 1, /* an async write is pending */
  PENDING_FLUSH = 2, /* an async flush is pending */
  PENDING_CLOSE = 3  /* an async close is pending */
} OutputPending;
440
441 struct GDBusWorker
442 {
443   volatile gint                       ref_count;
444
445   SharedThreadData                   *shared_thread_data;
446
447   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
448   volatile gint                       stopped;
449
450   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
451    * only affects messages received from the other peer (since GDBusServer is the
452    * only user) - we might want it to affect messages sent to the other peer too?
453    */
454   gboolean                            frozen;
455   GDBusCapabilityFlags                capabilities;
456   GQueue                             *received_messages_while_frozen;
457
458   GIOStream                          *stream;
459   GCancellable                       *cancellable;
460   GDBusWorkerMessageReceivedCallback  message_received_callback;
461   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
462   GDBusWorkerDisconnectedCallback     disconnected_callback;
463   gpointer                            user_data;
464
465   /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
466   GSocket *socket;
467 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
468   GKdbus  *kdbus;
469 #endif
470
471   /* used for reading */
472   GMutex                              read_lock;
473   gchar                              *read_buffer;
474   gsize                               read_buffer_allocated_size;
475   gsize                               read_buffer_cur_size;
476   gsize                               read_buffer_bytes_wanted;
477   GUnixFDList                        *read_fd_list;
478   GSocketControlMessage             **read_ancillary_messages;
479   gint                                read_num_ancillary_messages;
480 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
481   GSList                             *read_kdbus_msg_items;
482 #endif
483
484
485   /* Whether an async write, flush or close, or none of those, is pending.
486    * Only the worker thread may change its value, and only with the write_lock.
487    * Other threads may read its value when holding the write_lock.
488    * The worker thread may read its value at any time.
489    */
490   OutputPending                       output_pending;
491   /* used for writing */
492   GMutex                              write_lock;
493   /* queue of MessageToWriteData, protected by write_lock */
494   GQueue                             *write_queue;
495   /* protected by write_lock */
496   guint64                             write_num_messages_written;
497   /* number of messages we'd written out last time we flushed;
498    * protected by write_lock
499    */
500   guint64                             write_num_messages_flushed;
501   /* list of FlushData, protected by write_lock */
502   GList                              *write_pending_flushes;
503   /* list of CloseData, protected by write_lock */
504   GList                              *pending_close_attempts;
505   /* no lock - only used from the worker thread */
506   gboolean                            close_expected;
507 };
508
509 static void _g_dbus_worker_unref (GDBusWorker *worker);
510
511 /* ---------------------------------------------------------------------------------------------------- */
512
513 typedef struct
514 {
515   GMutex  mutex;
516   GCond   cond;
517   guint64 number_to_wait_for;
518   GError *error;
519 } FlushData;
520
521 struct _MessageToWriteData ;
522 typedef struct _MessageToWriteData MessageToWriteData;
523
524 static void message_to_write_data_free (MessageToWriteData *data);
525
526 static void read_message_print_transport_debug (gssize bytes_read,
527                                                 GDBusWorker *worker);
528
529 static void write_message_print_transport_debug (gssize bytes_written,
530                                                  MessageToWriteData *data);
531
532 typedef struct {
533     GDBusWorker *worker;
534     GCancellable *cancellable;
535     GSimpleAsyncResult *result;
536 } CloseData;
537
538 static void close_data_free (CloseData *close_data)
539 {
540   if (close_data->cancellable != NULL)
541     g_object_unref (close_data->cancellable);
542
543   if (close_data->result != NULL)
544     g_object_unref (close_data->result);
545
546   _g_dbus_worker_unref (close_data->worker);
547   g_slice_free (CloseData, close_data);
548 }
549
550 /* ---------------------------------------------------------------------------------------------------- */
551
552 static GDBusWorker *
553 _g_dbus_worker_ref (GDBusWorker *worker)
554 {
555   g_atomic_int_inc (&worker->ref_count);
556   return worker;
557 }
558
559 static void
560 _g_dbus_worker_unref (GDBusWorker *worker)
561 {
562   if (g_atomic_int_dec_and_test (&worker->ref_count))
563     {
564       g_assert (worker->write_pending_flushes == NULL);
565
566       _g_dbus_shared_thread_unref (worker->shared_thread_data);
567
568       g_object_unref (worker->stream);
569
570       g_mutex_clear (&worker->read_lock);
571       g_object_unref (worker->cancellable);
572       if (worker->read_fd_list != NULL)
573         g_object_unref (worker->read_fd_list);
574
575       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
576       g_mutex_clear (&worker->write_lock);
577       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
578       g_free (worker->read_buffer);
579
580       g_free (worker);
581     }
582 }
583
584 static void
585 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
586                                   gboolean      remote_peer_vanished,
587                                   GError       *error)
588 {
589   if (!g_atomic_int_get (&worker->stopped))
590     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
591 }
592
593 static void
594 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
595                                       GDBusMessage *message)
596 {
597   if (!g_atomic_int_get (&worker->stopped))
598     worker->message_received_callback (worker, message, worker->user_data);
599 }
600
601 static GDBusMessage *
602 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
603                                               GDBusMessage *message)
604 {
605   GDBusMessage *ret;
606   if (!g_atomic_int_get (&worker->stopped))
607     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
608   else
609     ret = message;
610   return ret;
611 }
612
613 /* can only be called from private thread with read-lock held - takes ownership of @message */
614 static void
615 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
616                                                   GDBusMessage *message)
617 {
618   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
619     {
620       /* queue up */
621       g_queue_push_tail (worker->received_messages_while_frozen, message);
622     }
623   else
624     {
625       /* not frozen, nor anything in queue */
626       _g_dbus_worker_emit_message_received (worker, message);
627       g_object_unref (message);
628     }
629 }
630
631 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
632 static gboolean
633 unfreeze_in_idle_cb (gpointer user_data)
634 {
635   GDBusWorker *worker = user_data;
636   GDBusMessage *message;
637
638   g_mutex_lock (&worker->read_lock);
639   if (worker->frozen)
640     {
641       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
642         {
643           _g_dbus_worker_emit_message_received (worker, message);
644           g_object_unref (message);
645         }
646       worker->frozen = FALSE;
647     }
648   else
649     {
650       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
651     }
652   g_mutex_unlock (&worker->read_lock);
653   return FALSE;
654 }
655
656 /* can be called from any thread */
657 void
658 _g_dbus_worker_unfreeze (GDBusWorker *worker)
659 {
660   GSource *idle_source;
661   idle_source = g_idle_source_new ();
662   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
663   g_source_set_callback (idle_source,
664                          unfreeze_in_idle_cb,
665                          _g_dbus_worker_ref (worker),
666                          (GDestroyNotify) _g_dbus_worker_unref);
667   g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
668   g_source_attach (idle_source, worker->shared_thread_data->context);
669   g_source_unref (idle_source);
670 }
671
672 /* ---------------------------------------------------------------------------------------------------- */
673
674 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
675
676 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
677 static void
678 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
679                            GAsyncResult  *res,
680                            gpointer       user_data)
681 {
682   GDBusWorker *worker = user_data;
683   GError *error;
684   gssize bytes_read;
685
686   g_mutex_lock (&worker->read_lock);
687
688   /* If already stopped, don't even process the reply */
689   if (g_atomic_int_get (&worker->stopped))
690     goto out;
691
692   error = NULL;
693   bytes_read = 0;
694
695   if (FALSE)
696     {
697     }
698 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
699   else if (G_IS_KDBUS_CONNECTION (worker->stream))
700     {
701       bytes_read = _g_kdbus_read_finish (worker->kdbus,
702                                          res,
703                                          &error);
704
705       /* [KDBUS]  Get all received items*/
706       worker->read_kdbus_msg_items = _g_kdbus_get_last_msg_items (worker->kdbus);
707
708       /* [KDBUS] Attach fds (if any) to worker->read_fd_list */
709       _g_kdbus_attach_fds_to_msg (worker->kdbus, &worker->read_fd_list);
710
711       /* [KDBUS] For KDBUS transport we read whole message at once*/
712       worker->read_buffer_bytes_wanted = bytes_read;
713     }
714 #endif
715   else if (worker->socket == NULL)
716     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
717                                              res,
718                                              &error);
719   else
720     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
721                                                               res,
722                                                               &error);
723   if (worker->read_num_ancillary_messages > 0)
724     {
725       gint n;
726       for (n = 0; n < worker->read_num_ancillary_messages; n++)
727         {
728           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
729
730           if (FALSE)
731             {
732             }
733 #ifdef G_OS_UNIX
734           else if (G_IS_UNIX_FD_MESSAGE (control_message))
735             {
736               GUnixFDMessage *fd_message;
737               gint *fds;
738               gint num_fds;
739
740               fd_message = G_UNIX_FD_MESSAGE (control_message);
741               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
742               if (worker->read_fd_list == NULL)
743                 {
744                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
745                 }
746               else
747                 {
748                   gint n;
749                   for (n = 0; n < num_fds; n++)
750                     {
751                       /* TODO: really want a append_steal() */
752                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
753                       (void) g_close (fds[n], NULL);
754                     }
755                 }
756               g_free (fds);
757             }
758           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
759             {
760               /* do nothing */
761             }
762 #endif
763           else
764             {
765               if (error == NULL)
766                 {
767                   g_set_error (&error,
768                                G_IO_ERROR,
769                                G_IO_ERROR_FAILED,
770                                "Unexpected ancillary message of type %s received from peer",
771                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
772                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
773                   g_error_free (error);
774                   g_object_unref (control_message);
775                   n++;
776                   while (n < worker->read_num_ancillary_messages)
777                     g_object_unref (worker->read_ancillary_messages[n++]);
778                   g_free (worker->read_ancillary_messages);
779                   goto out;
780                 }
781             }
782           g_object_unref (control_message);
783         }
784       g_free (worker->read_ancillary_messages);
785     }
786
787   if (bytes_read == -1)
788     {
789       if (G_UNLIKELY (_g_dbus_debug_transport ()))
790         {
791           _g_dbus_debug_print_lock ();
792           g_print ("========================================================================\n"
793                    "GDBus-debug:Transport:\n"
794                    "  ---- READ ERROR:\n"
795                    "  ---- %s %d: %s\n",
796                    g_quark_to_string (error->domain), error->code,
797                    error->message);
798           _g_dbus_debug_print_unlock ();
799         }
800
801       /* Every async read that uses this callback uses worker->cancellable
802        * as its GCancellable. worker->cancellable gets cancelled if and only
803        * if the GDBusConnection tells us to close (either via
804        * _g_dbus_worker_stop, which is called on last-unref, or directly),
805        * so a cancelled read must mean our connection was closed locally.
806        *
807        * If we're closing, other errors are possible - notably,
808        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
809        * read in-flight. It seems sensible to treat all read errors during
810        * closing as an expected thing that doesn't trip exit-on-close.
811        *
812        * Because close_expected can't be set until we get into the worker
813        * thread, but the cancellable is signalled sooner (from another
814        * thread), we do still need to check the error.
815        */
816       if (worker->close_expected ||
817           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
818         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
819       else
820         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
821
822       g_error_free (error);
823       goto out;
824     }
825
826 #if 0
827   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
828            (gint) bytes_read,
829            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
830            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
831            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
832                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
833            worker->stream,
834            worker);
835 #endif
836
837   /* TODO: hmm, hmm... */
838   if (bytes_read == 0)
839     {
840       g_set_error (&error,
841                    G_IO_ERROR,
842                    G_IO_ERROR_FAILED,
843                    "Underlying GIOStream returned 0 bytes on an async read");
844       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
845       g_error_free (error);
846       goto out;
847     }
848
849   /* [KDBUS] don't print transport dbus debug for kdbus connection */
850   if (!G_IS_KDBUS_CONNECTION (worker->stream))
851     read_message_print_transport_debug (bytes_read, worker);
852
853   worker->read_buffer_cur_size += bytes_read;
854   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
855     {
856       /* OK, got what we asked for! */
857       if (worker->read_buffer_bytes_wanted == 16)
858         {
859           gssize message_len;
860           /* OK, got the header - determine how many more bytes are needed */
861           error = NULL;
862           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
863                                                      16,
864                                                      &error);
865           if (message_len == -1)
866             {
867               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
868               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
869               g_error_free (error);
870               goto out;
871             }
872
873           worker->read_buffer_bytes_wanted = message_len;
874           _g_dbus_worker_do_read_unlocked (worker);
875         }
876       else
877         {
878           GDBusMessage *message;
879           error = NULL;
880
881           /* TODO: use connection->priv->auth to decode the message */
882
883           if (FALSE)
884             {
885             }
886 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
887           else if (G_IS_KDBUS_CONNECTION (worker->stream))
888             {
889               GDBusMessageType message_type;
890               gchar *sender;
891               gchar *destination;
892
893               message = _g_dbus_message_new_from_kdbus_items (worker->read_kdbus_msg_items,
894                                                               &error);
895
896               /* [KDBUS] override informations from the user header with kernel msg header */
897               sender = _g_kdbus_get_last_msg_sender (worker->kdbus);
898               g_dbus_message_set_sender (message, sender);
899
900               message_type = g_dbus_message_get_message_type (message);
901               if (message_type == G_DBUS_MESSAGE_TYPE_SIGNAL)
902                 {
903                   destination = _g_kdbus_get_last_msg_destination (worker->kdbus);
904                   g_dbus_message_set_destination (message, destination);
905                 }
906
907               if (message == NULL)
908                 {
909                    g_warning ("Error decoding D-Bus (kdbus) message\n");
910                    g_error_free (error);
911                    goto out;
912                 }
913             }
914 #endif
915           else
916             {
917               message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
918                                                       worker->read_buffer_cur_size,
919                                                       worker->capabilities,
920                                                       &error);
921
922               if (message == NULL)
923                 {
924                   gchar *s;
925                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
926                   g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
927                              "The error is: %s\n"
928                              "The payload is as follows:\n"
929                              "%s\n",
930                              worker->read_buffer_cur_size,
931                              error->message,
932                              s);
933                   g_free (s);
934                   _g_dbus_worker_emit_disconnected (worker, FALSE, error);
935                   g_error_free (error);
936                   goto out;
937                 }
938             }
939
940 #ifdef G_OS_UNIX
941           if (worker->read_fd_list != NULL)
942             {
943               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
944               g_object_unref (worker->read_fd_list);
945               worker->read_fd_list = NULL;
946             }
947 #endif
948
949           if (G_UNLIKELY (_g_dbus_debug_message ()))
950             {
951               gchar *s;
952               _g_dbus_debug_print_lock ();
953               g_print ("========================================================================\n"
954                        "GDBus-debug:Message:\n"
955                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
956                        worker->read_buffer_cur_size);
957               s = g_dbus_message_print (message, 2);
958               g_print ("%s", s);
959               g_free (s);
960               if (G_UNLIKELY (_g_dbus_debug_payload ()))
961                 {
962                   if (FALSE)
963                     {
964                     }
965 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
966                   else if (G_IS_KDBUS_CONNECTION (worker->stream))
967                     s = _g_kdbus_hexdump_all_items (worker->read_kdbus_msg_items);
968 #endif
969                   else
970                     s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
971                   g_print ("%s\n", s);
972                   g_free (s);
973                 }
974               _g_dbus_debug_print_unlock ();
975             }
976
977           /* yay, got a message, go deliver it */
978           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
979
980           /* start reading another message! */
981           worker->read_buffer_bytes_wanted = 0;
982           worker->read_buffer_cur_size = 0;
983           _g_dbus_worker_do_read_unlocked (worker);
984         }
985     }
986   else
987     {
988       /* didn't get all the bytes we requested - so repeat the request... */
989       _g_dbus_worker_do_read_unlocked (worker);
990     }
991
992  out:
993
994 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
995   /* [KDBUS] release memory occupied by kdbus message */
996   if (G_IS_KDBUS_CONNECTION (worker->stream))
997     {
998       if (!_g_kdbus_is_closed (worker->kdbus))
999         {
1000           _g_kdbus_release_kmsg (worker->kdbus);
1001           worker->read_kdbus_msg_items = NULL;
1002         }
1003       worker->read_buffer = NULL;
1004     }
1005 #endif
1006
1007   g_mutex_unlock (&worker->read_lock);
1008
1009   /* gives up the reference acquired when calling g_input_stream_read_async() */
1010   _g_dbus_worker_unref (worker);
1011 }
1012
1013 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
1014 static void
1015 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
1016 {
1017   /* Note that we do need to keep trying to read even if close_expected is
1018    * true, because only failing a read causes us to signal 'closed'.
1019    */
1020
1021   /* [KDBUS]
1022    * For KDBUS transport we don't  have to alloc buffer (worker->read_buffer)
1023    * instead of it we use kdbus memory pool. On connection stage KDBUS client
1024    * have to register a memory pool, large enough to  carry all backlog of
1025    * data enqueued for the connection.
1026    */
1027
1028 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
1029   if (G_IS_KDBUS_CONNECTION (worker->stream))
1030     {
1031       _g_kdbus_read(worker->kdbus,
1032                     worker->cancellable,
1033                     (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1034                     _g_dbus_worker_ref (worker));
1035       return;
1036     }
1037 #endif
1038
1039   /* if bytes_wanted is zero, it means start reading a message */
1040   if (worker->read_buffer_bytes_wanted == 0)
1041     {
1042       worker->read_buffer_cur_size = 0;
1043       worker->read_buffer_bytes_wanted = 16;
1044     }
1045
1046   /* ensure we have a (big enough) buffer */
1047   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
1048     {
1049       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
1050       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
1051       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
1052     }
1053
1054   if (worker->socket == NULL)
1055     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
1056                                worker->read_buffer + worker->read_buffer_cur_size,
1057                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
1058                                G_PRIORITY_DEFAULT,
1059                                worker->cancellable,
1060                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1061                                _g_dbus_worker_ref (worker));
1062   else
1063     {
1064       worker->read_ancillary_messages = NULL;
1065       worker->read_num_ancillary_messages = 0;
1066       _g_socket_read_with_control_messages (worker->socket,
1067                                             worker->read_buffer + worker->read_buffer_cur_size,
1068                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
1069                                             &worker->read_ancillary_messages,
1070                                             &worker->read_num_ancillary_messages,
1071                                             G_PRIORITY_DEFAULT,
1072                                             worker->cancellable,
1073                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1074                                             _g_dbus_worker_ref (worker));
1075     }
1076 }
1077
1078 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
1079 static gboolean
1080 _g_dbus_worker_do_initial_read (gpointer data)
1081 {
1082   GDBusWorker *worker = data;
1083   g_mutex_lock (&worker->read_lock);
1084   _g_dbus_worker_do_read_unlocked (worker);
1085   g_mutex_unlock (&worker->read_lock);
1086   return FALSE;
1087 }
1088
1089 /* ---------------------------------------------------------------------------------------------------- */
1090
1091 struct _MessageToWriteData
1092 {
1093   GDBusWorker  *worker;
1094   GDBusMessage *message;
1095   gchar        *blob;
1096   gsize         blob_size;
1097
1098   gsize               total_written;
1099   GSimpleAsyncResult *simple;
1100
1101 };
1102
1103 static void
1104 message_to_write_data_free (MessageToWriteData *data)
1105 {
1106   _g_dbus_worker_unref (data->worker);
1107   if (data->message)
1108     g_object_unref (data->message);
1109   g_free (data->blob);
1110   g_free (data);
1111 }
1112
1113 /* ---------------------------------------------------------------------------------------------------- */
1114
1115 static void write_message_continue_writing (MessageToWriteData *data);
1116
1117 /* called in private thread shared by all GDBusConnection instances
1118  *
1119  * write-lock is not held on entry
1120  * output_pending is PENDING_WRITE on entry
1121  */
1122 static void
1123 write_message_async_cb (GObject      *source_object,
1124                         GAsyncResult *res,
1125                         gpointer      user_data)
1126 {
1127   MessageToWriteData *data = user_data;
1128   GSimpleAsyncResult *simple;
1129   gssize bytes_written;
1130   GError *error;
1131
1132   /* Note: we can't access data->simple after calling g_async_result_complete () because the
1133    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
1134    */
1135   simple = data->simple;
1136
1137   error = NULL;
1138   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
1139                                                 res,
1140                                                 &error);
1141   if (bytes_written == -1)
1142     {
1143       g_simple_async_result_take_error (simple, error);
1144       g_simple_async_result_complete (simple);
1145       g_object_unref (simple);
1146       goto out;
1147     }
1148   g_assert (bytes_written > 0); /* zero is never returned */
1149
1150   write_message_print_transport_debug (bytes_written, data);
1151
1152   data->total_written += bytes_written;
1153   g_assert (data->total_written <= data->blob_size);
1154   if (data->total_written == data->blob_size)
1155     {
1156       g_simple_async_result_complete (simple);
1157       g_object_unref (simple);
1158       goto out;
1159     }
1160
1161   write_message_continue_writing (data);
1162
1163  out:
1164   ;
1165 }
1166
/* called in private thread shared by all GDBusConnection instances
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 *
 * Socket source callback installed after a WOULD_BLOCK result: retries the
 * write for the MessageToWriteData passed as @user_data.
 */
#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  write_message_continue_writing ((MessageToWriteData *) user_data);

  /* one-shot: returning FALSE removes the source */
  return FALSE;
}
#endif
1183
1184 /* called in private thread shared by all GDBusConnection instances
1185  *
1186  * write-lock is not held on entry
1187  * output_pending is PENDING_WRITE on entry
1188  */
1189 static void
1190 write_message_continue_writing (MessageToWriteData *data)
1191 {
1192   GOutputStream *ostream;
1193
1194 #ifdef G_OS_UNIX
1195   GSimpleAsyncResult *simple;
1196   GUnixFDList *fd_list;
1197
1198   /* Note: we can't access data->simple after calling g_async_result_complete () because the
1199    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
1200    */
1201   simple = data->simple;
1202
1203   fd_list = g_dbus_message_get_unix_fd_list (data->message);
1204
1205 #ifdef KDBUS_TRANSPORT
1206   if (G_IS_KDBUS_CONNECTION (data->worker->stream))
1207     {
1208       GError *error;
1209       error = NULL;
1210       data->total_written = _g_kdbus_send (data->worker,
1211                                            data->worker->kdbus,
1212                                            data->message,
1213                                            fd_list,
1214                                            data->worker->cancellable,
1215                                            &error);
1216
1217       g_simple_async_result_complete (simple);
1218       g_object_unref (simple);
1219       goto out;
1220     }
1221 #endif /* KDBUS_TRANSPORT */
1222
1223 #endif /* G_OS_UNIX */
1224
1225   ostream = g_io_stream_get_output_stream (data->worker->stream);
1226
1227   g_assert (!g_output_stream_has_pending (ostream));
1228   g_assert_cmpint (data->total_written, <, data->blob_size);
1229
1230   if (FALSE)
1231     {
1232     }
1233 #ifdef G_OS_UNIX
1234   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1235     {
1236       GOutputVector vector;
1237       GSocketControlMessage *control_message;
1238       gssize bytes_written;
1239       GError *error;
1240
1241       vector.buffer = data->blob;
1242       vector.size = data->blob_size;
1243
1244       control_message = NULL;
1245       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1246         {
1247           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1248             {
1249               g_simple_async_result_set_error (simple,
1250                                                G_IO_ERROR,
1251                                                G_IO_ERROR_FAILED,
1252                                                "Tried sending a file descriptor but remote peer does not support this capability");
1253               g_simple_async_result_complete (simple);
1254               g_object_unref (simple);
1255               goto out;
1256             }
1257           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1258         }
1259
1260       error = NULL;
1261       bytes_written = g_socket_send_message (data->worker->socket,
1262                                              NULL, /* address */
1263                                              &vector,
1264                                              1,
1265                                              control_message != NULL ? &control_message : NULL,
1266                                              control_message != NULL ? 1 : 0,
1267                                              G_SOCKET_MSG_NONE,
1268                                              data->worker->cancellable,
1269                                              &error);
1270       if (control_message != NULL)
1271         g_object_unref (control_message);
1272
1273       if (bytes_written == -1)
1274         {
1275           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1276           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1277             {
1278               GSource *source;
1279               source = g_socket_create_source (data->worker->socket,
1280                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1281                                                data->worker->cancellable);
1282               g_source_set_callback (source,
1283                                      (GSourceFunc) on_socket_ready,
1284                                      data,
1285                                      NULL); /* GDestroyNotify */
1286               g_source_attach (source, g_main_context_get_thread_default ());
1287               g_source_unref (source);
1288               g_error_free (error);
1289               goto out;
1290             }
1291           g_simple_async_result_take_error (simple, error);
1292           g_simple_async_result_complete (simple);
1293           g_object_unref (simple);
1294           goto out;
1295         }
1296       g_assert (bytes_written > 0); /* zero is never returned */
1297
1298       write_message_print_transport_debug (bytes_written, data);
1299
1300       data->total_written += bytes_written;
1301       g_assert (data->total_written <= data->blob_size);
1302       if (data->total_written == data->blob_size)
1303         {
1304           g_simple_async_result_complete (simple);
1305           g_object_unref (simple);
1306           goto out;
1307         }
1308
1309       write_message_continue_writing (data);
1310     }
1311 #endif
1312   else
1313     {
1314 #ifdef G_OS_UNIX
1315       if (fd_list != NULL)
1316         {
1317           g_simple_async_result_set_error (simple,
1318                                            G_IO_ERROR,
1319                                            G_IO_ERROR_FAILED,
1320                                            "Tried sending a file descriptor on unsupported stream of type %s",
1321                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1322           g_simple_async_result_complete (simple);
1323           g_object_unref (simple);
1324           goto out;
1325         }
1326 #endif
1327
1328       g_output_stream_write_async (ostream,
1329                                    (const gchar *) data->blob + data->total_written,
1330                                    data->blob_size - data->total_written,
1331                                    G_PRIORITY_DEFAULT,
1332                                    data->worker->cancellable,
1333                                    write_message_async_cb,
1334                                    data);
1335     }
1336 #ifdef G_OS_UNIX
1337  out:
1338 #endif
1339   ;
1340 }
1341
1342 /* called in private thread shared by all GDBusConnection instances
1343  *
1344  * write-lock is not held on entry
1345  * output_pending is PENDING_WRITE on entry
1346  */
1347 static void
1348 write_message_async (GDBusWorker         *worker,
1349                      MessageToWriteData  *data,
1350                      GAsyncReadyCallback  callback,
1351                      gpointer             user_data)
1352 {
1353   data->simple = g_simple_async_result_new (NULL,
1354                                             callback,
1355                                             user_data,
1356                                             write_message_async);
1357   data->total_written = 0;
1358   write_message_continue_writing (data);
1359 }
1360
1361 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1362 static gboolean
1363 write_message_finish (GAsyncResult   *res,
1364                       GError        **error)
1365 {
1366   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1367   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1368     return FALSE;
1369   else
1370     return TRUE;
1371 }
1372 /* ---------------------------------------------------------------------------------------------------- */
1373
1374 static void continue_writing (GDBusWorker *worker);
1375
1376 typedef struct
1377 {
1378   GDBusWorker *worker;
1379   GList *flushers;
1380 } FlushAsyncData;
1381
1382 static void
1383 flush_data_list_complete (const GList  *flushers,
1384                           const GError *error)
1385 {
1386   const GList *l;
1387
1388   for (l = flushers; l != NULL; l = l->next)
1389     {
1390       FlushData *f = l->data;
1391
1392       f->error = error != NULL ? g_error_copy (error) : NULL;
1393
1394       g_mutex_lock (&f->mutex);
1395       g_cond_signal (&f->cond);
1396       g_mutex_unlock (&f->mutex);
1397     }
1398 }
1399
1400 /* called in private thread shared by all GDBusConnection instances
1401  *
1402  * write-lock is not held on entry
1403  * output_pending is PENDING_FLUSH on entry
1404  */
1405 static void
1406 ostream_flush_cb (GObject      *source_object,
1407                   GAsyncResult *res,
1408                   gpointer      user_data)
1409 {
1410   FlushAsyncData *data = user_data;
1411   GError *error;
1412
1413   error = NULL;
1414   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1415                                 res,
1416                                 &error);
1417
1418   if (error == NULL)
1419     {
1420       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1421         {
1422           _g_dbus_debug_print_lock ();
1423           g_print ("========================================================================\n"
1424                    "GDBus-debug:Transport:\n"
1425                    "  ---- FLUSHED stream of type %s\n",
1426                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1427           _g_dbus_debug_print_unlock ();
1428         }
1429     }
1430
1431   g_assert (data->flushers != NULL);
1432   flush_data_list_complete (data->flushers, error);
1433   g_list_free (data->flushers);
1434
1435   if (error != NULL)
1436     g_error_free (error);
1437
1438   /* Make sure we tell folks that we don't have additional
1439      flushes pending */
1440   g_mutex_lock (&data->worker->write_lock);
1441   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1442   g_assert (data->worker->output_pending == PENDING_FLUSH);
1443   data->worker->output_pending = PENDING_NONE;
1444   g_mutex_unlock (&data->worker->write_lock);
1445
1446   /* OK, cool, finally kick off the next write */
1447   continue_writing (data->worker);
1448
1449   _g_dbus_worker_unref (data->worker);
1450   g_free (data);
1451 }
1452
1453 /* called in private thread shared by all GDBusConnection instances
1454  *
1455  * write-lock is not held on entry
1456  * output_pending is PENDING_FLUSH on entry
1457  */
1458 static void
1459 start_flush (FlushAsyncData *data)
1460 {
1461   /*[KDBUS]: TODO: to investigate */
1462   if (G_IS_KDBUS_CONNECTION (data->worker->stream))
1463     {
1464       g_assert (data->flushers != NULL);
1465       flush_data_list_complete (data->flushers, NULL);
1466       g_list_free (data->flushers);
1467
1468       g_mutex_lock (&data->worker->write_lock);
1469       data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1470       g_assert (data->worker->output_pending == PENDING_FLUSH);
1471       data->worker->output_pending = PENDING_NONE;
1472       g_mutex_unlock (&data->worker->write_lock);
1473
1474       /* OK, cool, finally kick off the next write */
1475       continue_writing (data->worker);
1476
1477       _g_dbus_worker_unref (data->worker);
1478       g_free (data);
1479     }
1480   else
1481     {
1482       g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1483                                    G_PRIORITY_DEFAULT,
1484                                    data->worker->cancellable,
1485                                    ostream_flush_cb,
1486                                    data);
1487     }
1488 }
1489
1490 /* called in private thread shared by all GDBusConnection instances
1491  *
1492  * write-lock is held on entry
1493  * output_pending is PENDING_NONE on entry
1494  */
1495 static void
1496 message_written_unlocked (GDBusWorker *worker,
1497                           MessageToWriteData *message_data)
1498 {
1499   if (G_UNLIKELY (_g_dbus_debug_message ()))
1500     {
1501       gchar *s;
1502       _g_dbus_debug_print_lock ();
1503       g_print ("========================================================================\n"
1504                "GDBus-debug:Message:\n"
1505                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1506                message_data->blob_size);
1507       s = g_dbus_message_print (message_data->message, 2);
1508       g_print ("%s", s);
1509       g_free (s);
1510       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1511         {
1512           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1513           g_print ("%s\n", s);
1514           g_free (s);
1515         }
1516       _g_dbus_debug_print_unlock ();
1517     }
1518
1519   worker->write_num_messages_written += 1;
1520 }
1521
1522 /* called in private thread shared by all GDBusConnection instances
1523  *
1524  * write-lock is held on entry
1525  * output_pending is PENDING_NONE on entry
1526  *
1527  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1528  */
1529 static FlushAsyncData *
1530 prepare_flush_unlocked (GDBusWorker *worker)
1531 {
1532   GList *l;
1533   GList *ll;
1534   GList *flushers;
1535
1536   flushers = NULL;
1537   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1538     {
1539       FlushData *f = l->data;
1540       ll = l->next;
1541
1542       if (f->number_to_wait_for == worker->write_num_messages_written)
1543         {
1544           flushers = g_list_append (flushers, f);
1545           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1546         }
1547     }
1548   if (flushers != NULL)
1549     {
1550       g_assert (worker->output_pending == PENDING_NONE);
1551       worker->output_pending = PENDING_FLUSH;
1552     }
1553
1554   if (flushers != NULL)
1555     {
1556       FlushAsyncData *data;
1557
1558       data = g_new0 (FlushAsyncData, 1);
1559       data->worker = _g_dbus_worker_ref (worker);
1560       data->flushers = flushers;
1561       return data;
1562     }
1563
1564   return NULL;
1565 }
1566
1567 /* called in private thread shared by all GDBusConnection instances
1568  *
1569  * write-lock is not held on entry
1570  * output_pending is PENDING_WRITE on entry
1571  */
1572 static void
1573 write_message_cb (GObject       *source_object,
1574                   GAsyncResult  *res,
1575                   gpointer       user_data)
1576 {
1577   MessageToWriteData *data = user_data;
1578   GError *error;
1579
1580   g_mutex_lock (&data->worker->write_lock);
1581   g_assert (data->worker->output_pending == PENDING_WRITE);
1582   data->worker->output_pending = PENDING_NONE;
1583
1584   error = NULL;
1585   if (!write_message_finish (res, &error))
1586     {
1587       g_mutex_unlock (&data->worker->write_lock);
1588
1589       /* TODO: handle */
1590       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1591       g_error_free (error);
1592
1593       g_mutex_lock (&data->worker->write_lock);
1594     }
1595
1596   message_written_unlocked (data->worker, data);
1597
1598   g_mutex_unlock (&data->worker->write_lock);
1599
1600   continue_writing (data->worker);
1601
1602   message_to_write_data_free (data);
1603 }
1604
1605 /* called in private thread shared by all GDBusConnection instances
1606  *
1607  * write-lock is not held on entry
1608  * output_pending is PENDING_CLOSE on entry
1609  */
1610 static void
1611 iostream_close_cb (GObject      *source_object,
1612                    GAsyncResult *res,
1613                    gpointer      user_data)
1614 {
1615   GDBusWorker *worker = user_data;
1616   GError *error = NULL;
1617   GList *pending_close_attempts, *pending_flush_attempts;
1618   GQueue *send_queue;
1619
1620   g_io_stream_close_finish (worker->stream, res, &error);
1621
1622   g_mutex_lock (&worker->write_lock);
1623
1624   pending_close_attempts = worker->pending_close_attempts;
1625   worker->pending_close_attempts = NULL;
1626
1627   pending_flush_attempts = worker->write_pending_flushes;
1628   worker->write_pending_flushes = NULL;
1629
1630   send_queue = worker->write_queue;
1631   worker->write_queue = g_queue_new ();
1632
1633   g_assert (worker->output_pending == PENDING_CLOSE);
1634   worker->output_pending = PENDING_NONE;
1635
1636   g_mutex_unlock (&worker->write_lock);
1637
1638   while (pending_close_attempts != NULL)
1639     {
1640       CloseData *close_data = pending_close_attempts->data;
1641
1642       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1643                                                    pending_close_attempts);
1644
1645       if (close_data->result != NULL)
1646         {
1647           if (error != NULL)
1648             g_simple_async_result_set_from_error (close_data->result, error);
1649
1650           /* this must be in an idle because the result is likely to be
1651            * intended for another thread
1652            */
1653           g_simple_async_result_complete_in_idle (close_data->result);
1654         }
1655
1656       close_data_free (close_data);
1657     }
1658
1659   g_clear_error (&error);
1660
1661   /* all messages queued for sending are discarded */
1662   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1663   /* all queued flushes fail */
1664   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1665                        _("Operation was cancelled"));
1666   flush_data_list_complete (pending_flush_attempts, error);
1667   g_list_free (pending_flush_attempts);
1668   g_clear_error (&error);
1669
1670   _g_dbus_worker_unref (worker);
1671 }
1672
1673 /* called in private thread shared by all GDBusConnection instances
1674  *
1675  * write-lock is not held on entry
1676  * output_pending must be PENDING_NONE on entry
1677  */
1678 static void
1679 continue_writing (GDBusWorker *worker)
1680 {
1681   MessageToWriteData *data;
1682   FlushAsyncData *flush_async_data;
1683
1684  write_next:
1685   /* we mustn't try to write two things at once */
1686   g_assert (worker->output_pending == PENDING_NONE);
1687
1688   g_mutex_lock (&worker->write_lock);
1689
1690   data = NULL;
1691   flush_async_data = NULL;
1692
1693   /* if we want to close the connection, that takes precedence */
1694   if (worker->pending_close_attempts != NULL)
1695     {
1696       worker->close_expected = TRUE;
1697       worker->output_pending = PENDING_CLOSE;
1698
1699       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1700                                NULL, iostream_close_cb,
1701                                _g_dbus_worker_ref (worker));
1702     }
1703   else
1704     {
1705       flush_async_data = prepare_flush_unlocked (worker);
1706
1707       if (flush_async_data == NULL)
1708         {
1709           data = g_queue_pop_head (worker->write_queue);
1710
1711           if (data != NULL)
1712             worker->output_pending = PENDING_WRITE;
1713         }
1714     }
1715
1716   g_mutex_unlock (&worker->write_lock);
1717
1718   /* Note that write_lock is only used for protecting the @write_queue
1719    * and @output_pending fields of the GDBusWorker struct ... which we
1720    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1721    *
1722    * Therefore, it's fine to drop it here when calling back into user
1723    * code and then writing the message out onto the GIOStream since this
1724    * function only runs on the worker thread.
1725    */
1726
1727   if (flush_async_data != NULL)
1728     {
1729       start_flush (flush_async_data);
1730       g_assert (data == NULL);
1731     }
1732   else if (data != NULL)
1733     {
1734       GDBusMessage *old_message;
1735       guchar *new_blob;
1736       gsize new_blob_size;
1737       GError *error;
1738
1739       old_message = data->message;
1740       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1741       if (data->message == old_message)
1742         {
1743           /* filters had no effect - do nothing */
1744         }
1745       else if (data->message == NULL)
1746         {
1747           /* filters dropped message */
1748           g_mutex_lock (&worker->write_lock);
1749           worker->output_pending = PENDING_NONE;
1750           g_mutex_unlock (&worker->write_lock);
1751           message_to_write_data_free (data);
1752           goto write_next;
1753         }
1754       else
1755         {
1756           /* filters altered the message -> reencode */
1757           error = NULL;
1758
1759          /* [KDBUS]
1760           * Setting protocol version, before invoking g_dbus_message_to_blob() will
1761           * be removed after preparing new function only for kdbus transport purposes
1762           * (this function will be able to create blob directly/unconditionally in memfd
1763           * object, without making copy):
1764           *
1765           * [1] https://code.google.com/p/d-bus/source/browse/TODO
1766           */
1767
1768           if (G_IS_KDBUS_CONNECTION (worker->stream))
1769             _g_dbus_message_set_protocol_ver (data->message,2);
1770           else
1771             _g_dbus_message_set_protocol_ver (data->message,1);
1772
1773           new_blob = g_dbus_message_to_blob (data->message,
1774                                              &new_blob_size,
1775                                              worker->capabilities,
1776                                              &error);
1777           if (new_blob == NULL)
1778             {
1779               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1780                * the old message instead
1781                */
1782               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1783                          g_dbus_message_get_serial (data->message),
1784                          error->message);
1785               g_error_free (error);
1786             }
1787           else
1788             {
1789               g_free (data->blob);
1790               data->blob = (gchar *) new_blob;
1791               data->blob_size = new_blob_size;
1792             }
1793         }
1794
1795       write_message_async (worker,
1796                            data,
1797                            write_message_cb,
1798                            data);
1799     }
1800 }
1801
1802 /* called in private thread shared by all GDBusConnection instances
1803  *
1804  * write-lock is not held on entry
1805  * output_pending may be anything
1806  */
1807 static gboolean
1808 continue_writing_in_idle_cb (gpointer user_data)
1809 {
1810   GDBusWorker *worker = user_data;
1811
1812   /* Because this is the worker thread, we can read this struct member
1813    * without holding the lock: no other thread ever modifies it.
1814    */
1815   if (worker->output_pending == PENDING_NONE)
1816     continue_writing (worker);
1817
1818   return FALSE;
1819 }
1820
1821 /**
1822  * @write_data: (transfer full) (allow-none):
1823  * @flush_data: (transfer full) (allow-none):
1824  * @close_data: (transfer full) (allow-none):
1825  *
1826  * Can be called from any thread
1827  *
1828  * write_lock is held on entry
1829  * output_pending may be anything
1830  */
1831 static void
1832 schedule_writing_unlocked (GDBusWorker        *worker,
1833                            MessageToWriteData *write_data,
1834                            FlushData          *flush_data,
1835                            CloseData          *close_data)
1836 {
1837   if (write_data != NULL)
1838     g_queue_push_tail (worker->write_queue, write_data);
1839
1840   if (flush_data != NULL)
1841     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1842
1843   if (close_data != NULL)
1844     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1845                                                      close_data);
1846
1847   /* If we had output pending, the next bit of output will happen
1848    * automatically when it finishes, so we only need to do this
1849    * if nothing was pending.
1850    *
1851    * The idle callback will re-check that output_pending is still
1852    * PENDING_NONE, to guard against output starting before the idle.
1853    */
1854   if (worker->output_pending == PENDING_NONE)
1855     {
1856       GSource *idle_source;
1857       idle_source = g_idle_source_new ();
1858       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1859       g_source_set_callback (idle_source,
1860                              continue_writing_in_idle_cb,
1861                              _g_dbus_worker_ref (worker),
1862                              (GDestroyNotify) _g_dbus_worker_unref);
1863       g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1864       g_source_attach (idle_source, worker->shared_thread_data->context);
1865       g_source_unref (idle_source);
1866     }
1867 }
1868
1869 /* ---------------------------------------------------------------------------------------------------- */
1870
1871 /* can be called from any thread - steals blob
1872  *
1873  * write_lock is not held on entry
1874  * output_pending may be anything
1875  */
1876 void
1877 _g_dbus_worker_send_message (GDBusWorker    *worker,
1878                              GDBusMessage   *message,
1879                              gchar          *blob,
1880                              gsize           blob_len)
1881 {
1882   MessageToWriteData *data;
1883
1884   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1885   g_return_if_fail (blob != NULL);
1886   g_return_if_fail (blob_len > 16);
1887
1888   data = g_new0 (MessageToWriteData, 1);
1889   data->worker = _g_dbus_worker_ref (worker);
1890   data->message = g_object_ref (message);
1891   data->blob = blob; /* steal! */
1892   data->blob_size = blob_len;
1893
1894   g_mutex_lock (&worker->write_lock);
1895   schedule_writing_unlocked (worker, data, NULL, NULL);
1896   g_mutex_unlock (&worker->write_lock);
1897 }
1898
1899 /* ---------------------------------------------------------------------------------------------------- */
1900
1901 GDBusWorker *
1902 _g_dbus_worker_new (GIOStream                              *stream,
1903                     GDBusCapabilityFlags                    capabilities,
1904                     gboolean                                initially_frozen,
1905                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1906                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1907                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1908                     gpointer                                user_data)
1909 {
1910   GDBusWorker *worker;
1911   GSource *idle_source;
1912
1913   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1914   g_return_val_if_fail (message_received_callback != NULL, NULL);
1915   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1916   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1917
1918   worker = g_new0 (GDBusWorker, 1);
1919   worker->ref_count = 1;
1920
1921   g_mutex_init (&worker->read_lock);
1922   worker->message_received_callback = message_received_callback;
1923   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1924   worker->disconnected_callback = disconnected_callback;
1925   worker->user_data = user_data;
1926   worker->stream = g_object_ref (stream);
1927   worker->capabilities = capabilities;
1928   worker->cancellable = g_cancellable_new ();
1929   worker->output_pending = PENDING_NONE;
1930
1931   worker->frozen = initially_frozen;
1932   worker->received_messages_while_frozen = g_queue_new ();
1933
1934   g_mutex_init (&worker->write_lock);
1935   worker->write_queue = g_queue_new ();
1936
1937   if (G_IS_SOCKET_CONNECTION (worker->stream))
1938     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1939
1940 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
1941   if (G_IS_KDBUS_CONNECTION (worker->stream))
1942     worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
1943 #endif
1944
1945   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1946
1947   /* begin reading */
1948   idle_source = g_idle_source_new ();
1949   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1950   g_source_set_callback (idle_source,
1951                          _g_dbus_worker_do_initial_read,
1952                          _g_dbus_worker_ref (worker),
1953                          (GDestroyNotify) _g_dbus_worker_unref);
1954   g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1955   g_source_attach (idle_source, worker->shared_thread_data->context);
1956   g_source_unref (idle_source);
1957
1958   return worker;
1959 }
1960
1961 /* ---------------------------------------------------------------------------------------------------- */
1962
1963 /* can be called from any thread
1964  *
1965  * write_lock is not held on entry
1966  * output_pending may be anything
1967  */
1968 void
1969 _g_dbus_worker_close (GDBusWorker         *worker,
1970                       GCancellable        *cancellable,
1971                       GSimpleAsyncResult  *result)
1972 {
1973   CloseData *close_data;
1974
1975   close_data = g_slice_new0 (CloseData);
1976   close_data->worker = _g_dbus_worker_ref (worker);
1977   close_data->cancellable =
1978       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1979   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1980
1981   /* Don't set worker->close_expected here - we're in the wrong thread.
1982    * It'll be set before the actual close happens.
1983    */
1984   g_cancellable_cancel (worker->cancellable);
1985   g_mutex_lock (&worker->write_lock);
1986   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1987   g_mutex_unlock (&worker->write_lock);
1988 }
1989
1990 /* This can be called from any thread - frees worker. Note that
1991  * callbacks might still happen if called from another thread than the
1992  * worker - use your own synchronization primitive in the callbacks.
1993  *
1994  * write_lock is not held on entry
1995  * output_pending may be anything
1996  */
1997 void
1998 _g_dbus_worker_stop (GDBusWorker *worker)
1999 {
2000   g_atomic_int_set (&worker->stopped, TRUE);
2001
2002   /* Cancel any pending operations and schedule a close of the underlying I/O
2003    * stream in the worker thread
2004    */
2005   _g_dbus_worker_close (worker, NULL, NULL);
2006
2007   /* _g_dbus_worker_close holds a ref until after an idle in the worker
2008    * thread has run, so we no longer need to unref in an idle like in
2009    * commit 322e25b535
2010    */
2011   _g_dbus_worker_unref (worker);
2012 }
2013
2014 /* ---------------------------------------------------------------------------------------------------- */
2015
2016 /* can be called from any thread (except the worker thread) - blocks
2017  * calling thread until all queued outgoing messages are written and
2018  * the transport has been flushed
2019  *
2020  * write_lock is not held on entry
2021  * output_pending may be anything
2022  */
2023 gboolean
2024 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
2025                            GCancellable   *cancellable,
2026                            GError        **error)
2027 {
2028   gboolean ret;
2029   FlushData *data;
2030   guint64 pending_writes;
2031
2032   data = NULL;
2033   ret = TRUE;
2034
2035   g_mutex_lock (&worker->write_lock);
2036
2037   /* if the queue is empty, no write is in-flight and we haven't written
2038    * anything since the last flush, then there's nothing to wait for
2039    */
2040   pending_writes = g_queue_get_length (worker->write_queue);
2041
2042   /* if a write is in-flight, we shouldn't be satisfied until the first
2043    * flush operation that follows it
2044    */
2045   if (worker->output_pending == PENDING_WRITE)
2046     pending_writes += 1;
2047
2048   if (pending_writes > 0 ||
2049       worker->write_num_messages_written != worker->write_num_messages_flushed)
2050     {
2051       data = g_new0 (FlushData, 1);
2052       g_mutex_init (&data->mutex);
2053       g_cond_init (&data->cond);
2054       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
2055       g_mutex_lock (&data->mutex);
2056
2057       schedule_writing_unlocked (worker, NULL, data, NULL);
2058     }
2059   g_mutex_unlock (&worker->write_lock);
2060
2061   if (data != NULL)
2062     {
2063       g_cond_wait (&data->cond, &data->mutex);
2064       g_mutex_unlock (&data->mutex);
2065
2066       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
2067       g_cond_clear (&data->cond);
2068       g_mutex_clear (&data->mutex);
2069       if (data->error != NULL)
2070         {
2071           ret = FALSE;
2072           g_propagate_error (error, data->error);
2073         }
2074       g_free (data);
2075     }
2076
2077   return ret;
2078 }
2079
2080 /* ---------------------------------------------------------------------------------------------------- */
2081
2082 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
2083 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
2084 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
2085 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
2086 #define G_DBUS_DEBUG_CALL           (1<<4)
2087 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
2088 #define G_DBUS_DEBUG_INCOMING       (1<<6)
2089 #define G_DBUS_DEBUG_RETURN         (1<<7)
2090 #define G_DBUS_DEBUG_EMISSION       (1<<8)
2091 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
2092
2093 static gint _gdbus_debug_flags = 0;
2094
2095 gboolean
2096 _g_dbus_debug_authentication (void)
2097 {
2098   _g_dbus_initialize ();
2099   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
2100 }
2101
2102 gboolean
2103 _g_dbus_debug_transport (void)
2104 {
2105   _g_dbus_initialize ();
2106   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
2107 }
2108
2109 gboolean
2110 _g_dbus_debug_message (void)
2111 {
2112   _g_dbus_initialize ();
2113   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
2114 }
2115
2116 gboolean
2117 _g_dbus_debug_payload (void)
2118 {
2119   _g_dbus_initialize ();
2120   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
2121 }
2122
2123 gboolean
2124 _g_dbus_debug_call (void)
2125 {
2126   _g_dbus_initialize ();
2127   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
2128 }
2129
2130 gboolean
2131 _g_dbus_debug_signal (void)
2132 {
2133   _g_dbus_initialize ();
2134   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
2135 }
2136
2137 gboolean
2138 _g_dbus_debug_incoming (void)
2139 {
2140   _g_dbus_initialize ();
2141   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
2142 }
2143
2144 gboolean
2145 _g_dbus_debug_return (void)
2146 {
2147   _g_dbus_initialize ();
2148   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
2149 }
2150
2151 gboolean
2152 _g_dbus_debug_emission (void)
2153 {
2154   _g_dbus_initialize ();
2155   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
2156 }
2157
2158 gboolean
2159 _g_dbus_debug_address (void)
2160 {
2161   _g_dbus_initialize ();
2162   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
2163 }
2164
2165 G_LOCK_DEFINE_STATIC (print_lock);
2166
2167 void
2168 _g_dbus_debug_print_lock (void)
2169 {
2170   G_LOCK (print_lock);
2171 }
2172
2173 void
2174 _g_dbus_debug_print_unlock (void)
2175 {
2176   G_UNLOCK (print_lock);
2177 }
2178
2179 /**
2180  * _g_dbus_initialize:
2181  *
2182  * Does various one-time init things such as
2183  *
2184  *  - registering the G_DBUS_ERROR error domain
2185  *  - parses the G_DBUS_DEBUG environment variable
2186  */
2187 void
2188 _g_dbus_initialize (void)
2189 {
2190   static volatile gsize initialized = 0;
2191
2192   if (g_once_init_enter (&initialized))
2193     {
2194       volatile GQuark g_dbus_error_domain;
2195       const gchar *debug;
2196
2197       g_dbus_error_domain = G_DBUS_ERROR;
2198       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
2199
2200       debug = g_getenv ("G_DBUS_DEBUG");
2201       if (debug != NULL)
2202         {
2203           const GDebugKey keys[] = {
2204             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
2205             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
2206             { "message",        G_DBUS_DEBUG_MESSAGE        },
2207             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
2208             { "call",           G_DBUS_DEBUG_CALL           },
2209             { "signal",         G_DBUS_DEBUG_SIGNAL         },
2210             { "incoming",       G_DBUS_DEBUG_INCOMING       },
2211             { "return",         G_DBUS_DEBUG_RETURN         },
2212             { "emission",       G_DBUS_DEBUG_EMISSION       },
2213             { "address",        G_DBUS_DEBUG_ADDRESS        }
2214           };
2215
2216           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
2217           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
2218             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
2219         }
2220
2221       g_once_init_leave (&initialized, 1);
2222     }
2223 }
2224
2225 /* ---------------------------------------------------------------------------------------------------- */
2226
2227 GVariantType *
2228 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
2229 {
2230   const GVariantType *arg_types[256];
2231   guint n;
2232
2233   if (args)
2234     for (n = 0; args[n] != NULL; n++)
2235       {
2236         /* DBus places a hard limit of 255 on signature length.
2237          * therefore number of args must be less than 256.
2238          */
2239         g_assert (n < 256);
2240
2241         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
2242
2243         if G_UNLIKELY (arg_types[n] == NULL)
2244           return NULL;
2245       }
2246   else
2247     n = 0;
2248
2249   return g_variant_type_new_tuple (arg_types, n);
2250 }
2251
2252 /* ---------------------------------------------------------------------------------------------------- */
2253
2254 #ifdef G_OS_WIN32
2255
2256 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
2257
2258 gchar *
2259 _g_dbus_win32_get_user_sid (void)
2260 {
2261   HANDLE h;
2262   TOKEN_USER *user;
2263   DWORD token_information_len;
2264   PSID psid;
2265   gchar *sid;
2266   gchar *ret;
2267
2268   ret = NULL;
2269   user = NULL;
2270   h = INVALID_HANDLE_VALUE;
2271
2272   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2273     {
2274       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2275       goto out;
2276     }
2277
2278   /* Get length of buffer */
2279   token_information_len = 0;
2280   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2281     {
2282       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2283         {
2284           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2285           goto out;
2286         }
2287     }
2288   user = g_malloc (token_information_len);
2289   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2290     {
2291       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2292       goto out;
2293     }
2294
2295   psid = user->User.Sid;
2296   if (!IsValidSid (psid))
2297     {
2298       g_warning ("Invalid SID");
2299       goto out;
2300     }
2301
2302   if (!ConvertSidToStringSidA (psid, &sid))
2303     {
2304       g_warning ("Invalid SID");
2305       goto out;
2306     }
2307
2308   ret = g_strdup (sid);
2309   LocalFree (sid);
2310
2311 out:
2312   g_free (user);
2313   if (h != INVALID_HANDLE_VALUE)
2314     CloseHandle (h);
2315   return ret;
2316 }
2317 #endif
2318
2319 /* ---------------------------------------------------------------------------------------------------- */
2320
2321 gchar *
2322 _g_dbus_get_machine_id (GError **error)
2323 {
2324 #ifdef G_OS_WIN32
2325   HW_PROFILE_INFOA info;
2326   char *src, *dest, *res;
2327   int i;
2328
2329   if (!GetCurrentHwProfileA (&info))
2330     {
2331       char *message = g_win32_error_message (GetLastError ());
2332       g_set_error (error,
2333                    G_IO_ERROR,
2334                    G_IO_ERROR_FAILED,
2335                    _("Unable to get Hardware profile: %s"), message);
2336       g_free (message);
2337       return NULL;
2338     }
2339
2340   /* Form: {12340001-4980-1920-6788-123456789012} */
2341   src = &info.szHwProfileGuid[0];
2342
2343   res = g_malloc (32+1);
2344   dest = res;
2345
2346   src++; /* Skip { */
2347   for (i = 0; i < 8; i++)
2348     *dest++ = *src++;
2349   src++; /* Skip - */
2350   for (i = 0; i < 4; i++)
2351     *dest++ = *src++;
2352   src++; /* Skip - */
2353   for (i = 0; i < 4; i++)
2354     *dest++ = *src++;
2355   src++; /* Skip - */
2356   for (i = 0; i < 4; i++)
2357     *dest++ = *src++;
2358   src++; /* Skip - */
2359   for (i = 0; i < 12; i++)
2360     *dest++ = *src++;
2361   *dest = 0;
2362
2363   return res;
2364 #else
2365   gchar *ret;
2366   GError *first_error;
2367   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2368   ret = NULL;
2369   first_error = NULL;
2370   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2371                             &ret,
2372                             NULL,
2373                             &first_error) &&
2374       !g_file_get_contents ("/etc/machine-id",
2375                             &ret,
2376                             NULL,
2377                             NULL))
2378     {
2379       g_propagate_prefixed_error (error, first_error,
2380                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2381     }
2382   else
2383     {
2384       /* ignore the error from the first try, if any */
2385       g_clear_error (&first_error);
2386       /* TODO: validate value */
2387       g_strstrip (ret);
2388     }
2389   return ret;
2390 #endif
2391 }
2392
2393 /* ---------------------------------------------------------------------------------------------------- */
2394
2395 gchar *
2396 _g_dbus_enum_to_string (GType enum_type, gint value)
2397 {
2398   gchar *ret;
2399   GEnumClass *klass;
2400   GEnumValue *enum_value;
2401
2402   klass = g_type_class_ref (enum_type);
2403   enum_value = g_enum_get_value (klass, value);
2404   if (enum_value != NULL)
2405     ret = g_strdup (enum_value->value_nick);
2406   else
2407     ret = g_strdup_printf ("unknown (value %d)", value);
2408   g_type_class_unref (klass);
2409   return ret;
2410 }
2411
2412 /* ---------------------------------------------------------------------------------------------------- */
2413
2414 static void
2415 write_message_print_transport_debug (gssize bytes_written,
2416                                      MessageToWriteData *data)
2417 {
2418   if (G_LIKELY (!_g_dbus_debug_transport ()))
2419     goto out;
2420
2421   _g_dbus_debug_print_lock ();
2422   g_print ("========================================================================\n"
2423            "GDBus-debug:Transport:\n"
2424            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2425            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2426            bytes_written,
2427            g_dbus_message_get_serial (data->message),
2428            data->blob_size,
2429            data->total_written,
2430            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2431   _g_dbus_debug_print_unlock ();
2432  out:
2433   ;
2434 }
2435
2436 /* ---------------------------------------------------------------------------------------------------- */
2437
2438 static void
2439 read_message_print_transport_debug (gssize bytes_read,
2440                                     GDBusWorker *worker)
2441 {
2442   gsize size;
2443   gint32 serial;
2444   gint32 message_length;
2445
2446   if (G_LIKELY (!_g_dbus_debug_transport ()))
2447     goto out;
2448
2449   size = bytes_read + worker->read_buffer_cur_size;
2450   serial = 0;
2451   message_length = 0;
2452   if (size >= 16)
2453     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2454   if (size >= 1)
2455     {
2456       switch (worker->read_buffer[0])
2457         {
2458         case 'l':
2459           if (size >= 12)
2460             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2461           break;
2462         case 'B':
2463           if (size >= 12)
2464             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2465           break;
2466         default:
2467           /* an error will be set elsewhere if this happens */
2468           goto out;
2469         }
2470     }
2471
2472     _g_dbus_debug_print_lock ();
2473   g_print ("========================================================================\n"
2474            "GDBus-debug:Transport:\n"
2475            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2476            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2477            bytes_read,
2478            serial,
2479            message_length,
2480            worker->read_buffer_cur_size,
2481            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2482   _g_dbus_debug_print_unlock ();
2483  out:
2484   ;
2485 }
2486
2487 /* ---------------------------------------------------------------------------------------------------- */
2488
2489 gboolean
2490 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2491                                      GValue                *return_accu,
2492                                      const GValue          *handler_return,
2493                                      gpointer               dummy)
2494 {
2495   gboolean continue_emission;
2496   gboolean signal_return;
2497
2498   signal_return = g_value_get_boolean (handler_return);
2499   g_value_set_boolean (return_accu, signal_return);
2500   continue_emission = signal_return;
2501
2502   return continue_emission;
2503 }