[kdbus] Enable 'initial_read()' - is no longer needed
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: David Zeuthen <davidz@redhat.com>
19  */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "giotypes.h"
27 #include "gsocket.h"
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbuserror.h"
31 #include "gdbusintrospection.h"
32 #include "gasyncresult.h"
33 #include "gsimpleasyncresult.h"
34 #include "ginputstream.h"
35 #include "gmemoryinputstream.h"
36 #include "giostream.h"
37 #include "glib/gstdio.h"
38 #include "gsocketcontrolmessage.h"
39 #include "gsocketconnection.h"
40 #include "gsocketoutputstream.h"
41
42 #ifdef G_OS_UNIX
43 #include "gkdbus.h"
44 #include "gkdbusconnection.h"
45 #include "gunixfdmessage.h"
46 #include "gunixconnection.h"
47 #include "gunixcredentialsmessage.h"
48 #endif
49
50 #ifdef G_OS_WIN32
51 #include <windows.h>
52 #endif
53
54 #include "glibintl.h"
55
56 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
57
58 /* ---------------------------------------------------------------------------------------------------- */
59
60 gchar *
61 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
62 {
63  guint n, m;
64  GString *ret;
65
66  ret = g_string_new (NULL);
67
68  for (n = 0; n < len; n += 16)
69    {
70      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
71
72      for (m = n; m < n + 16; m++)
73        {
74          if (m > n && (m%4) == 0)
75            g_string_append_c (ret, ' ');
76          if (m < len)
77            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
78          else
79            g_string_append (ret, "   ");
80        }
81
82      g_string_append (ret, "   ");
83
84      for (m = n; m < len && m < n + 16; m++)
85        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
86
87      g_string_append_c (ret, '\n');
88    }
89
90  return g_string_free (ret, FALSE);
91 }
92
93 /* ---------------------------------------------------------------------------------------------------- */
94
95 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
96 typedef struct
97 {
98   GKdbus *kdbus;
99   GCancellable *cancellable;
100
101   GSimpleAsyncResult *simple;
102
103   gboolean from_mainloop;
104 } ReadKdbusData;
105
106 static void
107 read_kdbus_data_free (ReadKdbusData  *data)
108 {
109   g_object_unref (data->kdbus);
110   if (data->cancellable != NULL)
111     g_object_unref (data->cancellable);
112   g_object_unref (data->simple);
113   g_free (data);
114 }
115
116 static gboolean
117 _g_kdbus_read_ready (GKdbus        *kdbus,
118                      GIOCondition   condition,
119                      gpointer       user_data)
120 {
121   ReadKdbusData *data = user_data;
122   GError *error = NULL;
123   gssize result;
124
125   result = _g_kdbus_receive (data->kdbus,
126                              data->cancellable,
127                              &error);
128
129   if (result >= 0)
130     {
131       g_simple_async_result_set_op_res_gssize (data->simple, result);
132     }
133   else
134     {
135       g_assert (error != NULL);
136       g_simple_async_result_take_error (data->simple, error);
137     }
138
139   if (data->from_mainloop)
140     g_simple_async_result_complete (data->simple);
141   else
142     g_simple_async_result_complete_in_idle (data->simple);
143
144   return FALSE;
145 }
146
147 static void
148 _g_kdbus_read (GKdbus               *kdbus,
149                GCancellable         *cancellable,
150                GAsyncReadyCallback   callback,
151                gpointer              user_data)
152 {
153   ReadKdbusData *data;
154   GSource *source;
155
156   data = g_new0 (ReadKdbusData, 1);
157   data->kdbus = g_object_ref (kdbus);
158   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
159
160   data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
161                                             callback,
162                                             user_data,
163                                             _g_kdbus_read);
164   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
165
166   data->from_mainloop = TRUE;
167   source = _g_kdbus_create_source (data->kdbus,
168                                    G_IO_IN,
169                                    cancellable);
170   g_source_set_callback (source,
171                          (GSourceFunc) _g_kdbus_read_ready,
172                          data,
173                          (GDestroyNotify) read_kdbus_data_free);
174   g_source_attach (source, g_main_context_get_thread_default ());
175   g_source_unref (source);
176 }
177
178 static gssize
179 _g_kdbus_read_finish (GKdbus        *kdbus,
180                       GAsyncResult  *result,
181                       GError       **error)
182 {
183   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
184
185   g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
186   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
187
188   if (g_simple_async_result_propagate_error (simple, error))
189     return -1;
190   else
191     return g_simple_async_result_get_op_res_gssize (simple);
192 }
193
194 #endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */
195
196 /* Unfortunately ancillary messages are discarded when reading from a
197  * socket using the GSocketInputStream abstraction. So we provide a
198  * very GInputStream-ish API that uses GSocket in this case (very
199  * similar to GSocketInputStream).
200  */
201
202 typedef struct
203 {
204   GSocket *socket;
205   GCancellable *cancellable;
206
207   void *buffer;
208   gsize count;
209
210   GSocketControlMessage ***messages;
211   gint *num_messages;
212
213   GSimpleAsyncResult *simple;
214
215   gboolean from_mainloop;
216 } ReadWithControlData;
217
218 static void
219 read_with_control_data_free (ReadWithControlData *data)
220 {
221   g_object_unref (data->socket);
222   if (data->cancellable != NULL)
223     g_object_unref (data->cancellable);
224   g_object_unref (data->simple);
225   g_free (data);
226 }
227
228 static gboolean
229 _g_socket_read_with_control_messages_ready (GSocket      *socket,
230                                             GIOCondition  condition,
231                                             gpointer      user_data)
232 {
233   ReadWithControlData *data = user_data;
234   GError *error;
235   gssize result;
236   GInputVector vector;
237
238   error = NULL;
239   vector.buffer = data->buffer;
240   vector.size = data->count;
241   result = g_socket_receive_message (data->socket,
242                                      NULL, /* address */
243                                      &vector,
244                                      1,
245                                      data->messages,
246                                      data->num_messages,
247                                      NULL,
248                                      data->cancellable,
249                                      &error);
250   if (result >= 0)
251     {
252       g_simple_async_result_set_op_res_gssize (data->simple, result);
253     }
254   else
255     {
256       g_assert (error != NULL);
257       g_simple_async_result_take_error (data->simple, error);
258     }
259
260   if (data->from_mainloop)
261     g_simple_async_result_complete (data->simple);
262   else
263     g_simple_async_result_complete_in_idle (data->simple);
264
265   return FALSE;
266 }
267
268 static void
269 _g_socket_read_with_control_messages (GSocket                 *socket,
270                                       void                    *buffer,
271                                       gsize                    count,
272                                       GSocketControlMessage ***messages,
273                                       gint                    *num_messages,
274                                       gint                     io_priority,
275                                       GCancellable            *cancellable,
276                                       GAsyncReadyCallback      callback,
277                                       gpointer                 user_data)
278 {
279   ReadWithControlData *data;
280
281   data = g_new0 (ReadWithControlData, 1);
282   data->socket = g_object_ref (socket);
283   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
284   data->buffer = buffer;
285   data->count = count;
286   data->messages = messages;
287   data->num_messages = num_messages;
288
289   data->simple = g_simple_async_result_new (G_OBJECT (socket),
290                                             callback,
291                                             user_data,
292                                             _g_socket_read_with_control_messages);
293   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
294
295   if (!g_socket_condition_check (socket, G_IO_IN))
296     {
297       GSource *source;
298       data->from_mainloop = TRUE;
299       source = g_socket_create_source (data->socket,
300                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
301                                        cancellable);
302       g_source_set_callback (source,
303                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
304                              data,
305                              (GDestroyNotify) read_with_control_data_free);
306       g_source_attach (source, g_main_context_get_thread_default ());
307       g_source_unref (source);
308     }
309   else
310     {
311       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
312       read_with_control_data_free (data);
313     }
314 }
315
316 static gssize
317 _g_socket_read_with_control_messages_finish (GSocket       *socket,
318                                              GAsyncResult  *result,
319                                              GError       **error)
320 {
321   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
322
323   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
324   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
325
326   if (g_simple_async_result_propagate_error (simple, error))
327       return -1;
328   else
329     return g_simple_async_result_get_op_res_gssize (simple);
330 }
331
332 /* ---------------------------------------------------------------------------------------------------- */
333
334 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
335
336 static GPtrArray *ensured_classes = NULL;
337
338 static void
339 ensure_type (GType gtype)
340 {
341   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
342 }
343
344 static void
345 release_required_types (void)
346 {
347   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
348   g_ptr_array_unref (ensured_classes);
349   ensured_classes = NULL;
350 }
351
352 static void
353 ensure_required_types (void)
354 {
355   g_assert (ensured_classes == NULL);
356   ensured_classes = g_ptr_array_new ();
357   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
358   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
359 }
360 /* ---------------------------------------------------------------------------------------------------- */
361
362 typedef struct
363 {
364   volatile gint refcount;
365   GThread *thread;
366   GMainContext *context;
367   GMainLoop *loop;
368 } SharedThreadData;
369
370 static gpointer
371 gdbus_shared_thread_func (gpointer user_data)
372 {
373   SharedThreadData *data = user_data;
374
375   g_main_context_push_thread_default (data->context);
376   g_main_loop_run (data->loop);
377   g_main_context_pop_thread_default (data->context);
378
379   release_required_types ();
380
381   return NULL;
382 }
383
384 /* ---------------------------------------------------------------------------------------------------- */
385
386 static SharedThreadData *
387 _g_dbus_shared_thread_ref (void)
388 {
389   static gsize shared_thread_data = 0;
390   SharedThreadData *ret;
391
392   if (g_once_init_enter (&shared_thread_data))
393     {
394       SharedThreadData *data;
395
396       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
397       ensure_required_types ();
398
399       data = g_new0 (SharedThreadData, 1);
400       data->refcount = 0;
401       
402       data->context = g_main_context_new ();
403       data->loop = g_main_loop_new (data->context, FALSE);
404       data->thread = g_thread_new ("gdbus",
405                                    gdbus_shared_thread_func,
406                                    data);
407       /* We can cast between gsize and gpointer safely */
408       g_once_init_leave (&shared_thread_data, (gsize) data);
409     }
410
411   ret = (SharedThreadData*) shared_thread_data;
412   g_atomic_int_inc (&ret->refcount);
413   return ret;
414 }
415
416 static void
417 _g_dbus_shared_thread_unref (SharedThreadData *data)
418 {
419   /* TODO: actually destroy the shared thread here */
420 #if 0
421   g_assert (data != NULL);
422   if (g_atomic_int_dec_and_test (&data->refcount))
423     {
424       g_main_loop_quit (data->loop);
425       //g_thread_join (data->thread);
426       g_main_loop_unref (data->loop);
427       g_main_context_unref (data->context);
428     }
429 #endif
430 }
431
432 /* ---------------------------------------------------------------------------------------------------- */
433
434 typedef enum {
435     PENDING_NONE = 0,
436     PENDING_WRITE,
437     PENDING_FLUSH,
438     PENDING_CLOSE
439 } OutputPending;
440
441 struct GDBusWorker
442 {
443   volatile gint                       ref_count;
444
445   SharedThreadData                   *shared_thread_data;
446
447   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
448   volatile gint                       stopped;
449
450   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
451    * only affects messages received from the other peer (since GDBusServer is the
452    * only user) - we might want it to affect messages sent to the other peer too?
453    */
454   gboolean                            frozen;
455   GDBusCapabilityFlags                capabilities;
456   GQueue                             *received_messages_while_frozen;
457
458   GIOStream                          *stream;
459   GCancellable                       *cancellable;
460   GDBusWorkerMessageReceivedCallback  message_received_callback;
461   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
462   GDBusWorkerDisconnectedCallback     disconnected_callback;
463   gpointer                            user_data;
464
465   /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
466   GSocket *socket;
467 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
468   GKdbus  *kdbus;
469 #endif
470
471   /* used for reading */
472   GMutex                              read_lock;
473   gchar                              *read_buffer;
474   gsize                               read_buffer_allocated_size;
475   gsize                               read_buffer_cur_size;
476   gsize                               read_buffer_bytes_wanted;
477   GUnixFDList                        *read_fd_list;
478   GSocketControlMessage             **read_ancillary_messages;
479   gint                                read_num_ancillary_messages;
480
481   /* Whether an async write, flush or close, or none of those, is pending.
482    * Only the worker thread may change its value, and only with the write_lock.
483    * Other threads may read its value when holding the write_lock.
484    * The worker thread may read its value at any time.
485    */
486   OutputPending                       output_pending;
487   /* used for writing */
488   GMutex                              write_lock;
489   /* queue of MessageToWriteData, protected by write_lock */
490   GQueue                             *write_queue;
491   /* protected by write_lock */
492   guint64                             write_num_messages_written;
493   /* number of messages we'd written out last time we flushed;
494    * protected by write_lock
495    */
496   guint64                             write_num_messages_flushed;
497   /* list of FlushData, protected by write_lock */
498   GList                              *write_pending_flushes;
499   /* list of CloseData, protected by write_lock */
500   GList                              *pending_close_attempts;
501   /* no lock - only used from the worker thread */
502   gboolean                            close_expected;
503 };
504
505 static void _g_dbus_worker_unref (GDBusWorker *worker);
506
507 /* ---------------------------------------------------------------------------------------------------- */
508
509 typedef struct
510 {
511   GMutex  mutex;
512   GCond   cond;
513   guint64 number_to_wait_for;
514   GError *error;
515 } FlushData;
516
517 struct _MessageToWriteData ;
518 typedef struct _MessageToWriteData MessageToWriteData;
519
520 static void message_to_write_data_free (MessageToWriteData *data);
521
522 static void read_message_print_transport_debug (gssize bytes_read,
523                                                 GDBusWorker *worker);
524
525 static void write_message_print_transport_debug (gssize bytes_written,
526                                                  MessageToWriteData *data);
527
528 typedef struct {
529     GDBusWorker *worker;
530     GCancellable *cancellable;
531     GSimpleAsyncResult *result;
532 } CloseData;
533
534 static void close_data_free (CloseData *close_data)
535 {
536   if (close_data->cancellable != NULL)
537     g_object_unref (close_data->cancellable);
538
539   if (close_data->result != NULL)
540     g_object_unref (close_data->result);
541
542   _g_dbus_worker_unref (close_data->worker);
543   g_slice_free (CloseData, close_data);
544 }
545
546 /* ---------------------------------------------------------------------------------------------------- */
547
548 static GDBusWorker *
549 _g_dbus_worker_ref (GDBusWorker *worker)
550 {
551   g_atomic_int_inc (&worker->ref_count);
552   return worker;
553 }
554
555 static void
556 _g_dbus_worker_unref (GDBusWorker *worker)
557 {
558   if (g_atomic_int_dec_and_test (&worker->ref_count))
559     {
560       g_assert (worker->write_pending_flushes == NULL);
561
562       _g_dbus_shared_thread_unref (worker->shared_thread_data);
563
564       g_object_unref (worker->stream);
565
566       g_mutex_clear (&worker->read_lock);
567       g_object_unref (worker->cancellable);
568       if (worker->read_fd_list != NULL)
569         g_object_unref (worker->read_fd_list);
570
571       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
572       g_mutex_clear (&worker->write_lock);
573       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
574       g_free (worker->read_buffer);
575
576       g_free (worker);
577     }
578 }
579
580 static void
581 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
582                                   gboolean      remote_peer_vanished,
583                                   GError       *error)
584 {
585   if (!g_atomic_int_get (&worker->stopped))
586     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
587 }
588
589 static void
590 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
591                                       GDBusMessage *message)
592 {
593   if (!g_atomic_int_get (&worker->stopped))
594     worker->message_received_callback (worker, message, worker->user_data);
595 }
596
597 static GDBusMessage *
598 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
599                                               GDBusMessage *message)
600 {
601   GDBusMessage *ret;
602   if (!g_atomic_int_get (&worker->stopped))
603     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
604   else
605     ret = message;
606   return ret;
607 }
608
609 /* can only be called from private thread with read-lock held - takes ownership of @message */
610 static void
611 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
612                                                   GDBusMessage *message)
613 {
614   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
615     {
616       /* queue up */
617       g_queue_push_tail (worker->received_messages_while_frozen, message);
618     }
619   else
620     {
621       /* not frozen, nor anything in queue */
622       _g_dbus_worker_emit_message_received (worker, message);
623       g_object_unref (message);
624     }
625 }
626
627 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
628 static gboolean
629 unfreeze_in_idle_cb (gpointer user_data)
630 {
631   GDBusWorker *worker = user_data;
632   GDBusMessage *message;
633
634   g_mutex_lock (&worker->read_lock);
635   if (worker->frozen)
636     {
637       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
638         {
639           _g_dbus_worker_emit_message_received (worker, message);
640           g_object_unref (message);
641         }
642       worker->frozen = FALSE;
643     }
644   else
645     {
646       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
647     }
648   g_mutex_unlock (&worker->read_lock);
649   return FALSE;
650 }
651
652 /* can be called from any thread */
653 void
654 _g_dbus_worker_unfreeze (GDBusWorker *worker)
655 {
656   GSource *idle_source;
657   idle_source = g_idle_source_new ();
658   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
659   g_source_set_callback (idle_source,
660                          unfreeze_in_idle_cb,
661                          _g_dbus_worker_ref (worker),
662                          (GDestroyNotify) _g_dbus_worker_unref);
663   g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
664   g_source_attach (idle_source, worker->shared_thread_data->context);
665   g_source_unref (idle_source);
666 }
667
668 /* ---------------------------------------------------------------------------------------------------- */
669
670 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
671
672 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
673 static void
674 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
675                            GAsyncResult  *res,
676                            gpointer       user_data)
677 {
678   GDBusWorker *worker = user_data;
679   GError *error;
680   gssize bytes_read;
681
682   g_mutex_lock (&worker->read_lock);
683
684   /* If already stopped, don't even process the reply */
685   if (g_atomic_int_get (&worker->stopped))
686     goto out;
687
688   error = NULL;
689   bytes_read = 0;
690
691   if (FALSE)
692     {
693     }
694 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
695   else if (G_IS_KDBUS_CONNECTION (worker->stream))
696     {
697       bytes_read = _g_kdbus_read_finish (worker->kdbus,
698                                          res,
699                                          &error);
700       g_error ("[KDBUS] _g_dbus_worker_do_read_cb() - work in progress");
701     }
702 #endif
703   else if (worker->socket == NULL)
704     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
705                                              res,
706                                              &error);
707   else
708     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
709                                                               res,
710                                                               &error);
711   if (worker->read_num_ancillary_messages > 0)
712     {
713       gint n;
714       for (n = 0; n < worker->read_num_ancillary_messages; n++)
715         {
716           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
717
718           if (FALSE)
719             {
720             }
721 #ifdef G_OS_UNIX
722           else if (G_IS_UNIX_FD_MESSAGE (control_message))
723             {
724               GUnixFDMessage *fd_message;
725               gint *fds;
726               gint num_fds;
727
728               fd_message = G_UNIX_FD_MESSAGE (control_message);
729               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
730               if (worker->read_fd_list == NULL)
731                 {
732                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
733                 }
734               else
735                 {
736                   gint n;
737                   for (n = 0; n < num_fds; n++)
738                     {
739                       /* TODO: really want a append_steal() */
740                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
741                       (void) g_close (fds[n], NULL);
742                     }
743                 }
744               g_free (fds);
745             }
746           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
747             {
748               /* do nothing */
749             }
750 #endif
751           else
752             {
753               if (error == NULL)
754                 {
755                   g_set_error (&error,
756                                G_IO_ERROR,
757                                G_IO_ERROR_FAILED,
758                                "Unexpected ancillary message of type %s received from peer",
759                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
760                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
761                   g_error_free (error);
762                   g_object_unref (control_message);
763                   n++;
764                   while (n < worker->read_num_ancillary_messages)
765                     g_object_unref (worker->read_ancillary_messages[n++]);
766                   g_free (worker->read_ancillary_messages);
767                   goto out;
768                 }
769             }
770           g_object_unref (control_message);
771         }
772       g_free (worker->read_ancillary_messages);
773     }
774
775   if (bytes_read == -1)
776     {
777       if (G_UNLIKELY (_g_dbus_debug_transport ()))
778         {
779           _g_dbus_debug_print_lock ();
780           g_print ("========================================================================\n"
781                    "GDBus-debug:Transport:\n"
782                    "  ---- READ ERROR on stream of type %s:\n"
783                    "  ---- %s %d: %s\n",
784                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
785                    g_quark_to_string (error->domain), error->code,
786                    error->message);
787           _g_dbus_debug_print_unlock ();
788         }
789
790       /* Every async read that uses this callback uses worker->cancellable
791        * as its GCancellable. worker->cancellable gets cancelled if and only
792        * if the GDBusConnection tells us to close (either via
793        * _g_dbus_worker_stop, which is called on last-unref, or directly),
794        * so a cancelled read must mean our connection was closed locally.
795        *
796        * If we're closing, other errors are possible - notably,
797        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
798        * read in-flight. It seems sensible to treat all read errors during
799        * closing as an expected thing that doesn't trip exit-on-close.
800        *
801        * Because close_expected can't be set until we get into the worker
802        * thread, but the cancellable is signalled sooner (from another
803        * thread), we do still need to check the error.
804        */
805       if (worker->close_expected ||
806           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
807         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
808       else
809         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
810
811       g_error_free (error);
812       goto out;
813     }
814
815 #if 0
816   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
817            (gint) bytes_read,
818            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
819            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
820            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
821                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
822            worker->stream,
823            worker);
824 #endif
825
826   /* TODO: hmm, hmm... */
827   if (bytes_read == 0)
828     {
829       g_set_error (&error,
830                    G_IO_ERROR,
831                    G_IO_ERROR_FAILED,
832                    "Underlying GIOStream returned 0 bytes on an async read");
833       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
834       g_error_free (error);
835       goto out;
836     }
837
838   read_message_print_transport_debug (bytes_read, worker);
839
840   worker->read_buffer_cur_size += bytes_read;
841   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
842     {
843       /* OK, got what we asked for! */
844       if (worker->read_buffer_bytes_wanted == 16)
845         {
846           gssize message_len;
847           /* OK, got the header - determine how many more bytes are needed */
848           error = NULL;
849           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
850                                                      16,
851                                                      &error);
852           if (message_len == -1)
853             {
854               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
855               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
856               g_error_free (error);
857               goto out;
858             }
859
860           worker->read_buffer_bytes_wanted = message_len;
861           _g_dbus_worker_do_read_unlocked (worker);
862         }
863       else
864         {
865           GDBusMessage *message;
866           error = NULL;
867
868           /* TODO: use connection->priv->auth to decode the message */
869
870           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
871                                                   worker->read_buffer_cur_size,
872                                                   worker->capabilities,
873                                                   &error);
874           if (message == NULL)
875             {
876               gchar *s;
877               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
878               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
879                          "The error is: %s\n"
880                          "The payload is as follows:\n"
881                          "%s\n",
882                          worker->read_buffer_cur_size,
883                          error->message,
884                          s);
885               g_free (s);
886               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
887               g_error_free (error);
888               goto out;
889             }
890
891 #ifdef G_OS_UNIX
892           if (worker->read_fd_list != NULL)
893             {
894               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
895               g_object_unref (worker->read_fd_list);
896               worker->read_fd_list = NULL;
897             }
898 #endif
899
900           if (G_UNLIKELY (_g_dbus_debug_message ()))
901             {
902               gchar *s;
903               _g_dbus_debug_print_lock ();
904               g_print ("========================================================================\n"
905                        "GDBus-debug:Message:\n"
906                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
907                        worker->read_buffer_cur_size);
908               s = g_dbus_message_print (message, 2);
909               g_print ("%s", s);
910               g_free (s);
911               if (G_UNLIKELY (_g_dbus_debug_payload ()))
912                 {
913                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
914                   g_print ("%s\n", s);
915                   g_free (s);
916                 }
917               _g_dbus_debug_print_unlock ();
918             }
919
920           /* yay, got a message, go deliver it */
921           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
922
923           /* start reading another message! */
924           worker->read_buffer_bytes_wanted = 0;
925           worker->read_buffer_cur_size = 0;
926           _g_dbus_worker_do_read_unlocked (worker);
927         }
928     }
929   else
930     {
931       /* didn't get all the bytes we requested - so repeat the request... */
932       _g_dbus_worker_do_read_unlocked (worker);
933     }
934
935  out:
936   g_mutex_unlock (&worker->read_lock);
937
938   /* gives up the reference acquired when calling g_input_stream_read_async() */
939   _g_dbus_worker_unref (worker);
940 }
941
942 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
943 static void
944 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
945 {
946   /* Note that we do need to keep trying to read even if close_expected is
947    * true, because only failing a read causes us to signal 'closed'.
948    */
949
950   /* [KDBUS]
951    * For KDBUS transport we don't  have to alloc buffer (worker->read_buffer)
952    * instead of it we use kdbus memory pool. On connection stage KDBUS client
953    * have to register a memory pool, large enough to  carry all backlog of
954    * data enqueued for the connection.
955    */
956
957 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
958   if (G_IS_KDBUS_CONNECTION (worker->stream))
959     {
960       _g_kdbus_read(worker->kdbus,
961                     worker->cancellable,
962                     (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
963                     _g_dbus_worker_ref (worker));
964       return;
965     }
966 #endif
967
968   /* if bytes_wanted is zero, it means start reading a message */
969   if (worker->read_buffer_bytes_wanted == 0)
970     {
971       worker->read_buffer_cur_size = 0;
972       worker->read_buffer_bytes_wanted = 16;
973     }
974
975   /* ensure we have a (big enough) buffer */
976   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
977     {
978       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
979       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
980       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
981     }
982
983   if (worker->socket == NULL)
984     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
985                                worker->read_buffer + worker->read_buffer_cur_size,
986                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
987                                G_PRIORITY_DEFAULT,
988                                worker->cancellable,
989                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
990                                _g_dbus_worker_ref (worker));
991   else
992     {
993       worker->read_ancillary_messages = NULL;
994       worker->read_num_ancillary_messages = 0;
995       _g_socket_read_with_control_messages (worker->socket,
996                                             worker->read_buffer + worker->read_buffer_cur_size,
997                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
998                                             &worker->read_ancillary_messages,
999                                             &worker->read_num_ancillary_messages,
1000                                             G_PRIORITY_DEFAULT,
1001                                             worker->cancellable,
1002                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1003                                             _g_dbus_worker_ref (worker));
1004     }
1005 }
1006
1007 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
1008 static gboolean
1009 _g_dbus_worker_do_initial_read (gpointer data)
1010 {
1011   GDBusWorker *worker = data;
1012   g_mutex_lock (&worker->read_lock);
1013   _g_dbus_worker_do_read_unlocked (worker);
1014   g_mutex_unlock (&worker->read_lock);
1015   return FALSE;
1016 }
1017
1018 /* ---------------------------------------------------------------------------------------------------- */
1019
1020 struct _MessageToWriteData
1021 {
1022   GDBusWorker  *worker;
1023   GDBusMessage *message;
1024   gchar        *blob;
1025   gsize         blob_size;
1026
1027   gsize               total_written;
1028   GSimpleAsyncResult *simple;
1029
1030 };
1031
1032 static void
1033 message_to_write_data_free (MessageToWriteData *data)
1034 {
1035   _g_dbus_worker_unref (data->worker);
1036   if (data->message)
1037     g_object_unref (data->message);
1038   g_free (data->blob);
1039   g_free (data);
1040 }
1041
1042 /* ---------------------------------------------------------------------------------------------------- */
1043
1044 static void write_message_continue_writing (MessageToWriteData *data);
1045
1046 /* called in private thread shared by all GDBusConnection instances
1047  *
1048  * write-lock is not held on entry
1049  * output_pending is PENDING_WRITE on entry
1050  */
1051 static void
1052 write_message_async_cb (GObject      *source_object,
1053                         GAsyncResult *res,
1054                         gpointer      user_data)
1055 {
1056   MessageToWriteData *data = user_data;
1057   GSimpleAsyncResult *simple;
1058   gssize bytes_written;
1059   GError *error;
1060
1061   /* Note: we can't access data->simple after calling g_async_result_complete () because the
1062    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
1063    */
1064   simple = data->simple;
1065
1066   error = NULL;
1067   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
1068                                                 res,
1069                                                 &error);
1070   if (bytes_written == -1)
1071     {
1072       g_simple_async_result_take_error (simple, error);
1073       g_simple_async_result_complete (simple);
1074       g_object_unref (simple);
1075       goto out;
1076     }
1077   g_assert (bytes_written > 0); /* zero is never returned */
1078
1079   write_message_print_transport_debug (bytes_written, data);
1080
1081   data->total_written += bytes_written;
1082   g_assert (data->total_written <= data->blob_size);
1083   if (data->total_written == data->blob_size)
1084     {
1085       g_simple_async_result_complete (simple);
1086       g_object_unref (simple);
1087       goto out;
1088     }
1089
1090   write_message_continue_writing (data);
1091
1092  out:
1093   ;
1094 }
1095
1096 /* called in private thread shared by all GDBusConnection instances
1097  *
1098  * write-lock is not held on entry
1099  * output_pending is PENDING_WRITE on entry
1100  */
1101 #ifdef G_OS_UNIX
1102 static gboolean
1103 on_socket_ready (GSocket      *socket,
1104                  GIOCondition  condition,
1105                  gpointer      user_data)
1106 {
1107   MessageToWriteData *data = user_data;
1108   write_message_continue_writing (data);
1109   return FALSE; /* remove source */
1110 }
1111 #endif
1112
1113 /* called in private thread shared by all GDBusConnection instances
1114  *
1115  * write-lock is not held on entry
1116  * output_pending is PENDING_WRITE on entry
1117  */
1118 static void
1119 write_message_continue_writing (MessageToWriteData *data)
1120 {
1121   GOutputStream *ostream;
1122 #ifdef G_OS_UNIX
1123   GSimpleAsyncResult *simple;
1124   GUnixFDList *fd_list;
1125 #endif
1126
1127 #ifdef G_OS_UNIX
1128   /* Note: we can't access data->simple after calling g_async_result_complete () because the
1129    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
1130    */
1131   simple = data->simple;
1132 #endif
1133
1134   ostream = g_io_stream_get_output_stream (data->worker->stream);
1135 #ifdef G_OS_UNIX
1136   fd_list = g_dbus_message_get_unix_fd_list (data->message);
1137 #endif
1138
1139   g_assert (!g_output_stream_has_pending (ostream));
1140   g_assert_cmpint (data->total_written, <, data->blob_size);
1141
1142   if (FALSE)
1143     {
1144     }
1145 #ifdef G_OS_UNIX
1146   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1147     {
1148       GOutputVector vector;
1149       GSocketControlMessage *control_message;
1150       gssize bytes_written;
1151       GError *error;
1152
1153       vector.buffer = data->blob;
1154       vector.size = data->blob_size;
1155
1156       control_message = NULL;
1157       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1158         {
1159           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1160             {
1161               g_simple_async_result_set_error (simple,
1162                                                G_IO_ERROR,
1163                                                G_IO_ERROR_FAILED,
1164                                                "Tried sending a file descriptor but remote peer does not support this capability");
1165               g_simple_async_result_complete (simple);
1166               g_object_unref (simple);
1167               goto out;
1168             }
1169           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1170         }
1171
1172       error = NULL;
1173       bytes_written = g_socket_send_message (data->worker->socket,
1174                                              NULL, /* address */
1175                                              &vector,
1176                                              1,
1177                                              control_message != NULL ? &control_message : NULL,
1178                                              control_message != NULL ? 1 : 0,
1179                                              G_SOCKET_MSG_NONE,
1180                                              data->worker->cancellable,
1181                                              &error);
1182       if (control_message != NULL)
1183         g_object_unref (control_message);
1184
1185       if (bytes_written == -1)
1186         {
1187           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1188           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1189             {
1190               GSource *source;
1191               source = g_socket_create_source (data->worker->socket,
1192                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1193                                                data->worker->cancellable);
1194               g_source_set_callback (source,
1195                                      (GSourceFunc) on_socket_ready,
1196                                      data,
1197                                      NULL); /* GDestroyNotify */
1198               g_source_attach (source, g_main_context_get_thread_default ());
1199               g_source_unref (source);
1200               g_error_free (error);
1201               goto out;
1202             }
1203           g_simple_async_result_take_error (simple, error);
1204           g_simple_async_result_complete (simple);
1205           g_object_unref (simple);
1206           goto out;
1207         }
1208       g_assert (bytes_written > 0); /* zero is never returned */
1209
1210       write_message_print_transport_debug (bytes_written, data);
1211
1212       data->total_written += bytes_written;
1213       g_assert (data->total_written <= data->blob_size);
1214       if (data->total_written == data->blob_size)
1215         {
1216           g_simple_async_result_complete (simple);
1217           g_object_unref (simple);
1218           goto out;
1219         }
1220
1221       write_message_continue_writing (data);
1222     }
1223 #endif
1224   else
1225     {
1226 #ifdef G_OS_UNIX
1227       if (fd_list != NULL)
1228         {
1229           g_simple_async_result_set_error (simple,
1230                                            G_IO_ERROR,
1231                                            G_IO_ERROR_FAILED,
1232                                            "Tried sending a file descriptor on unsupported stream of type %s",
1233                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1234           g_simple_async_result_complete (simple);
1235           g_object_unref (simple);
1236           goto out;
1237         }
1238 #endif
1239
1240       g_output_stream_write_async (ostream,
1241                                    (const gchar *) data->blob + data->total_written,
1242                                    data->blob_size - data->total_written,
1243                                    G_PRIORITY_DEFAULT,
1244                                    data->worker->cancellable,
1245                                    write_message_async_cb,
1246                                    data);
1247     }
1248 #ifdef G_OS_UNIX
1249  out:
1250 #endif
1251   ;
1252 }
1253
1254 /* called in private thread shared by all GDBusConnection instances
1255  *
1256  * write-lock is not held on entry
1257  * output_pending is PENDING_WRITE on entry
1258  */
1259 static void
1260 write_message_async (GDBusWorker         *worker,
1261                      MessageToWriteData  *data,
1262                      GAsyncReadyCallback  callback,
1263                      gpointer             user_data)
1264 {
1265   data->simple = g_simple_async_result_new (NULL,
1266                                             callback,
1267                                             user_data,
1268                                             write_message_async);
1269   data->total_written = 0;
1270   write_message_continue_writing (data);
1271 }
1272
1273 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1274 static gboolean
1275 write_message_finish (GAsyncResult   *res,
1276                       GError        **error)
1277 {
1278   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1279   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1280     return FALSE;
1281   else
1282     return TRUE;
1283 }
1284 /* ---------------------------------------------------------------------------------------------------- */
1285
1286 static void continue_writing (GDBusWorker *worker);
1287
1288 typedef struct
1289 {
1290   GDBusWorker *worker;
1291   GList *flushers;
1292 } FlushAsyncData;
1293
1294 static void
1295 flush_data_list_complete (const GList  *flushers,
1296                           const GError *error)
1297 {
1298   const GList *l;
1299
1300   for (l = flushers; l != NULL; l = l->next)
1301     {
1302       FlushData *f = l->data;
1303
1304       f->error = error != NULL ? g_error_copy (error) : NULL;
1305
1306       g_mutex_lock (&f->mutex);
1307       g_cond_signal (&f->cond);
1308       g_mutex_unlock (&f->mutex);
1309     }
1310 }
1311
1312 /* called in private thread shared by all GDBusConnection instances
1313  *
1314  * write-lock is not held on entry
1315  * output_pending is PENDING_FLUSH on entry
1316  */
1317 static void
1318 ostream_flush_cb (GObject      *source_object,
1319                   GAsyncResult *res,
1320                   gpointer      user_data)
1321 {
1322   FlushAsyncData *data = user_data;
1323   GError *error;
1324
1325   error = NULL;
1326   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1327                                 res,
1328                                 &error);
1329
1330   if (error == NULL)
1331     {
1332       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1333         {
1334           _g_dbus_debug_print_lock ();
1335           g_print ("========================================================================\n"
1336                    "GDBus-debug:Transport:\n"
1337                    "  ---- FLUSHED stream of type %s\n",
1338                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1339           _g_dbus_debug_print_unlock ();
1340         }
1341     }
1342
1343   g_assert (data->flushers != NULL);
1344   flush_data_list_complete (data->flushers, error);
1345   g_list_free (data->flushers);
1346
1347   if (error != NULL)
1348     g_error_free (error);
1349
1350   /* Make sure we tell folks that we don't have additional
1351      flushes pending */
1352   g_mutex_lock (&data->worker->write_lock);
1353   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1354   g_assert (data->worker->output_pending == PENDING_FLUSH);
1355   data->worker->output_pending = PENDING_NONE;
1356   g_mutex_unlock (&data->worker->write_lock);
1357
1358   /* OK, cool, finally kick off the next write */
1359   continue_writing (data->worker);
1360
1361   _g_dbus_worker_unref (data->worker);
1362   g_free (data);
1363 }
1364
1365 /* called in private thread shared by all GDBusConnection instances
1366  *
1367  * write-lock is not held on entry
1368  * output_pending is PENDING_FLUSH on entry
1369  */
1370 static void
1371 start_flush (FlushAsyncData *data)
1372 {
1373   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1374                                G_PRIORITY_DEFAULT,
1375                                data->worker->cancellable,
1376                                ostream_flush_cb,
1377                                data);
1378 }
1379
1380 /* called in private thread shared by all GDBusConnection instances
1381  *
1382  * write-lock is held on entry
1383  * output_pending is PENDING_NONE on entry
1384  */
1385 static void
1386 message_written_unlocked (GDBusWorker *worker,
1387                           MessageToWriteData *message_data)
1388 {
1389   if (G_UNLIKELY (_g_dbus_debug_message ()))
1390     {
1391       gchar *s;
1392       _g_dbus_debug_print_lock ();
1393       g_print ("========================================================================\n"
1394                "GDBus-debug:Message:\n"
1395                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1396                message_data->blob_size);
1397       s = g_dbus_message_print (message_data->message, 2);
1398       g_print ("%s", s);
1399       g_free (s);
1400       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1401         {
1402           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1403           g_print ("%s\n", s);
1404           g_free (s);
1405         }
1406       _g_dbus_debug_print_unlock ();
1407     }
1408
1409   worker->write_num_messages_written += 1;
1410 }
1411
1412 /* called in private thread shared by all GDBusConnection instances
1413  *
1414  * write-lock is held on entry
1415  * output_pending is PENDING_NONE on entry
1416  *
1417  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1418  */
1419 static FlushAsyncData *
1420 prepare_flush_unlocked (GDBusWorker *worker)
1421 {
1422   GList *l;
1423   GList *ll;
1424   GList *flushers;
1425
1426   flushers = NULL;
1427   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1428     {
1429       FlushData *f = l->data;
1430       ll = l->next;
1431
1432       if (f->number_to_wait_for == worker->write_num_messages_written)
1433         {
1434           flushers = g_list_append (flushers, f);
1435           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1436         }
1437     }
1438   if (flushers != NULL)
1439     {
1440       g_assert (worker->output_pending == PENDING_NONE);
1441       worker->output_pending = PENDING_FLUSH;
1442     }
1443
1444   if (flushers != NULL)
1445     {
1446       FlushAsyncData *data;
1447
1448       data = g_new0 (FlushAsyncData, 1);
1449       data->worker = _g_dbus_worker_ref (worker);
1450       data->flushers = flushers;
1451       return data;
1452     }
1453
1454   return NULL;
1455 }
1456
1457 /* called in private thread shared by all GDBusConnection instances
1458  *
1459  * write-lock is not held on entry
1460  * output_pending is PENDING_WRITE on entry
1461  */
1462 static void
1463 write_message_cb (GObject       *source_object,
1464                   GAsyncResult  *res,
1465                   gpointer       user_data)
1466 {
1467   MessageToWriteData *data = user_data;
1468   GError *error;
1469
1470   g_mutex_lock (&data->worker->write_lock);
1471   g_assert (data->worker->output_pending == PENDING_WRITE);
1472   data->worker->output_pending = PENDING_NONE;
1473
1474   error = NULL;
1475   if (!write_message_finish (res, &error))
1476     {
1477       g_mutex_unlock (&data->worker->write_lock);
1478
1479       /* TODO: handle */
1480       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1481       g_error_free (error);
1482
1483       g_mutex_lock (&data->worker->write_lock);
1484     }
1485
1486   message_written_unlocked (data->worker, data);
1487
1488   g_mutex_unlock (&data->worker->write_lock);
1489
1490   continue_writing (data->worker);
1491
1492   message_to_write_data_free (data);
1493 }
1494
1495 /* called in private thread shared by all GDBusConnection instances
1496  *
1497  * write-lock is not held on entry
1498  * output_pending is PENDING_CLOSE on entry
1499  */
1500 static void
1501 iostream_close_cb (GObject      *source_object,
1502                    GAsyncResult *res,
1503                    gpointer      user_data)
1504 {
1505   GDBusWorker *worker = user_data;
1506   GError *error = NULL;
1507   GList *pending_close_attempts, *pending_flush_attempts;
1508   GQueue *send_queue;
1509
1510   g_io_stream_close_finish (worker->stream, res, &error);
1511
1512   g_mutex_lock (&worker->write_lock);
1513
1514   pending_close_attempts = worker->pending_close_attempts;
1515   worker->pending_close_attempts = NULL;
1516
1517   pending_flush_attempts = worker->write_pending_flushes;
1518   worker->write_pending_flushes = NULL;
1519
1520   send_queue = worker->write_queue;
1521   worker->write_queue = g_queue_new ();
1522
1523   g_assert (worker->output_pending == PENDING_CLOSE);
1524   worker->output_pending = PENDING_NONE;
1525
1526   g_mutex_unlock (&worker->write_lock);
1527
1528   while (pending_close_attempts != NULL)
1529     {
1530       CloseData *close_data = pending_close_attempts->data;
1531
1532       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1533                                                    pending_close_attempts);
1534
1535       if (close_data->result != NULL)
1536         {
1537           if (error != NULL)
1538             g_simple_async_result_set_from_error (close_data->result, error);
1539
1540           /* this must be in an idle because the result is likely to be
1541            * intended for another thread
1542            */
1543           g_simple_async_result_complete_in_idle (close_data->result);
1544         }
1545
1546       close_data_free (close_data);
1547     }
1548
1549   g_clear_error (&error);
1550
1551   /* all messages queued for sending are discarded */
1552   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1553   /* all queued flushes fail */
1554   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1555                        _("Operation was cancelled"));
1556   flush_data_list_complete (pending_flush_attempts, error);
1557   g_list_free (pending_flush_attempts);
1558   g_clear_error (&error);
1559
1560   _g_dbus_worker_unref (worker);
1561 }
1562
1563 /* called in private thread shared by all GDBusConnection instances
1564  *
1565  * write-lock is not held on entry
1566  * output_pending must be PENDING_NONE on entry
1567  */
1568 static void
1569 continue_writing (GDBusWorker *worker)
1570 {
1571   MessageToWriteData *data;
1572   FlushAsyncData *flush_async_data;
1573
1574  write_next:
1575   /* we mustn't try to write two things at once */
1576   g_assert (worker->output_pending == PENDING_NONE);
1577
1578   g_mutex_lock (&worker->write_lock);
1579
1580   data = NULL;
1581   flush_async_data = NULL;
1582
1583   /* if we want to close the connection, that takes precedence */
1584   if (worker->pending_close_attempts != NULL)
1585     {
1586       worker->close_expected = TRUE;
1587       worker->output_pending = PENDING_CLOSE;
1588
1589       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1590                                NULL, iostream_close_cb,
1591                                _g_dbus_worker_ref (worker));
1592     }
1593   else
1594     {
1595       flush_async_data = prepare_flush_unlocked (worker);
1596
1597       if (flush_async_data == NULL)
1598         {
1599           data = g_queue_pop_head (worker->write_queue);
1600
1601           if (data != NULL)
1602             worker->output_pending = PENDING_WRITE;
1603         }
1604     }
1605
1606   g_mutex_unlock (&worker->write_lock);
1607
1608   /* Note that write_lock is only used for protecting the @write_queue
1609    * and @output_pending fields of the GDBusWorker struct ... which we
1610    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1611    *
1612    * Therefore, it's fine to drop it here when calling back into user
1613    * code and then writing the message out onto the GIOStream since this
1614    * function only runs on the worker thread.
1615    */
1616
1617   if (flush_async_data != NULL)
1618     {
1619       start_flush (flush_async_data);
1620       g_assert (data == NULL);
1621     }
1622   else if (data != NULL)
1623     {
1624       GDBusMessage *old_message;
1625       guchar *new_blob;
1626       gsize new_blob_size;
1627       GError *error;
1628
1629       old_message = data->message;
1630       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1631       if (data->message == old_message)
1632         {
1633           /* filters had no effect - do nothing */
1634         }
1635       else if (data->message == NULL)
1636         {
1637           /* filters dropped message */
1638           g_mutex_lock (&worker->write_lock);
1639           worker->output_pending = PENDING_NONE;
1640           g_mutex_unlock (&worker->write_lock);
1641           message_to_write_data_free (data);
1642           goto write_next;
1643         }
1644       else
1645         {
1646           /* filters altered the message -> reencode */
1647           error = NULL;
1648           new_blob = g_dbus_message_to_blob (data->message,
1649                                              &new_blob_size,
1650                                              worker->capabilities,
1651                                              &error);
1652           if (new_blob == NULL)
1653             {
1654               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1655                * the old message instead
1656                */
1657               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1658                          g_dbus_message_get_serial (data->message),
1659                          error->message);
1660               g_error_free (error);
1661             }
1662           else
1663             {
1664               g_free (data->blob);
1665               data->blob = (gchar *) new_blob;
1666               data->blob_size = new_blob_size;
1667             }
1668         }
1669
1670       write_message_async (worker,
1671                            data,
1672                            write_message_cb,
1673                            data);
1674     }
1675 }
1676
1677 /* called in private thread shared by all GDBusConnection instances
1678  *
1679  * write-lock is not held on entry
1680  * output_pending may be anything
1681  */
1682 static gboolean
1683 continue_writing_in_idle_cb (gpointer user_data)
1684 {
1685   GDBusWorker *worker = user_data;
1686
1687   /* Because this is the worker thread, we can read this struct member
1688    * without holding the lock: no other thread ever modifies it.
1689    */
1690   if (worker->output_pending == PENDING_NONE)
1691     continue_writing (worker);
1692
1693   return FALSE;
1694 }
1695
1696 /**
1697  * @write_data: (transfer full) (allow-none):
1698  * @flush_data: (transfer full) (allow-none):
1699  * @close_data: (transfer full) (allow-none):
1700  *
1701  * Can be called from any thread
1702  *
1703  * write_lock is held on entry
1704  * output_pending may be anything
1705  */
1706 static void
1707 schedule_writing_unlocked (GDBusWorker        *worker,
1708                            MessageToWriteData *write_data,
1709                            FlushData          *flush_data,
1710                            CloseData          *close_data)
1711 {
1712   if (write_data != NULL)
1713     g_queue_push_tail (worker->write_queue, write_data);
1714
1715   if (flush_data != NULL)
1716     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1717
1718   if (close_data != NULL)
1719     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1720                                                      close_data);
1721
1722   /* If we had output pending, the next bit of output will happen
1723    * automatically when it finishes, so we only need to do this
1724    * if nothing was pending.
1725    *
1726    * The idle callback will re-check that output_pending is still
1727    * PENDING_NONE, to guard against output starting before the idle.
1728    */
1729   if (worker->output_pending == PENDING_NONE)
1730     {
1731       GSource *idle_source;
1732       idle_source = g_idle_source_new ();
1733       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1734       g_source_set_callback (idle_source,
1735                              continue_writing_in_idle_cb,
1736                              _g_dbus_worker_ref (worker),
1737                              (GDestroyNotify) _g_dbus_worker_unref);
1738       g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1739       g_source_attach (idle_source, worker->shared_thread_data->context);
1740       g_source_unref (idle_source);
1741     }
1742 }
1743
1744 /* ---------------------------------------------------------------------------------------------------- */
1745
1746 /* can be called from any thread - steals blob
1747  *
1748  * write_lock is not held on entry
1749  * output_pending may be anything
1750  */
1751 void
1752 _g_dbus_worker_send_message (GDBusWorker    *worker,
1753                              GDBusMessage   *message,
1754                              gchar          *blob,
1755                              gsize           blob_len)
1756 {
1757   MessageToWriteData *data;
1758
1759   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1760   g_return_if_fail (blob != NULL);
1761   g_return_if_fail (blob_len > 16);
1762
1763   data = g_new0 (MessageToWriteData, 1);
1764   data->worker = _g_dbus_worker_ref (worker);
1765   data->message = g_object_ref (message);
1766   data->blob = blob; /* steal! */
1767   data->blob_size = blob_len;
1768
1769   g_mutex_lock (&worker->write_lock);
1770   schedule_writing_unlocked (worker, data, NULL, NULL);
1771   g_mutex_unlock (&worker->write_lock);
1772 }
1773
1774 /* ---------------------------------------------------------------------------------------------------- */
1775
1776 GDBusWorker *
1777 _g_dbus_worker_new (GIOStream                              *stream,
1778                     GDBusCapabilityFlags                    capabilities,
1779                     gboolean                                initially_frozen,
1780                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1781                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1782                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1783                     gpointer                                user_data)
1784 {
1785   GDBusWorker *worker;
1786   GSource *idle_source;
1787
1788   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1789   g_return_val_if_fail (message_received_callback != NULL, NULL);
1790   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1791   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1792
1793   worker = g_new0 (GDBusWorker, 1);
1794   worker->ref_count = 1;
1795
1796   g_mutex_init (&worker->read_lock);
1797   worker->message_received_callback = message_received_callback;
1798   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1799   worker->disconnected_callback = disconnected_callback;
1800   worker->user_data = user_data;
1801   worker->stream = g_object_ref (stream);
1802   worker->capabilities = capabilities;
1803   worker->cancellable = g_cancellable_new ();
1804   worker->output_pending = PENDING_NONE;
1805
1806   worker->frozen = initially_frozen;
1807   worker->received_messages_while_frozen = g_queue_new ();
1808
1809   g_mutex_init (&worker->write_lock);
1810   worker->write_queue = g_queue_new ();
1811
1812   if (G_IS_SOCKET_CONNECTION (worker->stream))
1813     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1814
1815 #if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
1816   if (G_IS_KDBUS_CONNECTION (worker->stream))
1817     worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
1818 #endif
1819
1820   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1821
1822   /* begin reading */
1823   idle_source = g_idle_source_new ();
1824   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1825   g_source_set_callback (idle_source,
1826                          _g_dbus_worker_do_initial_read,
1827                          _g_dbus_worker_ref (worker),
1828                          (GDestroyNotify) _g_dbus_worker_unref);
1829   g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1830   g_source_attach (idle_source, worker->shared_thread_data->context);
1831   g_source_unref (idle_source);
1832
1833   return worker;
1834 }
1835
1836 /* ---------------------------------------------------------------------------------------------------- */
1837
1838 /* can be called from any thread
1839  *
1840  * write_lock is not held on entry
1841  * output_pending may be anything
1842  */
1843 void
1844 _g_dbus_worker_close (GDBusWorker         *worker,
1845                       GCancellable        *cancellable,
1846                       GSimpleAsyncResult  *result)
1847 {
1848   CloseData *close_data;
1849
1850   close_data = g_slice_new0 (CloseData);
1851   close_data->worker = _g_dbus_worker_ref (worker);
1852   close_data->cancellable =
1853       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1854   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1855
1856   /* Don't set worker->close_expected here - we're in the wrong thread.
1857    * It'll be set before the actual close happens.
1858    */
1859   g_cancellable_cancel (worker->cancellable);
1860   g_mutex_lock (&worker->write_lock);
1861   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1862   g_mutex_unlock (&worker->write_lock);
1863 }
1864
1865 /* This can be called from any thread - frees worker. Note that
1866  * callbacks might still happen if called from another thread than the
1867  * worker - use your own synchronization primitive in the callbacks.
1868  *
1869  * write_lock is not held on entry
1870  * output_pending may be anything
1871  */
1872 void
1873 _g_dbus_worker_stop (GDBusWorker *worker)
1874 {
1875   g_atomic_int_set (&worker->stopped, TRUE);
1876
1877   /* Cancel any pending operations and schedule a close of the underlying I/O
1878    * stream in the worker thread
1879    */
1880   _g_dbus_worker_close (worker, NULL, NULL);
1881
1882   /* _g_dbus_worker_close holds a ref until after an idle in the worker
1883    * thread has run, so we no longer need to unref in an idle like in
1884    * commit 322e25b535
1885    */
1886   _g_dbus_worker_unref (worker);
1887 }
1888
1889 /* ---------------------------------------------------------------------------------------------------- */
1890
1891 /* can be called from any thread (except the worker thread) - blocks
1892  * calling thread until all queued outgoing messages are written and
1893  * the transport has been flushed
1894  *
1895  * write_lock is not held on entry
1896  * output_pending may be anything
1897  */
1898 gboolean
1899 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1900                            GCancellable   *cancellable,
1901                            GError        **error)
1902 {
1903   gboolean ret;
1904   FlushData *data;
1905   guint64 pending_writes;
1906
1907   data = NULL;
1908   ret = TRUE;
1909
1910   g_mutex_lock (&worker->write_lock);
1911
1912   /* if the queue is empty, no write is in-flight and we haven't written
1913    * anything since the last flush, then there's nothing to wait for
1914    */
1915   pending_writes = g_queue_get_length (worker->write_queue);
1916
1917   /* if a write is in-flight, we shouldn't be satisfied until the first
1918    * flush operation that follows it
1919    */
1920   if (worker->output_pending == PENDING_WRITE)
1921     pending_writes += 1;
1922
1923   if (pending_writes > 0 ||
1924       worker->write_num_messages_written != worker->write_num_messages_flushed)
1925     {
1926       data = g_new0 (FlushData, 1);
1927       g_mutex_init (&data->mutex);
1928       g_cond_init (&data->cond);
1929       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1930       g_mutex_lock (&data->mutex);
1931
1932       schedule_writing_unlocked (worker, NULL, data, NULL);
1933     }
1934   g_mutex_unlock (&worker->write_lock);
1935
1936   if (data != NULL)
1937     {
1938       g_cond_wait (&data->cond, &data->mutex);
1939       g_mutex_unlock (&data->mutex);
1940
1941       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1942       g_cond_clear (&data->cond);
1943       g_mutex_clear (&data->mutex);
1944       if (data->error != NULL)
1945         {
1946           ret = FALSE;
1947           g_propagate_error (error, data->error);
1948         }
1949       g_free (data);
1950     }
1951
1952   return ret;
1953 }
1954
1955 /* ---------------------------------------------------------------------------------------------------- */
1956
1957 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1958 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1959 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1960 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1961 #define G_DBUS_DEBUG_CALL           (1<<4)
1962 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1963 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1964 #define G_DBUS_DEBUG_RETURN         (1<<7)
1965 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1966 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1967
1968 static gint _gdbus_debug_flags = 0;
1969
1970 gboolean
1971 _g_dbus_debug_authentication (void)
1972 {
1973   _g_dbus_initialize ();
1974   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1975 }
1976
1977 gboolean
1978 _g_dbus_debug_transport (void)
1979 {
1980   _g_dbus_initialize ();
1981   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1982 }
1983
1984 gboolean
1985 _g_dbus_debug_message (void)
1986 {
1987   _g_dbus_initialize ();
1988   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1989 }
1990
1991 gboolean
1992 _g_dbus_debug_payload (void)
1993 {
1994   _g_dbus_initialize ();
1995   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1996 }
1997
1998 gboolean
1999 _g_dbus_debug_call (void)
2000 {
2001   _g_dbus_initialize ();
2002   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
2003 }
2004
2005 gboolean
2006 _g_dbus_debug_signal (void)
2007 {
2008   _g_dbus_initialize ();
2009   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
2010 }
2011
2012 gboolean
2013 _g_dbus_debug_incoming (void)
2014 {
2015   _g_dbus_initialize ();
2016   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
2017 }
2018
2019 gboolean
2020 _g_dbus_debug_return (void)
2021 {
2022   _g_dbus_initialize ();
2023   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
2024 }
2025
2026 gboolean
2027 _g_dbus_debug_emission (void)
2028 {
2029   _g_dbus_initialize ();
2030   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
2031 }
2032
2033 gboolean
2034 _g_dbus_debug_address (void)
2035 {
2036   _g_dbus_initialize ();
2037   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
2038 }
2039
2040 G_LOCK_DEFINE_STATIC (print_lock);
2041
2042 void
2043 _g_dbus_debug_print_lock (void)
2044 {
2045   G_LOCK (print_lock);
2046 }
2047
2048 void
2049 _g_dbus_debug_print_unlock (void)
2050 {
2051   G_UNLOCK (print_lock);
2052 }
2053
2054 /**
2055  * _g_dbus_initialize:
2056  *
2057  * Does various one-time init things such as
2058  *
2059  *  - registering the G_DBUS_ERROR error domain
2060  *  - parses the G_DBUS_DEBUG environment variable
2061  */
2062 void
2063 _g_dbus_initialize (void)
2064 {
2065   static volatile gsize initialized = 0;
2066
2067   if (g_once_init_enter (&initialized))
2068     {
2069       volatile GQuark g_dbus_error_domain;
2070       const gchar *debug;
2071
2072       g_dbus_error_domain = G_DBUS_ERROR;
2073       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
2074
2075       debug = g_getenv ("G_DBUS_DEBUG");
2076       if (debug != NULL)
2077         {
2078           const GDebugKey keys[] = {
2079             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
2080             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
2081             { "message",        G_DBUS_DEBUG_MESSAGE        },
2082             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
2083             { "call",           G_DBUS_DEBUG_CALL           },
2084             { "signal",         G_DBUS_DEBUG_SIGNAL         },
2085             { "incoming",       G_DBUS_DEBUG_INCOMING       },
2086             { "return",         G_DBUS_DEBUG_RETURN         },
2087             { "emission",       G_DBUS_DEBUG_EMISSION       },
2088             { "address",        G_DBUS_DEBUG_ADDRESS        }
2089           };
2090
2091           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
2092           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
2093             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
2094         }
2095
2096       g_once_init_leave (&initialized, 1);
2097     }
2098 }
2099
2100 /* ---------------------------------------------------------------------------------------------------- */
2101
2102 GVariantType *
2103 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
2104 {
2105   const GVariantType *arg_types[256];
2106   guint n;
2107
2108   if (args)
2109     for (n = 0; args[n] != NULL; n++)
2110       {
2111         /* DBus places a hard limit of 255 on signature length.
2112          * therefore number of args must be less than 256.
2113          */
2114         g_assert (n < 256);
2115
2116         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
2117
2118         if G_UNLIKELY (arg_types[n] == NULL)
2119           return NULL;
2120       }
2121   else
2122     n = 0;
2123
2124   return g_variant_type_new_tuple (arg_types, n);
2125 }
2126
2127 /* ---------------------------------------------------------------------------------------------------- */
2128
2129 #ifdef G_OS_WIN32
2130
2131 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
2132
2133 gchar *
2134 _g_dbus_win32_get_user_sid (void)
2135 {
2136   HANDLE h;
2137   TOKEN_USER *user;
2138   DWORD token_information_len;
2139   PSID psid;
2140   gchar *sid;
2141   gchar *ret;
2142
2143   ret = NULL;
2144   user = NULL;
2145   h = INVALID_HANDLE_VALUE;
2146
2147   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2148     {
2149       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2150       goto out;
2151     }
2152
2153   /* Get length of buffer */
2154   token_information_len = 0;
2155   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2156     {
2157       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2158         {
2159           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2160           goto out;
2161         }
2162     }
2163   user = g_malloc (token_information_len);
2164   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2165     {
2166       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2167       goto out;
2168     }
2169
2170   psid = user->User.Sid;
2171   if (!IsValidSid (psid))
2172     {
2173       g_warning ("Invalid SID");
2174       goto out;
2175     }
2176
2177   if (!ConvertSidToStringSidA (psid, &sid))
2178     {
2179       g_warning ("Invalid SID");
2180       goto out;
2181     }
2182
2183   ret = g_strdup (sid);
2184   LocalFree (sid);
2185
2186 out:
2187   g_free (user);
2188   if (h != INVALID_HANDLE_VALUE)
2189     CloseHandle (h);
2190   return ret;
2191 }
2192 #endif
2193
2194 /* ---------------------------------------------------------------------------------------------------- */
2195
2196 gchar *
2197 _g_dbus_get_machine_id (GError **error)
2198 {
2199 #ifdef G_OS_WIN32
2200   HW_PROFILE_INFOA info;
2201   char *src, *dest, *res;
2202   int i;
2203
2204   if (!GetCurrentHwProfileA (&info))
2205     {
2206       char *message = g_win32_error_message (GetLastError ());
2207       g_set_error (error,
2208                    G_IO_ERROR,
2209                    G_IO_ERROR_FAILED,
2210                    _("Unable to get Hardware profile: %s"), message);
2211       g_free (message);
2212       return NULL;
2213     }
2214
2215   /* Form: {12340001-4980-1920-6788-123456789012} */
2216   src = &info.szHwProfileGuid[0];
2217
2218   res = g_malloc (32+1);
2219   dest = res;
2220
2221   src++; /* Skip { */
2222   for (i = 0; i < 8; i++)
2223     *dest++ = *src++;
2224   src++; /* Skip - */
2225   for (i = 0; i < 4; i++)
2226     *dest++ = *src++;
2227   src++; /* Skip - */
2228   for (i = 0; i < 4; i++)
2229     *dest++ = *src++;
2230   src++; /* Skip - */
2231   for (i = 0; i < 4; i++)
2232     *dest++ = *src++;
2233   src++; /* Skip - */
2234   for (i = 0; i < 12; i++)
2235     *dest++ = *src++;
2236   *dest = 0;
2237
2238   return res;
2239 #else
2240   gchar *ret;
2241   GError *first_error;
2242   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2243   ret = NULL;
2244   first_error = NULL;
2245   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2246                             &ret,
2247                             NULL,
2248                             &first_error) &&
2249       !g_file_get_contents ("/etc/machine-id",
2250                             &ret,
2251                             NULL,
2252                             NULL))
2253     {
2254       g_propagate_prefixed_error (error, first_error,
2255                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2256     }
2257   else
2258     {
2259       /* ignore the error from the first try, if any */
2260       g_clear_error (&first_error);
2261       /* TODO: validate value */
2262       g_strstrip (ret);
2263     }
2264   return ret;
2265 #endif
2266 }
2267
2268 /* ---------------------------------------------------------------------------------------------------- */
2269
2270 gchar *
2271 _g_dbus_enum_to_string (GType enum_type, gint value)
2272 {
2273   gchar *ret;
2274   GEnumClass *klass;
2275   GEnumValue *enum_value;
2276
2277   klass = g_type_class_ref (enum_type);
2278   enum_value = g_enum_get_value (klass, value);
2279   if (enum_value != NULL)
2280     ret = g_strdup (enum_value->value_nick);
2281   else
2282     ret = g_strdup_printf ("unknown (value %d)", value);
2283   g_type_class_unref (klass);
2284   return ret;
2285 }
2286
2287 /* ---------------------------------------------------------------------------------------------------- */
2288
2289 static void
2290 write_message_print_transport_debug (gssize bytes_written,
2291                                      MessageToWriteData *data)
2292 {
2293   if (G_LIKELY (!_g_dbus_debug_transport ()))
2294     goto out;
2295
2296   _g_dbus_debug_print_lock ();
2297   g_print ("========================================================================\n"
2298            "GDBus-debug:Transport:\n"
2299            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2300            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2301            bytes_written,
2302            g_dbus_message_get_serial (data->message),
2303            data->blob_size,
2304            data->total_written,
2305            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2306   _g_dbus_debug_print_unlock ();
2307  out:
2308   ;
2309 }
2310
2311 /* ---------------------------------------------------------------------------------------------------- */
2312
2313 static void
2314 read_message_print_transport_debug (gssize bytes_read,
2315                                     GDBusWorker *worker)
2316 {
2317   gsize size;
2318   gint32 serial;
2319   gint32 message_length;
2320
2321   if (G_LIKELY (!_g_dbus_debug_transport ()))
2322     goto out;
2323
2324   size = bytes_read + worker->read_buffer_cur_size;
2325   serial = 0;
2326   message_length = 0;
2327   if (size >= 16)
2328     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2329   if (size >= 1)
2330     {
2331       switch (worker->read_buffer[0])
2332         {
2333         case 'l':
2334           if (size >= 12)
2335             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2336           break;
2337         case 'B':
2338           if (size >= 12)
2339             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2340           break;
2341         default:
2342           /* an error will be set elsewhere if this happens */
2343           goto out;
2344         }
2345     }
2346
2347     _g_dbus_debug_print_lock ();
2348   g_print ("========================================================================\n"
2349            "GDBus-debug:Transport:\n"
2350            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2351            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2352            bytes_read,
2353            serial,
2354            message_length,
2355            worker->read_buffer_cur_size,
2356            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2357   _g_dbus_debug_print_unlock ();
2358  out:
2359   ;
2360 }
2361
2362 /* ---------------------------------------------------------------------------------------------------- */
2363
2364 gboolean
2365 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2366                                      GValue                *return_accu,
2367                                      const GValue          *handler_return,
2368                                      gpointer               dummy)
2369 {
2370   gboolean continue_emission;
2371   gboolean signal_return;
2372
2373   signal_return = g_value_get_boolean (handler_return);
2374   g_value_set_boolean (return_accu, signal_return);
2375   continue_emission = signal_return;
2376
2377   return continue_emission;
2378 }