Removed kdbus file descriptor condition check
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "glib/gstdio.h"
43 #include "gsocketcontrolmessage.h"
44 #include "gsocketconnection.h"
45 #include "gkdbusconnection.h"
46 #include "gsocketoutputstream.h"
47
48 #ifdef G_OS_UNIX
49 #include "gunixfdmessage.h"
50 #include "gunixconnection.h"
51 #include "gunixcredentialsmessage.h"
52 #endif
53
54 #ifdef G_OS_WIN32
55 #include <windows.h>
56 #endif
57
58 #include "glibintl.h"
59
/* Forward declaration; the definition is outside this excerpt.
 * NOTE(review): the (gpointer) -> gboolean shape suggests a GSourceFunc-style
 * callback - confirm at the definition. */
static gboolean _g_dbus_worker_do_initial_read (gpointer data);
61
62 /* ---------------------------------------------------------------------------------------------------- */
63
64 gchar *
65 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
66 {
67  guint n, m;
68  GString *ret;
69
70  ret = g_string_new (NULL);
71
72  for (n = 0; n < len; n += 16)
73    {
74      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
75
76      for (m = n; m < n + 16; m++)
77        {
78          if (m > n && (m%4) == 0)
79            g_string_append_c (ret, ' ');
80          if (m < len)
81            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
82          else
83            g_string_append (ret, "   ");
84        }
85
86      g_string_append (ret, "   ");
87
88      for (m = n; m < len && m < n + 16; m++)
89        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
90
91      g_string_append_c (ret, '\n');
92    }
93
94  return g_string_free (ret, FALSE);
95 }
96
97 /* ---------------------------------------------------------------------------------------------------- */
98
99 /* Unfortunately ancillary messages are discarded when reading from a
100  * socket using the GSocketInputStream abstraction. So we provide a
101  * very GInputStream-ish API that uses GSocket in this case (very
102  * similar to GSocketInputStream).
103  */
104
/* State of one pending _g_socket_read_with_control_messages() call.
 * Released with read_with_control_data_free(). */
typedef struct
{
  GSocket *socket;            /* referenced; socket to receive from */
  GCancellable *cancellable;  /* referenced, or NULL */

  void *buffer;               /* caller-supplied destination buffer */
  gsize count;                /* size of @buffer passed to the receive call */

  /* out-locations handed to g_socket_receive_message() */
  GSocketControlMessage ***messages;
  gint *num_messages;

  GSimpleAsyncResult *simple; /* result object that is completed after the read */

  /* TRUE when the read runs from a GSource callback (complete directly),
   * FALSE when it runs inline from the initiating call (complete in idle) */
  gboolean from_mainloop;
} ReadWithControlData;
120
/* State of one pending _g_kdbus_read() call.
 * Released with read_kdbus_data_free(). */
typedef struct
{
  GKdbus *kdbus;              /* stored without taking an extra reference */
  GCancellable *cancellable;  /* referenced, or NULL */

  void *buffer;               /* caller-supplied destination buffer */
  gsize count;                /* recorded by _g_kdbus_read(); not passed to g_kdbus_receive() */

  GSimpleAsyncResult *simple; /* result object that is completed after the read */

  /* always set to TRUE by _g_kdbus_read(); selects direct vs. idle completion */
  gboolean from_mainloop;
} ReadKdbusData;
133
134 static void
135 read_kdbus_data_free (ReadKdbusData *data)
136 {
137   //g_object_unref (data->kdbus); TODO
138   if (data->cancellable != NULL)
139     g_object_unref (data->cancellable);
140   g_object_unref (data->simple);
141   g_free (data);
142 }
143
144 static void
145 read_with_control_data_free (ReadWithControlData *data)
146 {
147   g_object_unref (data->socket);
148   if (data->cancellable != NULL)
149     g_object_unref (data->cancellable);
150   g_object_unref (data->simple);
151   g_free (data);
152 }
153
154 static gboolean
155 _g_kdbus_read_ready (GKdbus      *kdbus,
156                      GIOCondition  condition,
157                      gpointer      user_data)
158 {
159   ReadKdbusData *data = user_data;
160   GError *error;
161   gssize result;
162   
163   error = NULL;
164   
165   result = g_kdbus_receive (data->kdbus,
166                             data->buffer,
167                             &error);
168   if (result >= 0)
169     {
170       g_simple_async_result_set_op_res_gssize (data->simple, result);
171     }
172   else
173     {
174       g_assert (error != NULL);
175       g_simple_async_result_take_error (data->simple, error);
176     }
177
178   if (data->from_mainloop)
179     g_simple_async_result_complete (data->simple);
180   else
181     g_simple_async_result_complete_in_idle (data->simple);
182
183   return FALSE;
184 }
185
186 static gboolean
187 _g_socket_read_with_control_messages_ready (GSocket      *socket,
188                                             GIOCondition  condition,
189                                             gpointer      user_data)
190 {
191   ReadWithControlData *data = user_data;
192   GError *error;
193   gssize result;
194   GInputVector vector;
195
196   error = NULL;
197   vector.buffer = data->buffer;
198   vector.size = data->count;
199   result = g_socket_receive_message (data->socket,
200                                      NULL, /* address */
201                                      &vector,
202                                      1,
203                                      data->messages,
204                                      data->num_messages,
205                                      NULL,
206                                      data->cancellable,
207                                      &error);
208   if (result >= 0)
209     {
210       g_simple_async_result_set_op_res_gssize (data->simple, result);
211     }
212   else
213     {
214       g_assert (error != NULL);
215       g_simple_async_result_take_error (data->simple, error);
216     }
217
218   if (data->from_mainloop)
219     g_simple_async_result_complete (data->simple);
220   else
221     g_simple_async_result_complete_in_idle (data->simple);
222
223   return FALSE;
224 }
225
226 static void
227 _g_kdbus_read (GKdbus                  *kdbus,
228                void                    *buffer,
229                gsize                    count,
230                GCancellable            *cancellable,
231                GAsyncReadyCallback      callback,
232                gpointer                 user_data)
233 {
234   ReadKdbusData *data;
235   GSource *source;
236
237   data = g_new0 (ReadKdbusData, 1);
238   data->kdbus = kdbus; /*g_object_ref (socket);*/
239   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
240   data->buffer = buffer;
241   data->count = count;
242
243   data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
244                                             callback,
245                                             user_data,
246                                             _g_kdbus_read);
247   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
248
249   data->from_mainloop = TRUE;
250   source = g_kdbus_create_source (data->kdbus,
251                                    G_IO_IN | G_IO_HUP | G_IO_ERR,
252                                    cancellable);
253   g_source_set_callback (source,
254                          (GSourceFunc) _g_kdbus_read_ready,
255                          data,
256                          (GDestroyNotify) read_kdbus_data_free);
257   g_source_attach (source, g_main_context_get_thread_default ());
258   g_source_unref (source);
259 }
260
261 static void
262 _g_socket_read_with_control_messages (GSocket                 *socket,
263                                       void                    *buffer,
264                                       gsize                    count,
265                                       GSocketControlMessage ***messages,
266                                       gint                    *num_messages,
267                                       gint                     io_priority,
268                                       GCancellable            *cancellable,
269                                       GAsyncReadyCallback      callback,
270                                       gpointer                 user_data)
271 {
272   ReadWithControlData *data;
273
274   data = g_new0 (ReadWithControlData, 1);
275   data->socket = g_object_ref (socket);
276   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
277   data->buffer = buffer;
278   data->count = count;
279   data->messages = messages;
280   data->num_messages = num_messages;
281
282   data->simple = g_simple_async_result_new (G_OBJECT (socket),
283                                             callback,
284                                             user_data,
285                                             _g_socket_read_with_control_messages);
286   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
287
288   if (!g_socket_condition_check (socket, G_IO_IN))
289     {
290       GSource *source;
291       data->from_mainloop = TRUE;
292       source = g_socket_create_source (data->socket,
293                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
294                                        cancellable);
295       g_source_set_callback (source,
296                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
297                              data,
298                              (GDestroyNotify) read_with_control_data_free);
299       g_source_attach (source, g_main_context_get_thread_default ());
300       g_source_unref (source);
301     }
302   else
303     {
304       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
305       read_with_control_data_free (data);
306     }
307 }
308
309 static gssize
310 _g_kdbus_read_finish (GKdbus       *kdbus,
311                                              GAsyncResult  *result,
312                                              GError       **error)
313 {
314   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
315
316   g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
317   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
318
319   if (g_simple_async_result_propagate_error (simple, error))
320       return -1;
321   else
322     return g_simple_async_result_get_op_res_gssize (simple);
323 }
324
325 static gssize
326 _g_socket_read_with_control_messages_finish (GSocket       *socket,
327                                              GAsyncResult  *result,
328                                              GError       **error)
329 {
330   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
331
332   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
333   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
334
335   if (g_simple_async_result_propagate_error (simple, error))
336       return -1;
337   else
338     return g_simple_async_result_get_op_res_gssize (simple);
339 }
340
341 /* ---------------------------------------------------------------------------------------------------- */
342
343 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
344
345 static GPtrArray *ensured_classes = NULL;
346
347 static void
348 ensure_type (GType gtype)
349 {
350   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
351 }
352
353 static void
354 release_required_types (void)
355 {
356   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
357   g_ptr_array_unref (ensured_classes);
358   ensured_classes = NULL;
359 }
360
361 static void
362 ensure_required_types (void)
363 {
364   g_assert (ensured_classes == NULL);
365   ensured_classes = g_ptr_array_new ();
366   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
367   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
368 }
369 /* ---------------------------------------------------------------------------------------------------- */
370
/* Bookkeeping for the worker thread created in _g_dbus_shared_thread_ref(). */
typedef struct
{
  volatile gint refcount;  /* incremented atomically by _g_dbus_shared_thread_ref() */
  GThread *thread;         /* the "gdbus" thread running gdbus_shared_thread_func() */
  GMainContext *context;   /* context pushed as thread-default inside that thread */
  GMainLoop *loop;         /* loop on @context that the thread runs */
} SharedThreadData;
378
379 static gpointer
380 gdbus_shared_thread_func (gpointer user_data)
381 {
382   SharedThreadData *data = user_data;
383
384   g_main_context_push_thread_default (data->context);
385   g_main_loop_run (data->loop);
386   g_main_context_pop_thread_default (data->context);
387
388   release_required_types ();
389
390   return NULL;
391 }
392
393 /* ---------------------------------------------------------------------------------------------------- */
394
395 static SharedThreadData *
396 _g_dbus_shared_thread_ref (void)
397 {
398   static gsize shared_thread_data = 0;
399   SharedThreadData *ret;
400
401   if (g_once_init_enter (&shared_thread_data))
402     {
403       SharedThreadData *data;
404
405       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
406       ensure_required_types ();
407
408       data = g_new0 (SharedThreadData, 1);
409       data->refcount = 0;
410       
411       data->context = g_main_context_new ();
412       data->loop = g_main_loop_new (data->context, FALSE);
413       data->thread = g_thread_new ("gdbus",
414                                    gdbus_shared_thread_func,
415                                    data);
416       /* We can cast between gsize and gpointer safely */
417       g_once_init_leave (&shared_thread_data, (gsize) data);
418     }
419
420   ret = (SharedThreadData*) shared_thread_data;
421   g_atomic_int_inc (&ret->refcount);
422   return ret;
423 }
424
/* Counterpart of _g_dbus_shared_thread_ref().
 *
 * Currently a no-op: the teardown code below is compiled out with #if 0, so
 * the refcount is not decremented and the shared thread keeps running.
 */
static void
_g_dbus_shared_thread_unref (SharedThreadData *data)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  g_assert (data != NULL);
  if (g_atomic_int_dec_and_test (&data->refcount))
    {
      g_main_loop_quit (data->loop);
      //g_thread_join (data->thread);
      g_main_loop_unref (data->loop);
      g_main_context_unref (data->context);
    }
#endif
}
440
441 /* ---------------------------------------------------------------------------------------------------- */
442
/* Which asynchronous output operation (if any) a worker currently has in
 * flight; stored in GDBusWorker.output_pending. */
typedef enum {
    PENDING_NONE = 0,   /* nothing pending */
    PENDING_WRITE,      /* an async write is pending */
    PENDING_FLUSH,      /* an async flush is pending */
    PENDING_CLOSE       /* an async close is pending */
} OutputPending;
449
/* Per-connection worker state: the stream being serviced, the callbacks
 * used to report events, and the read-side and write-side bookkeeping. */
struct GDBusWorker
{
  /* manipulated atomically by _g_dbus_worker_ref() / _g_dbus_worker_unref() */
  volatile gint                       ref_count;

  /* the shared worker thread; its context is where idle sources are attached */
  SharedThreadData                   *shared_thread_data;

  /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
  volatile gint                       stopped;

  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
   * only affects messages received from the other peer (since GDBusServer is the
   * only user) - we might want it to affect messages sent to the other peer too?
   */
  gboolean                            frozen;
  GDBusCapabilityFlags                capabilities;
  /* messages held back while frozen; delivered in order on unfreeze */
  GQueue                             *received_messages_while_frozen;

  GIOStream                          *stream;
  GCancellable                       *cancellable;
  /* callbacks are only invoked while the worker is not stopped */
  GDBusWorkerMessageReceivedCallback  message_received_callback;
  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
  GDBusWorkerDisconnectedCallback     disconnected_callback;
  gpointer                            user_data;

  /* Transport handles used by the read path: kdbus is used when stream is a
   * GKdbusConnection; otherwise a non-NULL socket is read directly (so that
   * control messages are seen), and with a NULL socket the stream's plain
   * GInputStream is used. */
  GSocket *socket;
  GKdbus  *kdbus;

  /* used for reading */
  GMutex                              read_lock;
  gchar                              *read_buffer;
  gsize                               read_buffer_allocated_size;
  gsize                               read_buffer_cur_size;
  gsize                               read_buffer_bytes_wanted;
  GUnixFDList                        *read_fd_list;
  GSocketControlMessage             **read_ancillary_messages;
  gint                                read_num_ancillary_messages;

  /* Whether an async write, flush or close, or none of those, is pending.
   * Only the worker thread may change its value, and only with the write_lock.
   * Other threads may read its value when holding the write_lock.
   * The worker thread may read its value at any time.
   */
  OutputPending                       output_pending;
  /* used for writing */
  GMutex                              write_lock;
  /* queue of MessageToWriteData, protected by write_lock */
  GQueue                             *write_queue;
  /* protected by write_lock */
  guint64                             write_num_messages_written;
  /* number of messages we'd written out last time we flushed;
   * protected by write_lock
   */
  guint64                             write_num_messages_flushed;
  /* list of FlushData, protected by write_lock */
  GList                              *write_pending_flushes;
  /* list of CloseData, protected by write_lock */
  GList                              *pending_close_attempts;
  /* no lock - only used from the worker thread */
  gboolean                            close_expected;
};

/* Forward declaration: needed by close_data_free() before the definition. */
static void _g_dbus_worker_unref (GDBusWorker *worker);
513
514 /* ---------------------------------------------------------------------------------------------------- */
515
/* Element type of GDBusWorker.write_pending_flushes.
 * NOTE(review): the code using these fields is outside this excerpt; the
 * field notes below are inferred from the names - confirm against the
 * flush implementation. */
typedef struct
{
  GMutex  mutex;
  GCond   cond;
  guint64 number_to_wait_for;  /* presumably a written-message count to reach */
  GError *error;
} FlushData;
523
/* Forward declarations for the write-queue element type and for helpers
 * whose definitions are outside this excerpt. */
struct _MessageToWriteData ;
typedef struct _MessageToWriteData MessageToWriteData;

/* used as the element destructor for GDBusWorker.write_queue */
static void message_to_write_data_free (MessageToWriteData *data);

/* called from the read callback after a successful read */
static void read_message_print_transport_debug (gssize bytes_read,
                                                GDBusWorker *worker);

static void write_message_print_transport_debug (gssize bytes_written,
                                                 MessageToWriteData *data);
534
/* Element type of GDBusWorker.pending_close_attempts.
 * Released with close_data_free(). */
typedef struct {
    GDBusWorker *worker;         /* holds a worker reference (dropped on free) */
    GCancellable *cancellable;   /* referenced, may be NULL */
    GSimpleAsyncResult *result;  /* referenced, may be NULL */
} CloseData;
540
541 static void close_data_free (CloseData *close_data)
542 {
543   if (close_data->cancellable != NULL)
544     g_object_unref (close_data->cancellable);
545
546   if (close_data->result != NULL)
547     g_object_unref (close_data->result);
548
549   _g_dbus_worker_unref (close_data->worker);
550   g_slice_free (CloseData, close_data);
551 }
552
553 /* ---------------------------------------------------------------------------------------------------- */
554
555 static GDBusWorker *
556 _g_dbus_worker_ref (GDBusWorker *worker)
557 {
558   g_atomic_int_inc (&worker->ref_count);
559   return worker;
560 }
561
562 static void
563 _g_dbus_worker_unref (GDBusWorker *worker)
564 {
565   if (g_atomic_int_dec_and_test (&worker->ref_count))
566     {
567       g_assert (worker->write_pending_flushes == NULL);
568
569       _g_dbus_shared_thread_unref (worker->shared_thread_data);
570
571       g_object_unref (worker->stream);
572
573       g_mutex_clear (&worker->read_lock);
574       g_object_unref (worker->cancellable);
575       if (worker->read_fd_list != NULL)
576         g_object_unref (worker->read_fd_list);
577
578       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
579       g_mutex_clear (&worker->write_lock);
580       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
581       g_free (worker->read_buffer);
582
583       g_free (worker);
584     }
585 }
586
587 static void
588 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
589                                   gboolean      remote_peer_vanished,
590                                   GError       *error)
591 {
592   if (!g_atomic_int_get (&worker->stopped))
593     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
594 }
595
596 static void
597 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
598                                       GDBusMessage *message)
599 {
600   if (!g_atomic_int_get (&worker->stopped))
601     worker->message_received_callback (worker, message, worker->user_data);
602 }
603
604 static GDBusMessage *
605 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
606                                               GDBusMessage *message)
607 {
608   GDBusMessage *ret;
609   if (!g_atomic_int_get (&worker->stopped))
610     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
611   else
612     ret = message;
613   return ret;
614 }
615
616 /* can only be called from private thread with read-lock held - takes ownership of @message */
617 void
618 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
619                                                   GDBusMessage *message)
620 {
621   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
622     {
623       /* queue up */
624       g_queue_push_tail (worker->received_messages_while_frozen, message);
625     }
626   else
627     {
628       /* not frozen, nor anything in queue */
629       _g_dbus_worker_emit_message_received (worker, message);
630       g_object_unref (message);
631     }
632 }
633
634 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
635 static gboolean
636 unfreeze_in_idle_cb (gpointer user_data)
637 {
638   GDBusWorker *worker = user_data;
639   GDBusMessage *message;
640
641   g_mutex_lock (&worker->read_lock);
642   if (worker->frozen)
643     {
644       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
645         {
646           _g_dbus_worker_emit_message_received (worker, message);
647           g_object_unref (message);
648         }
649       worker->frozen = FALSE;
650     }
651   else
652     {
653       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
654     }
655   g_mutex_unlock (&worker->read_lock);
656   return FALSE;
657 }
658
659 /* can be called from any thread */
660 void
661 _g_dbus_worker_unfreeze (GDBusWorker *worker)
662 {
663   GSource *idle_source;
664   idle_source = g_idle_source_new ();
665   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
666   g_source_set_callback (idle_source,
667                          unfreeze_in_idle_cb,
668                          _g_dbus_worker_ref (worker),
669                          (GDestroyNotify) _g_dbus_worker_unref);
670   g_source_attach (idle_source, worker->shared_thread_data->context);
671   g_source_unref (idle_source);
672 }
673
674 /* ---------------------------------------------------------------------------------------------------- */
675
/* Forward declaration; the definition is outside this excerpt.  It is
 * called from _g_dbus_worker_do_read_cb() while that function holds
 * worker->read_lock. */
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
677
678 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
679 static void
680 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
681                            GAsyncResult  *res,
682                            gpointer       user_data)
683 {
684   GDBusWorker *worker = user_data;
685   GError *error;
686   gssize bytes_read;
687
688   g_mutex_lock (&worker->read_lock);
689
690   /* If already stopped, don't even process the reply */
691   if (g_atomic_int_get (&worker->stopped))
692     goto out;
693
694   error = NULL;
695   if (G_IS_KDBUS_CONNECTION (worker->stream))
696     {
697       bytes_read = _g_kdbus_read_finish (worker->kdbus,
698                                          res,
699                                          &error);
700   } else if (worker->socket == NULL)
701     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
702                                              res,
703                                              &error);
704   else
705     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
706                                                               res,
707                                                               &error);
708   if (worker->read_num_ancillary_messages > 0)
709     {
710       gint n;
711       for (n = 0; n < worker->read_num_ancillary_messages; n++)
712         {
713           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
714
715           if (FALSE)
716             {
717             }
718 #ifdef G_OS_UNIX
719           else if (G_IS_UNIX_FD_MESSAGE (control_message))
720             {
721               GUnixFDMessage *fd_message;
722               gint *fds;
723               gint num_fds;
724
725               fd_message = G_UNIX_FD_MESSAGE (control_message);
726               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
727               if (worker->read_fd_list == NULL)
728                 {
729                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
730                 }
731               else
732                 {
733                   gint n;
734                   for (n = 0; n < num_fds; n++)
735                     {
736                       /* TODO: really want a append_steal() */
737                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
738                       (void) g_close (fds[n], NULL);
739                     }
740                 }
741               g_free (fds);
742             }
743           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
744             {
745               /* do nothing */
746             }
747 #endif
748           else
749             {
750               if (error == NULL)
751                 {
752                   g_set_error (&error,
753                                G_IO_ERROR,
754                                G_IO_ERROR_FAILED,
755                                "Unexpected ancillary message of type %s received from peer",
756                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
757                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
758                   g_error_free (error);
759                   g_object_unref (control_message);
760                   n++;
761                   while (n < worker->read_num_ancillary_messages)
762                     g_object_unref (worker->read_ancillary_messages[n++]);
763                   g_free (worker->read_ancillary_messages);
764                   goto out;
765                 }
766             }
767           g_object_unref (control_message);
768         }
769       g_free (worker->read_ancillary_messages);
770     }
771
772   if (bytes_read == -1)
773     {
774       if (G_UNLIKELY (_g_dbus_debug_transport ()))
775         {
776           _g_dbus_debug_print_lock ();
777           g_print ("========================================================================\n"
778                    "GDBus-debug:Transport:\n"
779                    "  ---- READ ERROR on stream of type %s:\n"
780                    "  ---- %s %d: %s\n",
781                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
782                    g_quark_to_string (error->domain), error->code,
783                    error->message);
784           _g_dbus_debug_print_unlock ();
785         }
786
787       /* Every async read that uses this callback uses worker->cancellable
788        * as its GCancellable. worker->cancellable gets cancelled if and only
789        * if the GDBusConnection tells us to close (either via
790        * _g_dbus_worker_stop, which is called on last-unref, or directly),
791        * so a cancelled read must mean our connection was closed locally.
792        *
793        * If we're closing, other errors are possible - notably,
794        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
795        * read in-flight. It seems sensible to treat all read errors during
796        * closing as an expected thing that doesn't trip exit-on-close.
797        *
798        * Because close_expected can't be set until we get into the worker
799        * thread, but the cancellable is signalled sooner (from another
800        * thread), we do still need to check the error.
801        */
802       if (worker->close_expected ||
803           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
804         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
805       else
806         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
807
808       g_error_free (error);
809       goto out;
810     }
811
812 #if 0
813   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
814            (gint) bytes_read,
815            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
816            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
817            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
818                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
819            worker->stream,
820            worker);
821 #endif
822
823   /* TODO: hmm, hmm... */
824   if (bytes_read == 0)
825     {
826       g_set_error (&error,
827                    G_IO_ERROR,
828                    G_IO_ERROR_FAILED,
829                    "Underlying GIOStream returned 0 bytes on an async read");
830       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
831       g_error_free (error);
832       goto out;
833     }
834
835   read_message_print_transport_debug (bytes_read, worker);
836
837   worker->read_buffer_cur_size += bytes_read;
838
839   /* TODO: [KDBUS] Sprawdzic pole read_buffer_bytes_wanted */
840   if (G_IS_KDBUS_CONNECTION (worker->stream))
841     worker->read_buffer_bytes_wanted = worker->read_buffer_cur_size;
842
843   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
844     {
845       /* OK, got what we asked for! */
846       if (worker->read_buffer_bytes_wanted == 16)
847         {
848           gssize message_len;
849           /* OK, got the header - determine how many more bytes are needed */
850           error = NULL;
851           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
852                                                      16,
853                                                      &error);
854           if (message_len == -1)
855             {
856               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
857               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
858               g_error_free (error);
859               goto out;
860             }
861
862           worker->read_buffer_bytes_wanted = message_len;
863           _g_dbus_worker_do_read_unlocked (worker);
864         }
865       else
866         {
867           GDBusMessage *message;
868           error = NULL;
869
870           /* TODO: use connection->priv->auth to decode the message */
871
872           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
873                                                   worker->read_buffer_cur_size,
874                                                   worker->capabilities,
875                                                   &error);
876           if (message == NULL)
877             {
878               gchar *s;
879               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
880               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
881                          "The error is: %s\n"
882                          "The payload is as follows:\n"
883                          "%s\n",
884                          worker->read_buffer_cur_size,
885                          error->message,
886                          s);
887               g_free (s);
888               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
889               g_error_free (error);
890               goto out;
891             }
892
893 #ifdef G_OS_UNIX
894           if (worker->read_fd_list != NULL)
895             {
896               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
897               g_object_unref (worker->read_fd_list);
898               worker->read_fd_list = NULL;
899             }
900 #endif
901
902           if (G_UNLIKELY (_g_dbus_debug_message ()))
903             {
904               gchar *s;
905               _g_dbus_debug_print_lock ();
906               g_print ("========================================================================\n"
907                        "GDBus-debug:Message:\n"
908                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
909                        worker->read_buffer_cur_size);
910               s = g_dbus_message_print (message, 2);
911               g_print ("%s", s);
912               g_free (s);
913               if (G_UNLIKELY (_g_dbus_debug_payload ()))
914                 {
915                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
916                   g_print ("%s\n", s);
917                   g_free (s);
918                 }
919               _g_dbus_debug_print_unlock ();
920             }
921
922           /* yay, got a message, go deliver it */
923           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
924
925           /* start reading another message! */
926           worker->read_buffer_bytes_wanted = 0;
927           worker->read_buffer_cur_size = 0;
928           _g_dbus_worker_do_read_unlocked (worker);
929         }
930     }
931   else
932     {
933       /* didn't get all the bytes we requested - so repeat the request... */
934       _g_dbus_worker_do_read_unlocked (worker);
935     }
936
937  out:
938   g_mutex_unlock (&worker->read_lock);
939
940   /* gives up the reference acquired when calling g_input_stream_read_async() */
941   _g_dbus_worker_unref (worker);
942 }
943
944 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
945 static void
946 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
947 {
948   /* Note that we do need to keep trying to read even if close_expected is
949    * true, because only failing a read causes us to signal 'closed'.
950    */
951
952   /* if bytes_wanted is zero, it means start reading a message */
953   if (worker->read_buffer_bytes_wanted == 0)
954     {
955       worker->read_buffer_cur_size = 0;
956       worker->read_buffer_bytes_wanted = 16;
957     }
958
959   /* ensure we have a (big enough) buffer */
960   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
961     {
962       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
963       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
964       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
965     }
966
967   if (G_IS_KDBUS_CONNECTION (worker->stream))
968     {
969       //GError *error;
970       //error = NULL;
971
972       
973       _g_kdbus_read(worker->kdbus, 
974                     worker->read_buffer,
975                     worker->read_buffer_bytes_wanted,
976                                             worker->cancellable,
977                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
978                                             _g_dbus_worker_ref (worker));
979       
980
981   } else if (worker->socket == NULL)
982     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
983                                worker->read_buffer + worker->read_buffer_cur_size,
984                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
985                                G_PRIORITY_DEFAULT,
986                                worker->cancellable,
987                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
988                                _g_dbus_worker_ref (worker));
989   else
990     {
991       worker->read_ancillary_messages = NULL;
992       worker->read_num_ancillary_messages = 0;
993       _g_socket_read_with_control_messages (worker->socket,
994                                             worker->read_buffer + worker->read_buffer_cur_size,
995                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
996                                             &worker->read_ancillary_messages,
997                                             &worker->read_num_ancillary_messages,
998                                             G_PRIORITY_DEFAULT,
999                                             worker->cancellable,
1000                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1001                                             _g_dbus_worker_ref (worker));
1002     }
1003 }
1004
1005 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
1006 static gboolean
1007 _g_dbus_worker_do_initial_read (gpointer data)
1008 {
1009   GDBusWorker *worker = data;
1010   g_mutex_lock (&worker->read_lock);
1011   _g_dbus_worker_do_read_unlocked (worker);
1012   g_mutex_unlock (&worker->read_lock);
1013   return FALSE;
1014 }
1015
1016 /* ---------------------------------------------------------------------------------------------------- */
1017
1018 struct _MessageToWriteData
1019 {
1020   GDBusWorker  *worker;
1021   GDBusMessage *message;
1022   gchar        *blob;
1023   gsize         blob_size;
1024
1025   gsize               total_written;
1026   GSimpleAsyncResult *simple;
1027
1028 };
1029
1030 static void
1031 message_to_write_data_free (MessageToWriteData *data)
1032 {
1033   _g_dbus_worker_unref (data->worker);
1034   if (data->message)
1035     g_object_unref (data->message);
1036   g_free (data->blob);
1037   g_free (data);
1038 }
1039
1040 /* ---------------------------------------------------------------------------------------------------- */
1041
1042 static void write_message_continue_writing (MessageToWriteData *data);
1043
1044 /* called in private thread shared by all GDBusConnection instances
1045  *
1046  * write-lock is not held on entry
1047  * output_pending is PENDING_WRITE on entry
1048  */
1049 static void
1050 write_message_async_cb (GObject      *source_object,
1051                         GAsyncResult *res,
1052                         gpointer      user_data)
1053 {
1054   MessageToWriteData *data = user_data;
1055   GSimpleAsyncResult *simple;
1056   gssize bytes_written;
1057   GError *error;
1058
1059   /* Note: we can't access data->simple after calling g_async_result_complete () because the
1060    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
1061    */
1062   simple = data->simple;
1063
1064   error = NULL;
1065   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
1066                                                 res,
1067                                                 &error);
1068   if (bytes_written == -1)
1069     {
1070       g_simple_async_result_take_error (simple, error);
1071       g_simple_async_result_complete (simple);
1072       g_object_unref (simple);
1073       goto out;
1074     }
1075   g_assert (bytes_written > 0); /* zero is never returned */
1076
1077   write_message_print_transport_debug (bytes_written, data);
1078
1079   data->total_written += bytes_written;
1080   g_assert (data->total_written <= data->blob_size);
1081   if (data->total_written == data->blob_size)
1082     {
1083       g_simple_async_result_complete (simple);
1084       g_object_unref (simple);
1085       goto out;
1086     }
1087
1088   write_message_continue_writing (data);
1089
1090  out:
1091   ;
1092 }
1093
#ifdef G_OS_UNIX
/* Source callback installed after a socket send reported WOULD_BLOCK;
 * retries the pending write.
 *
 * Called in the private thread shared by all GDBusConnection instances.
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 *
 * Returns: always %FALSE, so the source is removed after one dispatch.
 */
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  MessageToWriteData *pending = user_data;

  write_message_continue_writing (pending);

  return FALSE; /* remove source */
}
#endif
1110
1111 /* called in private thread shared by all GDBusConnection instances
1112  *
1113  * write-lock is not held on entry
1114  * output_pending is PENDING_WRITE on entry
1115  */
1116 static void
1117 write_message_continue_writing (MessageToWriteData *data)
1118 {
1119 #ifdef G_OS_UNIX
1120   GSimpleAsyncResult *simple;
1121   simple = data->simple;
1122 #endif
1123
1124   if (G_IS_KDBUS_CONNECTION (data->worker->stream))
1125     {
1126       GError *error;
1127       error = NULL;
1128       data->total_written = g_kdbus_send_message(data->worker, data->worker->kdbus, data->message, data->blob, data->blob_size, &error);
1129     
1130       write_message_print_transport_debug (data->total_written, data);  
1131       g_simple_async_result_complete (simple);
1132       g_object_unref (simple);
1133       goto out;
1134     }
1135   else
1136     {
1137       GOutputStream *ostream;
1138 #ifdef G_OS_UNIX
1139       GUnixFDList *fd_list;
1140 #endif
1141
1142       ostream = g_io_stream_get_output_stream (data->worker->stream);
1143 #ifdef G_OS_UNIX
1144       fd_list = g_dbus_message_get_unix_fd_list (data->message);
1145 #endif
1146
1147       g_assert (!g_output_stream_has_pending (ostream));
1148       g_assert_cmpint (data->total_written, <, data->blob_size);
1149
1150       if (FALSE)
1151         {
1152         }
1153 #ifdef G_OS_UNIX
1154       else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1155         {
1156            GOutputVector vector;
1157            GSocketControlMessage *control_message;
1158            gssize bytes_written;
1159            GError *error;
1160
1161            vector.buffer = data->blob;
1162            vector.size = data->blob_size;
1163
1164            control_message = NULL;
1165            if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1166              {
1167                if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1168                  {
1169                    g_simple_async_result_set_error (simple,
1170                                                     G_IO_ERROR,
1171                                                     G_IO_ERROR_FAILED,
1172                                                     "Tried sending a file descriptor but remote peer does not support this capability");
1173                    g_simple_async_result_complete (simple);
1174                    g_object_unref (simple);
1175                    goto out;
1176                  }
1177                 control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1178               }
1179
1180             error = NULL;
1181             bytes_written = g_socket_send_message (data->worker->socket,
1182                                                   NULL, /* address */
1183                                                   &vector,
1184                                                   1,
1185                                                   control_message != NULL ? &control_message : NULL,
1186                                                   control_message != NULL ? 1 : 0,
1187                                                   G_SOCKET_MSG_NONE,
1188                                                   data->worker->cancellable,
1189                                                   &error);
1190            if (control_message != NULL)
1191              g_object_unref (control_message);
1192
1193            if (bytes_written == -1)
1194              {
1195                /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1196                if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1197                  {
1198                    GSource *source;
1199                    source = g_socket_create_source (data->worker->socket,
1200                                                     G_IO_OUT | G_IO_HUP | G_IO_ERR,
1201                                                     data->worker->cancellable);
1202                    g_source_set_callback (source,
1203                                           (GSourceFunc) on_socket_ready,
1204                                           data,
1205                                           NULL); /* GDestroyNotify */
1206                    g_source_attach (source, g_main_context_get_thread_default ());
1207                    g_source_unref (source);
1208                    g_error_free (error);
1209                    goto out;
1210                  }
1211                g_simple_async_result_take_error (simple, error);
1212                g_simple_async_result_complete (simple);
1213                g_object_unref (simple);
1214                goto out;
1215              }
1216            g_assert (bytes_written > 0); /* zero is never returned */
1217
1218            write_message_print_transport_debug (bytes_written, data);
1219
1220            data->total_written += bytes_written;
1221            g_assert (data->total_written <= data->blob_size);
1222            if (data->total_written == data->blob_size)
1223              {
1224                g_simple_async_result_complete (simple);
1225                g_object_unref (simple);
1226                goto out;
1227              }
1228
1229            write_message_continue_writing (data);
1230          }
1231 #endif
1232        else
1233          {
1234 #ifdef G_OS_UNIX
1235            if (fd_list != NULL)
1236              {
1237                g_simple_async_result_set_error (simple,
1238                                                 G_IO_ERROR,
1239                                                 G_IO_ERROR_FAILED,
1240                                                 "Tried sending a file descriptor on unsupported stream of type %s",
1241                                                 g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1242                g_simple_async_result_complete (simple);
1243                g_object_unref (simple);
1244                goto out;
1245              }
1246 #endif
1247
1248             g_output_stream_write_async (ostream,
1249                                         (const gchar *) data->blob + data->total_written,
1250                                         data->blob_size - data->total_written,
1251                                         G_PRIORITY_DEFAULT,
1252                                         data->worker->cancellable,
1253                                         write_message_async_cb,
1254                                         data);
1255          }
1256     }
1257 #ifdef G_OS_UNIX
1258  out:
1259 #endif
1260   ;
1261 }
1262
1263 /* called in private thread shared by all GDBusConnection instances
1264  *
1265  * write-lock is not held on entry
1266  * output_pending is PENDING_WRITE on entry
1267  */
1268 static void
1269 write_message_async (GDBusWorker         *worker,
1270                      MessageToWriteData  *data,
1271                      GAsyncReadyCallback  callback,
1272                      gpointer             user_data)
1273 {
1274   data->simple = g_simple_async_result_new (NULL,
1275                                             callback,
1276                                             user_data,
1277                                             write_message_async);
1278   data->total_written = 0;
1279   write_message_continue_writing (data);
1280 }
1281
1282 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1283 static gboolean
1284 write_message_finish (GAsyncResult   *res,
1285                       GError        **error)
1286 {
1287   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1288   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1289     return FALSE;
1290   else
1291     return TRUE;
1292 }
1293 /* ---------------------------------------------------------------------------------------------------- */
1294
1295 static void continue_writing (GDBusWorker *worker);
1296
1297 typedef struct
1298 {
1299   GDBusWorker *worker;
1300   GList *flushers;
1301 } FlushAsyncData;
1302
1303 static void
1304 flush_data_list_complete (const GList  *flushers,
1305                           const GError *error)
1306 {
1307   const GList *l;
1308
1309   for (l = flushers; l != NULL; l = l->next)
1310     {
1311       FlushData *f = l->data;
1312
1313       f->error = error != NULL ? g_error_copy (error) : NULL;
1314
1315       g_mutex_lock (&f->mutex);
1316       g_cond_signal (&f->cond);
1317       g_mutex_unlock (&f->mutex);
1318     }
1319 }
1320
1321 /* called in private thread shared by all GDBusConnection instances
1322  *
1323  * write-lock is not held on entry
1324  * output_pending is PENDING_FLUSH on entry
1325  */
1326 static void
1327 ostream_flush_cb (GObject      *source_object,
1328                   GAsyncResult *res,
1329                   gpointer      user_data)
1330 {
1331   FlushAsyncData *data = user_data;
1332   GError *error;
1333
1334   error = NULL;
1335   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1336                                 res,
1337                                 &error);
1338
1339   if (error == NULL)
1340     {
1341       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1342         {
1343           _g_dbus_debug_print_lock ();
1344           g_print ("========================================================================\n"
1345                    "GDBus-debug:Transport:\n"
1346                    "  ---- FLUSHED stream of type %s\n",
1347                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1348           _g_dbus_debug_print_unlock ();
1349         }
1350     }
1351
1352   g_assert (data->flushers != NULL);
1353   flush_data_list_complete (data->flushers, error);
1354   g_list_free (data->flushers);
1355
1356   if (error != NULL)
1357     g_error_free (error);
1358
1359   /* Make sure we tell folks that we don't have additional
1360      flushes pending */
1361   g_mutex_lock (&data->worker->write_lock);
1362   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1363   g_assert (data->worker->output_pending == PENDING_FLUSH);
1364   data->worker->output_pending = PENDING_NONE;
1365   g_mutex_unlock (&data->worker->write_lock);
1366
1367   /* OK, cool, finally kick off the next write */
1368   continue_writing (data->worker);
1369
1370   _g_dbus_worker_unref (data->worker);
1371   g_free (data);
1372 }
1373
1374 /* called in private thread shared by all GDBusConnection instances
1375  *
1376  * write-lock is not held on entry
1377  * output_pending is PENDING_FLUSH on entry
1378  */
1379 static void
1380 start_flush (FlushAsyncData *data)
1381 {
1382   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1383                                G_PRIORITY_DEFAULT,
1384                                data->worker->cancellable,
1385                                ostream_flush_cb,
1386                                data);
1387 }
1388
1389 /* called in private thread shared by all GDBusConnection instances
1390  *
1391  * write-lock is held on entry
1392  * output_pending is PENDING_NONE on entry
1393  */
1394 static void
1395 message_written_unlocked (GDBusWorker *worker,
1396                           MessageToWriteData *message_data)
1397 {
1398   if (G_UNLIKELY (_g_dbus_debug_message ()))
1399     {
1400       gchar *s;
1401       _g_dbus_debug_print_lock ();
1402       g_print ("========================================================================\n"
1403                "GDBus-debug:Message:\n"
1404                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1405                message_data->blob_size);
1406       s = g_dbus_message_print (message_data->message, 2);
1407       g_print ("%s", s);
1408       g_free (s);
1409       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1410         {
1411           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1412           g_print ("%s\n", s);
1413           g_free (s);
1414         }
1415       _g_dbus_debug_print_unlock ();
1416     }
1417
1418   worker->write_num_messages_written += 1;
1419 }
1420
1421 /* called in private thread shared by all GDBusConnection instances
1422  *
1423  * write-lock is held on entry
1424  * output_pending is PENDING_NONE on entry
1425  *
1426  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1427  */
1428 static FlushAsyncData *
1429 prepare_flush_unlocked (GDBusWorker *worker)
1430 {
1431   GList *l;
1432   GList *ll;
1433   GList *flushers;
1434
1435   flushers = NULL;
1436   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1437     {
1438       FlushData *f = l->data;
1439       ll = l->next;
1440
1441       if (f->number_to_wait_for == worker->write_num_messages_written)
1442         {
1443           flushers = g_list_append (flushers, f);
1444           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1445         }
1446     }
1447   if (flushers != NULL)
1448     {
1449       g_assert (worker->output_pending == PENDING_NONE);
1450       worker->output_pending = PENDING_FLUSH;
1451     }
1452
1453   if (flushers != NULL)
1454     {
1455       FlushAsyncData *data;
1456
1457       data = g_new0 (FlushAsyncData, 1);
1458       data->worker = _g_dbus_worker_ref (worker);
1459       data->flushers = flushers;
1460       return data;
1461     }
1462
1463   return NULL;
1464 }
1465
1466 /* called in private thread shared by all GDBusConnection instances
1467  *
1468  * write-lock is not held on entry
1469  * output_pending is PENDING_WRITE on entry
1470  */
1471 static void
1472 write_message_cb (GObject       *source_object,
1473                   GAsyncResult  *res,
1474                   gpointer       user_data)
1475 {
1476   MessageToWriteData *data = user_data;
1477   GError *error;
1478
1479   g_mutex_lock (&data->worker->write_lock);
1480   g_assert (data->worker->output_pending == PENDING_WRITE);
1481   data->worker->output_pending = PENDING_NONE;
1482
1483   error = NULL;
1484   if (!write_message_finish (res, &error))
1485     {
1486       g_mutex_unlock (&data->worker->write_lock);
1487
1488       /* TODO: handle */
1489       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1490       g_error_free (error);
1491
1492       g_mutex_lock (&data->worker->write_lock);
1493     }
1494
1495   message_written_unlocked (data->worker, data);
1496
1497   g_mutex_unlock (&data->worker->write_lock);
1498
1499   continue_writing (data->worker);
1500
1501   message_to_write_data_free (data);
1502 }
1503
1504 /* called in private thread shared by all GDBusConnection instances
1505  *
1506  * write-lock is not held on entry
1507  * output_pending is PENDING_CLOSE on entry
1508  */
1509 static void
1510 iostream_close_cb (GObject      *source_object,
1511                    GAsyncResult *res,
1512                    gpointer      user_data)
1513 {
1514   GDBusWorker *worker = user_data;
1515   GError *error = NULL;
1516   GList *pending_close_attempts, *pending_flush_attempts;
1517   GQueue *send_queue;
1518
1519   g_io_stream_close_finish (worker->stream, res, &error);
1520
1521   g_mutex_lock (&worker->write_lock);
1522
1523   pending_close_attempts = worker->pending_close_attempts;
1524   worker->pending_close_attempts = NULL;
1525
1526   pending_flush_attempts = worker->write_pending_flushes;
1527   worker->write_pending_flushes = NULL;
1528
1529   send_queue = worker->write_queue;
1530   worker->write_queue = g_queue_new ();
1531
1532   g_assert (worker->output_pending == PENDING_CLOSE);
1533   worker->output_pending = PENDING_NONE;
1534
1535   g_mutex_unlock (&worker->write_lock);
1536
1537   while (pending_close_attempts != NULL)
1538     {
1539       CloseData *close_data = pending_close_attempts->data;
1540
1541       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1542                                                    pending_close_attempts);
1543
1544       if (close_data->result != NULL)
1545         {
1546           if (error != NULL)
1547             g_simple_async_result_set_from_error (close_data->result, error);
1548
1549           /* this must be in an idle because the result is likely to be
1550            * intended for another thread
1551            */
1552           g_simple_async_result_complete_in_idle (close_data->result);
1553         }
1554
1555       close_data_free (close_data);
1556     }
1557
1558   g_clear_error (&error);
1559
1560   /* all messages queued for sending are discarded */
1561   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1562   /* all queued flushes fail */
1563   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1564                        _("Operation was cancelled"));
1565   flush_data_list_complete (pending_flush_attempts, error);
1566   g_list_free (pending_flush_attempts);
1567   g_clear_error (&error);
1568
1569   _g_dbus_worker_unref (worker);
1570 }
1571
1572 /* called in private thread shared by all GDBusConnection instances
1573  *
1574  * write-lock is not held on entry
1575  * output_pending must be PENDING_NONE on entry
1576  */
1577 static void
1578 continue_writing (GDBusWorker *worker)
1579 {
1580   MessageToWriteData *data;
1581   FlushAsyncData *flush_async_data;
1582
1583  write_next:
1584   /* we mustn't try to write two things at once */
1585   g_assert (worker->output_pending == PENDING_NONE);
1586
1587   g_mutex_lock (&worker->write_lock);
1588
1589   data = NULL;
1590   flush_async_data = NULL;
1591
1592   /* if we want to close the connection, that takes precedence */
1593   if (worker->pending_close_attempts != NULL)
1594     {
1595       worker->close_expected = TRUE;
1596       worker->output_pending = PENDING_CLOSE;
1597
1598       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1599                                NULL, iostream_close_cb,
1600                                _g_dbus_worker_ref (worker));
1601     }
1602   else
1603     {
1604       flush_async_data = prepare_flush_unlocked (worker);
1605
1606       if (flush_async_data == NULL)
1607         {
1608           data = g_queue_pop_head (worker->write_queue);
1609
1610           if (data != NULL)
1611             worker->output_pending = PENDING_WRITE;
1612         }
1613     }
1614
1615   g_mutex_unlock (&worker->write_lock);
1616
1617   /* Note that write_lock is only used for protecting the @write_queue
1618    * and @output_pending fields of the GDBusWorker struct ... which we
1619    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1620    *
1621    * Therefore, it's fine to drop it here when calling back into user
1622    * code and then writing the message out onto the GIOStream since this
1623    * function only runs on the worker thread.
1624    */
1625
1626   if (flush_async_data != NULL)
1627     {
1628       start_flush (flush_async_data);
1629       g_assert (data == NULL);
1630     }
1631   else if (data != NULL)
1632     {
1633       GDBusMessage *old_message;
1634       guchar *new_blob;
1635       gsize new_blob_size;
1636       GError *error;
1637
1638       old_message = data->message;
1639       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1640       if (data->message == old_message)
1641         {
1642           /* filters had no effect - do nothing */
1643         }
1644       else if (data->message == NULL)
1645         {
1646           /* filters dropped message */
1647           g_mutex_lock (&worker->write_lock);
1648           worker->output_pending = PENDING_NONE;
1649           g_mutex_unlock (&worker->write_lock);
1650           message_to_write_data_free (data);
1651           goto write_next;
1652         }
1653       else
1654         {
1655           /* filters altered the message -> reencode */
1656           error = NULL;
1657           new_blob = g_dbus_message_to_blob (data->message,
1658                                              &new_blob_size,
1659                                              worker->capabilities,
1660                                              &error);
1661           if (new_blob == NULL)
1662             {
1663               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1664                * the old message instead
1665                */
1666               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1667                          g_dbus_message_get_serial (data->message),
1668                          error->message);
1669               g_error_free (error);
1670             }
1671           else
1672             {
1673               g_free (data->blob);
1674               data->blob = (gchar *) new_blob;
1675               data->blob_size = new_blob_size;
1676             }
1677         }
1678
1679       write_message_async (worker,
1680                            data,
1681                            write_message_cb,
1682                            data);
1683     }
1684 }
1685
1686 /* called in private thread shared by all GDBusConnection instances
1687  *
1688  * write-lock is not held on entry
1689  * output_pending may be anything
1690  */
1691 static gboolean
1692 continue_writing_in_idle_cb (gpointer user_data)
1693 {
1694   GDBusWorker *worker = user_data;
1695
1696   /* Because this is the worker thread, we can read this struct member
1697    * without holding the lock: no other thread ever modifies it.
1698    */
1699   if (worker->output_pending == PENDING_NONE)
1700     continue_writing (worker);
1701
1702   return FALSE;
1703 }
1704
1705 /*
1706  * @write_data: (transfer full) (allow-none):
1707  * @flush_data: (transfer full) (allow-none):
1708  * @close_data: (transfer full) (allow-none):
1709  *
1710  * Can be called from any thread
1711  *
1712  * write_lock is held on entry
1713  * output_pending may be anything
1714  */
1715 static void
1716 schedule_writing_unlocked (GDBusWorker        *worker,
1717                            MessageToWriteData *write_data,
1718                            FlushData          *flush_data,
1719                            CloseData          *close_data)
1720 {
1721   if (write_data != NULL)
1722     g_queue_push_tail (worker->write_queue, write_data);
1723
1724   if (flush_data != NULL)
1725     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1726
1727   if (close_data != NULL)
1728     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1729                                                      close_data);
1730
1731   /* If we had output pending, the next bit of output will happen
1732    * automatically when it finishes, so we only need to do this
1733    * if nothing was pending.
1734    *
1735    * The idle callback will re-check that output_pending is still
1736    * PENDING_NONE, to guard against output starting before the idle.
1737    */
1738   if (worker->output_pending == PENDING_NONE)
1739     {
1740       GSource *idle_source;
1741       idle_source = g_idle_source_new ();
1742       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1743       g_source_set_callback (idle_source,
1744                              continue_writing_in_idle_cb,
1745                              _g_dbus_worker_ref (worker),
1746                              (GDestroyNotify) _g_dbus_worker_unref);
1747       g_source_attach (idle_source, worker->shared_thread_data->context);
1748       g_source_unref (idle_source);
1749     }
1750 }
1751
1752 /* ---------------------------------------------------------------------------------------------------- */
1753
1754 /* can be called from any thread - steals blob
1755  *
1756  * write_lock is not held on entry
1757  * output_pending may be anything
1758  */
1759 void
1760 _g_dbus_worker_send_message (GDBusWorker    *worker,
1761                              GDBusMessage   *message,
1762                              gchar          *blob,
1763                              gsize           blob_len)
1764 {
1765   MessageToWriteData *data;
1766
1767   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1768   g_return_if_fail (blob != NULL);
1769   g_return_if_fail (blob_len > 16);
1770
1771   data = g_new0 (MessageToWriteData, 1);
1772   data->worker = _g_dbus_worker_ref (worker);
1773   data->message = g_object_ref (message);
1774   data->blob = blob; /* steal! */
1775   data->blob_size = blob_len;
1776
1777   g_mutex_lock (&worker->write_lock);
1778   schedule_writing_unlocked (worker, data, NULL, NULL);
1779   g_mutex_unlock (&worker->write_lock);
1780 }
1781
1782 /* ---------------------------------------------------------------------------------------------------- */
1783 GKdbus*        g_dbus_worker_get_kdbus    (GDBusWorker  *worker)
1784 {
1785   return worker->kdbus;
1786 }
1787
1788 GDBusWorker *
1789 _g_dbus_worker_new (GIOStream                              *stream,
1790                     GDBusCapabilityFlags                    capabilities,
1791                     gboolean                                initially_frozen,
1792                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1793                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1794                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1795                     gpointer                                user_data)
1796 {
1797   GDBusWorker *worker;
1798   GSource *idle_source;
1799
1800   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1801   g_return_val_if_fail (message_received_callback != NULL, NULL);
1802   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1803   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1804
1805   worker = g_new0 (GDBusWorker, 1);
1806   worker->ref_count = 1;
1807
1808   g_mutex_init (&worker->read_lock);
1809   worker->message_received_callback = message_received_callback;
1810   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1811   worker->disconnected_callback = disconnected_callback;
1812   worker->user_data = user_data;
1813   worker->stream = g_object_ref (stream);
1814   worker->capabilities = capabilities;
1815   worker->cancellable = g_cancellable_new ();
1816   worker->output_pending = PENDING_NONE;
1817
1818   worker->frozen = initially_frozen;
1819   worker->received_messages_while_frozen = g_queue_new ();
1820
1821   g_mutex_init (&worker->write_lock);
1822   worker->write_queue = g_queue_new ();
1823
1824   if (G_IS_SOCKET_CONNECTION (worker->stream))
1825     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1826
1827   if (G_IS_KDBUS_CONNECTION (worker->stream))
1828     worker->kdbus = g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
1829
1830   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1831
1832   /* begin reading */
1833   idle_source = g_idle_source_new ();
1834   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1835   g_source_set_callback (idle_source,
1836                          _g_dbus_worker_do_initial_read,
1837                          _g_dbus_worker_ref (worker),
1838                          (GDestroyNotify) _g_dbus_worker_unref);
1839   g_source_attach (idle_source, worker->shared_thread_data->context);
1840   g_source_unref (idle_source);
1841
1842   return worker;
1843 }
1844
1845 /* ---------------------------------------------------------------------------------------------------- */
1846
1847 /* can be called from any thread
1848  *
1849  * write_lock is not held on entry
1850  * output_pending may be anything
1851  */
1852 void
1853 _g_dbus_worker_close (GDBusWorker         *worker,
1854                       GCancellable        *cancellable,
1855                       GSimpleAsyncResult  *result)
1856 {
1857   CloseData *close_data;
1858
1859   close_data = g_slice_new0 (CloseData);
1860   close_data->worker = _g_dbus_worker_ref (worker);
1861   close_data->cancellable =
1862       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1863   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1864
1865   /* Don't set worker->close_expected here - we're in the wrong thread.
1866    * It'll be set before the actual close happens.
1867    */
1868   g_cancellable_cancel (worker->cancellable);
1869   g_mutex_lock (&worker->write_lock);
1870   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1871   g_mutex_unlock (&worker->write_lock);
1872 }
1873
/* Marks @worker as stopped, schedules a close of its stream and drops
 * the caller's reference.
 *
 * This can be called from any thread - frees worker. Note that
 * callbacks might still happen if called from another thread than the
 * worker - use your own synchronization primitive in the callbacks.
 *
 * write_lock is not held on entry
 * output_pending may be anything
 */
void
_g_dbus_worker_stop (GDBusWorker *worker)
{
  /* Set the stopped flag atomically, since any thread may call this */
  g_atomic_int_set (&worker->stopped, TRUE);

  /* Cancel any pending operations and schedule a close of the underlying I/O
   * stream in the worker thread
   */
  _g_dbus_worker_close (worker, NULL, NULL);

  /* _g_dbus_worker_close holds a ref until after an idle in the worker
   * thread has run, so we no longer need to unref in an idle like in
   * commit 322e25b535
   */
  _g_dbus_worker_unref (worker);
}
1897
1898 /* ---------------------------------------------------------------------------------------------------- */
1899
/* can be called from any thread (except the worker thread) - blocks
 * calling thread until all queued outgoing messages are written and
 * the transport has been flushed
 *
 * Returns TRUE when there was nothing to wait for or the flush request
 * completed without an error; returns FALSE and sets @error when the
 * flush request came back with an error.
 *
 * NOTE(review): @cancellable is not referenced anywhere in this function.
 *
 * write_lock is not held on entry
 * output_pending may be anything
 */
gboolean
_g_dbus_worker_flush_sync (GDBusWorker    *worker,
                           GCancellable   *cancellable,
                           GError        **error)
{
  gboolean ret;
  FlushData *data;
  guint64 pending_writes;

  data = NULL;
  ret = TRUE;

  g_mutex_lock (&worker->write_lock);

  /* if the queue is empty, no write is in-flight and we haven't written
   * anything since the last flush, then there's nothing to wait for
   */
  pending_writes = g_queue_get_length (worker->write_queue);

  /* if a write is in-flight, we shouldn't be satisfied until the first
   * flush operation that follows it
   */
  if (worker->output_pending == PENDING_WRITE)
    pending_writes += 1;

  if (pending_writes > 0 ||
      worker->write_num_messages_written != worker->write_num_messages_flushed)
    {
      data = g_new0 (FlushData, 1);
      g_mutex_init (&data->mutex);
      g_cond_init (&data->cond);
      /* the flush is satisfied once this many messages have been written */
      data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
      /* take data->mutex before queueing the request, while write_lock is
       * still held, so it is already locked when g_cond_wait() is reached
       */
      g_mutex_lock (&data->mutex);

      schedule_writing_unlocked (worker, NULL, data, NULL);
    }
  g_mutex_unlock (&worker->write_lock);

  if (data != NULL)
    {
      /* NOTE(review): g_cond_wait() is documented as being subject to
       * spurious wakeups and is normally called in a loop that re-checks a
       * predicate; there is no such loop or predicate here - confirm
       * whether FlushData should carry a "finished" flag set by the
       * signalling side.
       */
      g_cond_wait (&data->cond, &data->mutex);
      g_mutex_unlock (&data->mutex);

      /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
      g_cond_clear (&data->cond);
      g_mutex_clear (&data->mutex);
      if (data->error != NULL)
        {
          ret = FALSE;
          /* ownership of data->error moves to the caller's @error */
          g_propagate_error (error, data->error);
        }
      g_free (data);
    }

  return ret;
}
1963
1964 /* ---------------------------------------------------------------------------------------------------- */
1965
/* Bit flags for the debug categories; each occupies its own bit in
 * _gdbus_debug_flags.
 */
#define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
#define G_DBUS_DEBUG_TRANSPORT      (1<<1)
#define G_DBUS_DEBUG_MESSAGE        (1<<2)
#define G_DBUS_DEBUG_PAYLOAD        (1<<3)
#define G_DBUS_DEBUG_CALL           (1<<4)
#define G_DBUS_DEBUG_SIGNAL         (1<<5)
#define G_DBUS_DEBUG_INCOMING       (1<<6)
#define G_DBUS_DEBUG_RETURN         (1<<7)
#define G_DBUS_DEBUG_EMISSION       (1<<8)
#define G_DBUS_DEBUG_ADDRESS        (1<<9)

/* Set of enabled G_DBUS_DEBUG_* bits; 0 (nothing enabled) until
 * _g_dbus_initialize() has parsed the G_DBUS_DEBUG environment variable.
 */
static gint _gdbus_debug_flags = 0;
1978
1979 gboolean
1980 _g_dbus_debug_authentication (void)
1981 {
1982   _g_dbus_initialize ();
1983   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1984 }
1985
1986 gboolean
1987 _g_dbus_debug_transport (void)
1988 {
1989   _g_dbus_initialize ();
1990   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1991 }
1992
1993 gboolean
1994 _g_dbus_debug_message (void)
1995 {
1996   _g_dbus_initialize ();
1997   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1998 }
1999
2000 gboolean
2001 _g_dbus_debug_payload (void)
2002 {
2003   _g_dbus_initialize ();
2004   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
2005 }
2006
2007 gboolean
2008 _g_dbus_debug_call (void)
2009 {
2010   _g_dbus_initialize ();
2011   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
2012 }
2013
2014 gboolean
2015 _g_dbus_debug_signal (void)
2016 {
2017   _g_dbus_initialize ();
2018   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
2019 }
2020
2021 gboolean
2022 _g_dbus_debug_incoming (void)
2023 {
2024   _g_dbus_initialize ();
2025   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
2026 }
2027
2028 gboolean
2029 _g_dbus_debug_return (void)
2030 {
2031   _g_dbus_initialize ();
2032   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
2033 }
2034
2035 gboolean
2036 _g_dbus_debug_emission (void)
2037 {
2038   _g_dbus_initialize ();
2039   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
2040 }
2041
2042 gboolean
2043 _g_dbus_debug_address (void)
2044 {
2045   _g_dbus_initialize ();
2046   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
2047 }
2048
/* File-local lock taken around debug printing (see the two wrappers below) */
G_LOCK_DEFINE_STATIC (print_lock);

/* Acquires print_lock; pair every call with _g_dbus_debug_print_unlock() */
void
_g_dbus_debug_print_lock (void)
{
  G_LOCK (print_lock);
}

/* Releases print_lock taken by _g_dbus_debug_print_lock() */
void
_g_dbus_debug_print_unlock (void)
{
  G_UNLOCK (print_lock);
}
2062
2063 /*
2064  * _g_dbus_initialize:
2065  *
2066  * Does various one-time init things such as
2067  *
2068  *  - registering the G_DBUS_ERROR error domain
2069  *  - parses the G_DBUS_DEBUG environment variable
2070  */
2071 void
2072 _g_dbus_initialize (void)
2073 {
2074   static volatile gsize initialized = 0;
2075
2076   if (g_once_init_enter (&initialized))
2077     {
2078       volatile GQuark g_dbus_error_domain;
2079       const gchar *debug;
2080
2081       g_dbus_error_domain = G_DBUS_ERROR;
2082       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
2083
2084       debug = g_getenv ("G_DBUS_DEBUG");
2085       if (debug != NULL)
2086         {
2087           const GDebugKey keys[] = {
2088             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
2089             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
2090             { "message",        G_DBUS_DEBUG_MESSAGE        },
2091             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
2092             { "call",           G_DBUS_DEBUG_CALL           },
2093             { "signal",         G_DBUS_DEBUG_SIGNAL         },
2094             { "incoming",       G_DBUS_DEBUG_INCOMING       },
2095             { "return",         G_DBUS_DEBUG_RETURN         },
2096             { "emission",       G_DBUS_DEBUG_EMISSION       },
2097             { "address",        G_DBUS_DEBUG_ADDRESS        }
2098           };
2099
2100           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
2101           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
2102             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
2103         }
2104
2105       g_once_init_leave (&initialized, 1);
2106     }
2107 }
2108
2109 /* ---------------------------------------------------------------------------------------------------- */
2110
2111 GVariantType *
2112 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
2113 {
2114   const GVariantType *arg_types[256];
2115   guint n;
2116
2117   if (args)
2118     for (n = 0; args[n] != NULL; n++)
2119       {
2120         /* DBus places a hard limit of 255 on signature length.
2121          * therefore number of args must be less than 256.
2122          */
2123         g_assert (n < 256);
2124
2125         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
2126
2127         if G_UNLIKELY (arg_types[n] == NULL)
2128           return NULL;
2129       }
2130   else
2131     n = 0;
2132
2133   return g_variant_type_new_tuple (arg_types, n);
2134 }
2135
2136 /* ---------------------------------------------------------------------------------------------------- */
2137
2138 #ifdef G_OS_WIN32
2139
2140 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
2141
2142 gchar *
2143 _g_dbus_win32_get_user_sid (void)
2144 {
2145   HANDLE h;
2146   TOKEN_USER *user;
2147   DWORD token_information_len;
2148   PSID psid;
2149   gchar *sid;
2150   gchar *ret;
2151
2152   ret = NULL;
2153   user = NULL;
2154   h = INVALID_HANDLE_VALUE;
2155
2156   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2157     {
2158       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2159       goto out;
2160     }
2161
2162   /* Get length of buffer */
2163   token_information_len = 0;
2164   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2165     {
2166       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2167         {
2168           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2169           goto out;
2170         }
2171     }
2172   user = g_malloc (token_information_len);
2173   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2174     {
2175       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2176       goto out;
2177     }
2178
2179   psid = user->User.Sid;
2180   if (!IsValidSid (psid))
2181     {
2182       g_warning ("Invalid SID");
2183       goto out;
2184     }
2185
2186   if (!ConvertSidToStringSidA (psid, &sid))
2187     {
2188       g_warning ("Invalid SID");
2189       goto out;
2190     }
2191
2192   ret = g_strdup (sid);
2193   LocalFree (sid);
2194
2195 out:
2196   g_free (user);
2197   if (h != INVALID_HANDLE_VALUE)
2198     CloseHandle (h);
2199   return ret;
2200 }
2201 #endif
2202
2203 /* ---------------------------------------------------------------------------------------------------- */
2204
2205 gchar *
2206 _g_dbus_get_machine_id (GError **error)
2207 {
2208 #ifdef G_OS_WIN32
2209   HW_PROFILE_INFOA info;
2210   char *src, *dest, *res;
2211   int i;
2212
2213   if (!GetCurrentHwProfileA (&info))
2214     {
2215       char *message = g_win32_error_message (GetLastError ());
2216       g_set_error (error,
2217                    G_IO_ERROR,
2218                    G_IO_ERROR_FAILED,
2219                    _("Unable to get Hardware profile: %s"), message);
2220       g_free (message);
2221       return NULL;
2222     }
2223
2224   /* Form: {12340001-4980-1920-6788-123456789012} */
2225   src = &info.szHwProfileGuid[0];
2226
2227   res = g_malloc (32+1);
2228   dest = res;
2229
2230   src++; /* Skip { */
2231   for (i = 0; i < 8; i++)
2232     *dest++ = *src++;
2233   src++; /* Skip - */
2234   for (i = 0; i < 4; i++)
2235     *dest++ = *src++;
2236   src++; /* Skip - */
2237   for (i = 0; i < 4; i++)
2238     *dest++ = *src++;
2239   src++; /* Skip - */
2240   for (i = 0; i < 4; i++)
2241     *dest++ = *src++;
2242   src++; /* Skip - */
2243   for (i = 0; i < 12; i++)
2244     *dest++ = *src++;
2245   *dest = 0;
2246
2247   return res;
2248 #else
2249   gchar *ret;
2250   GError *first_error;
2251   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2252   ret = NULL;
2253   first_error = NULL;
2254   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2255                             &ret,
2256                             NULL,
2257                             &first_error) &&
2258       !g_file_get_contents ("/etc/machine-id",
2259                             &ret,
2260                             NULL,
2261                             NULL))
2262     {
2263       g_propagate_prefixed_error (error, first_error,
2264                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2265     }
2266   else
2267     {
2268       /* ignore the error from the first try, if any */
2269       g_clear_error (&first_error);
2270       /* TODO: validate value */
2271       g_strstrip (ret);
2272     }
2273   return ret;
2274 #endif
2275 }
2276
2277 /* ---------------------------------------------------------------------------------------------------- */
2278
2279 gchar *
2280 _g_dbus_enum_to_string (GType enum_type, gint value)
2281 {
2282   gchar *ret;
2283   GEnumClass *klass;
2284   GEnumValue *enum_value;
2285
2286   klass = g_type_class_ref (enum_type);
2287   enum_value = g_enum_get_value (klass, value);
2288   if (enum_value != NULL)
2289     ret = g_strdup (enum_value->value_nick);
2290   else
2291     ret = g_strdup_printf ("unknown (value %d)", value);
2292   g_type_class_unref (klass);
2293   return ret;
2294 }
2295
2296 /* ---------------------------------------------------------------------------------------------------- */
2297
2298 static void
2299 write_message_print_transport_debug (gssize bytes_written,
2300                                      MessageToWriteData *data)
2301 {
2302   if (G_LIKELY (!_g_dbus_debug_transport ()))
2303     goto out;
2304
2305   _g_dbus_debug_print_lock ();
2306   g_print ("========================================================================\n"
2307            "GDBus-debug:Transport:\n"
2308            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2309            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2310            bytes_written,
2311            g_dbus_message_get_serial (data->message),
2312            data->blob_size,
2313            data->total_written,
2314            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2315   _g_dbus_debug_print_unlock ();
2316  out:
2317   ;
2318 }
2319
2320 /* ---------------------------------------------------------------------------------------------------- */
2321
2322 static void
2323 read_message_print_transport_debug (gssize bytes_read,
2324                                     GDBusWorker *worker)
2325 {
2326   gsize size;
2327   gint32 serial;
2328   gint32 message_length;
2329
2330   if (G_LIKELY (!_g_dbus_debug_transport ()))
2331     goto out;
2332
2333   size = bytes_read + worker->read_buffer_cur_size;
2334   serial = 0;
2335   message_length = 0;
2336   if (size >= 16)
2337     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2338   if (size >= 1)
2339     {
2340       switch (worker->read_buffer[0])
2341         {
2342         case 'l':
2343           if (size >= 12)
2344             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2345           break;
2346         case 'B':
2347           if (size >= 12)
2348             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2349           break;
2350         default:
2351           /* an error will be set elsewhere if this happens */
2352           goto out;
2353         }
2354     }
2355
2356     _g_dbus_debug_print_lock ();
2357   g_print ("========================================================================\n"
2358            "GDBus-debug:Transport:\n"
2359            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2360            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2361            bytes_read,
2362            serial,
2363            message_length,
2364            worker->read_buffer_cur_size,
2365            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2366   _g_dbus_debug_print_unlock ();
2367  out:
2368   ;
2369 }
2370
2371 /* ---------------------------------------------------------------------------------------------------- */
2372
2373 gboolean
2374 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2375                                      GValue                *return_accu,
2376                                      const GValue          *handler_return,
2377                                      gpointer               dummy)
2378 {
2379   gboolean continue_emission;
2380   gboolean signal_return;
2381
2382   signal_return = g_value_get_boolean (handler_return);
2383   g_value_set_boolean (return_accu, signal_return);
2384   continue_emission = signal_return;
2385
2386   return continue_emission;
2387 }