[Receiving] async functions copied/modified from gsocket
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "glib/gstdio.h"
43 #include "gsocketcontrolmessage.h"
44 #include "gsocketconnection.h"
45 #include "gkdbusconnection.h"
46 #include "gsocketoutputstream.h"
47
48 #ifdef G_OS_UNIX
49 #include "gunixfdmessage.h"
50 #include "gunixconnection.h"
51 #include "gunixcredentialsmessage.h"
52 #endif
53
54 #ifdef G_OS_WIN32
55 #include <windows.h>
56 #endif
57
58 #include "glibintl.h"
59
60 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
61
62 /* ---------------------------------------------------------------------------------------------------- */
63
64 gchar *
65 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
66 {
67  guint n, m;
68  GString *ret;
69
70  ret = g_string_new (NULL);
71
72  for (n = 0; n < len; n += 16)
73    {
74      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
75
76      for (m = n; m < n + 16; m++)
77        {
78          if (m > n && (m%4) == 0)
79            g_string_append_c (ret, ' ');
80          if (m < len)
81            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
82          else
83            g_string_append (ret, "   ");
84        }
85
86      g_string_append (ret, "   ");
87
88      for (m = n; m < len && m < n + 16; m++)
89        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
90
91      g_string_append_c (ret, '\n');
92    }
93
94  return g_string_free (ret, FALSE);
95 }
96
97 /* ---------------------------------------------------------------------------------------------------- */
98
99 /* Unfortunately ancillary messages are discarded when reading from a
100  * socket using the GSocketInputStream abstraction. So we provide a
101  * very GInputStream-ish API that uses GSocket in this case (very
102  * similar to GSocketInputStream).
103  */
104
105 typedef struct
106 {
107   GSocket *socket;
108   GCancellable *cancellable;
109
110   void *buffer;
111   gsize count;
112
113   GSocketControlMessage ***messages;
114   gint *num_messages;
115
116   GSimpleAsyncResult *simple;
117
118   gboolean from_mainloop;
119 } ReadWithControlData;
120
121 typedef struct
122 {
123   GKdbus *kdbus;
124   GCancellable *cancellable;
125
126   void *buffer;
127   gsize count;
128
129   GSimpleAsyncResult *simple;
130
131   gboolean from_mainloop;
132 } ReadKdbusData;
133
134 static void
135 read_kdbus_data_free (ReadKdbusData *data)
136 {
137   //g_object_unref (data->kdbus); TODO
138   if (data->cancellable != NULL)
139     g_object_unref (data->cancellable);
140   g_object_unref (data->simple);
141   g_free (data);
142 }
143
144 static void
145 read_with_control_data_free (ReadWithControlData *data)
146 {
147   g_object_unref (data->socket);
148   if (data->cancellable != NULL)
149     g_object_unref (data->cancellable);
150   g_object_unref (data->simple);
151   g_free (data);
152 }
153
154 static gboolean
155 _g_kdbus_read_ready (GKdbus      *kdbus,
156                      GIOCondition  condition,
157                      gpointer      user_data)
158 {
159   ReadKdbusData *data = user_data;
160   GError *error;
161   gssize result;
162   
163   error = NULL;
164   
165   result = g_kdbus_receive (data->kdbus,
166                             data->buffer,
167                             &error);
168   if (result >= 0)
169     {
170       g_simple_async_result_set_op_res_gssize (data->simple, result);
171     }
172   else
173     {
174       g_assert (error != NULL);
175       g_simple_async_result_take_error (data->simple, error);
176     }
177
178   if (data->from_mainloop)
179     g_simple_async_result_complete (data->simple);
180   else
181     g_simple_async_result_complete_in_idle (data->simple);
182
183   return FALSE;
184 }
185
186 static gboolean
187 _g_socket_read_with_control_messages_ready (GSocket      *socket,
188                                             GIOCondition  condition,
189                                             gpointer      user_data)
190 {
191   ReadWithControlData *data = user_data;
192   GError *error;
193   gssize result;
194   GInputVector vector;
195
196   error = NULL;
197   vector.buffer = data->buffer;
198   vector.size = data->count;
199   result = g_socket_receive_message (data->socket,
200                                      NULL, /* address */
201                                      &vector,
202                                      1,
203                                      data->messages,
204                                      data->num_messages,
205                                      NULL,
206                                      data->cancellable,
207                                      &error);
208   if (result >= 0)
209     {
210       g_simple_async_result_set_op_res_gssize (data->simple, result);
211     }
212   else
213     {
214       g_assert (error != NULL);
215       g_simple_async_result_take_error (data->simple, error);
216     }
217
218   if (data->from_mainloop)
219     g_simple_async_result_complete (data->simple);
220   else
221     g_simple_async_result_complete_in_idle (data->simple);
222
223   return FALSE;
224 }
225
226 static void
227 _g_kdbus_read (GKdbus                  *kdbus,
228                void                    *buffer,
229                gsize                    count,
230                GCancellable            *cancellable,
231                GAsyncReadyCallback      callback,
232                gpointer                 user_data)
233 {
234   ReadKdbusData *data;
235
236   data = g_new0 (ReadKdbusData, 1);
237   data->kdbus = kdbus; /*g_object_ref (socket);*/
238   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
239   data->buffer = buffer;
240   data->count = count;
241
242   data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
243                                             callback,
244                                             user_data,
245                                             _g_kdbus_read);
246   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
247
248   if (!g_kdbus_condition_check (kdbus, G_IO_IN))
249     {
250       GSource *source;
251       data->from_mainloop = TRUE;
252       source = g_kdbus_create_source (data->kdbus,
253                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
254                                        cancellable);
255       g_source_set_callback (source,
256                              (GSourceFunc) _g_kdbus_read_ready,
257                              data,
258                              (GDestroyNotify) read_kdbus_data_free);
259       g_source_attach (source, g_main_context_get_thread_default ());
260       g_source_unref (source);
261     }
262   else
263     {
264       _g_kdbus_read_ready (data->kdbus, G_IO_IN, data);
265       read_kdbus_data_free (data);
266     }
267 }
268
269 static void
270 _g_socket_read_with_control_messages (GSocket                 *socket,
271                                       void                    *buffer,
272                                       gsize                    count,
273                                       GSocketControlMessage ***messages,
274                                       gint                    *num_messages,
275                                       gint                     io_priority,
276                                       GCancellable            *cancellable,
277                                       GAsyncReadyCallback      callback,
278                                       gpointer                 user_data)
279 {
280   ReadWithControlData *data;
281
282   data = g_new0 (ReadWithControlData, 1);
283   data->socket = g_object_ref (socket);
284   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
285   data->buffer = buffer;
286   data->count = count;
287   data->messages = messages;
288   data->num_messages = num_messages;
289
290   data->simple = g_simple_async_result_new (G_OBJECT (socket),
291                                             callback,
292                                             user_data,
293                                             _g_socket_read_with_control_messages);
294   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
295
296   if (!g_socket_condition_check (socket, G_IO_IN))
297     {
298       GSource *source;
299       data->from_mainloop = TRUE;
300       source = g_socket_create_source (data->socket,
301                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
302                                        cancellable);
303       g_source_set_callback (source,
304                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
305                              data,
306                              (GDestroyNotify) read_with_control_data_free);
307       g_source_attach (source, g_main_context_get_thread_default ());
308       g_source_unref (source);
309     }
310   else
311     {
312       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
313       read_with_control_data_free (data);
314     }
315 }
316
317 static gssize
318 _g_kdbus_read_finish (GKdbus       *kdbus,
319                                              GAsyncResult  *result,
320                                              GError       **error)
321 {
322   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
323
324   g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
325   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
326
327   if (g_simple_async_result_propagate_error (simple, error))
328       return -1;
329   else
330     return g_simple_async_result_get_op_res_gssize (simple);
331 }
332
333 static gssize
334 _g_socket_read_with_control_messages_finish (GSocket       *socket,
335                                              GAsyncResult  *result,
336                                              GError       **error)
337 {
338   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
339
340   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
341   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
342
343   if (g_simple_async_result_propagate_error (simple, error))
344       return -1;
345   else
346     return g_simple_async_result_get_op_res_gssize (simple);
347 }
348
349 /* ---------------------------------------------------------------------------------------------------- */
350
351 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
352
353 static GPtrArray *ensured_classes = NULL;
354
355 static void
356 ensure_type (GType gtype)
357 {
358   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
359 }
360
361 static void
362 release_required_types (void)
363 {
364   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
365   g_ptr_array_unref (ensured_classes);
366   ensured_classes = NULL;
367 }
368
369 static void
370 ensure_required_types (void)
371 {
372   g_assert (ensured_classes == NULL);
373   ensured_classes = g_ptr_array_new ();
374   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
375   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
376 }
377 /* ---------------------------------------------------------------------------------------------------- */
378
379 typedef struct
380 {
381   volatile gint refcount;
382   GThread *thread;
383   GMainContext *context;
384   GMainLoop *loop;
385 } SharedThreadData;
386
387 static gpointer
388 gdbus_shared_thread_func (gpointer user_data)
389 {
390   SharedThreadData *data = user_data;
391
392   g_main_context_push_thread_default (data->context);
393   g_main_loop_run (data->loop);
394   g_main_context_pop_thread_default (data->context);
395
396   release_required_types ();
397
398   return NULL;
399 }
400
401 /* ---------------------------------------------------------------------------------------------------- */
402
403 static SharedThreadData *
404 _g_dbus_shared_thread_ref (void)
405 {
406   static gsize shared_thread_data = 0;
407   SharedThreadData *ret;
408
409   if (g_once_init_enter (&shared_thread_data))
410     {
411       SharedThreadData *data;
412
413       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
414       ensure_required_types ();
415
416       data = g_new0 (SharedThreadData, 1);
417       data->refcount = 0;
418       
419       data->context = g_main_context_new ();
420       data->loop = g_main_loop_new (data->context, FALSE);
421       data->thread = g_thread_new ("gdbus",
422                                    gdbus_shared_thread_func,
423                                    data);
424       /* We can cast between gsize and gpointer safely */
425       g_once_init_leave (&shared_thread_data, (gsize) data);
426     }
427
428   ret = (SharedThreadData*) shared_thread_data;
429   g_atomic_int_inc (&ret->refcount);
430   return ret;
431 }
432
433 static void
434 _g_dbus_shared_thread_unref (SharedThreadData *data)
435 {
436   /* TODO: actually destroy the shared thread here */
437 #if 0
438   g_assert (data != NULL);
439   if (g_atomic_int_dec_and_test (&data->refcount))
440     {
441       g_main_loop_quit (data->loop);
442       //g_thread_join (data->thread);
443       g_main_loop_unref (data->loop);
444       g_main_context_unref (data->context);
445     }
446 #endif
447 }
448
449 /* ---------------------------------------------------------------------------------------------------- */
450
/* Which asynchronous output operation, if any, is in flight on a worker
 * (see GDBusWorker.output_pending). */
typedef enum {
    PENDING_NONE  = 0,  /* nothing outstanding */
    PENDING_WRITE = 1,  /* an async write is pending */
    PENDING_FLUSH = 2,  /* an async flush is pending */
    PENDING_CLOSE = 3   /* an async close is pending */
} OutputPending;
457
458 struct GDBusWorker
459 {
460   volatile gint                       ref_count;
461
462   SharedThreadData                   *shared_thread_data;
463
464   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
465   volatile gint                       stopped;
466
467   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
468    * only affects messages received from the other peer (since GDBusServer is the
469    * only user) - we might want it to affect messages sent to the other peer too?
470    */
471   gboolean                            frozen;
472   GDBusCapabilityFlags                capabilities;
473   GQueue                             *received_messages_while_frozen;
474
475   GIOStream                          *stream;
476   GCancellable                       *cancellable;
477   GDBusWorkerMessageReceivedCallback  message_received_callback;
478   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
479   GDBusWorkerDisconnectedCallback     disconnected_callback;
480   gpointer                            user_data;
481
482   /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
483   GSocket *socket;
484   GKdbus  *kdbus;
485
486   /* used for reading */
487   GMutex                              read_lock;
488   gchar                              *read_buffer;
489   gsize                               read_buffer_allocated_size;
490   gsize                               read_buffer_cur_size;
491   gsize                               read_buffer_bytes_wanted;
492   GUnixFDList                        *read_fd_list;
493   GSocketControlMessage             **read_ancillary_messages;
494   gint                                read_num_ancillary_messages;
495
496   /* Whether an async write, flush or close, or none of those, is pending.
497    * Only the worker thread may change its value, and only with the write_lock.
498    * Other threads may read its value when holding the write_lock.
499    * The worker thread may read its value at any time.
500    */
501   OutputPending                       output_pending;
502   /* used for writing */
503   GMutex                              write_lock;
504   /* queue of MessageToWriteData, protected by write_lock */
505   GQueue                             *write_queue;
506   /* protected by write_lock */
507   guint64                             write_num_messages_written;
508   /* number of messages we'd written out last time we flushed;
509    * protected by write_lock
510    */
511   guint64                             write_num_messages_flushed;
512   /* list of FlushData, protected by write_lock */
513   GList                              *write_pending_flushes;
514   /* list of CloseData, protected by write_lock */
515   GList                              *pending_close_attempts;
516   /* no lock - only used from the worker thread */
517   gboolean                            close_expected;
518 };
519
520 static void _g_dbus_worker_unref (GDBusWorker *worker);
521
522 /* ---------------------------------------------------------------------------------------------------- */
523
524 typedef struct
525 {
526   GMutex  mutex;
527   GCond   cond;
528   guint64 number_to_wait_for;
529   GError *error;
530 } FlushData;
531
532 struct _MessageToWriteData ;
533 typedef struct _MessageToWriteData MessageToWriteData;
534
535 static void message_to_write_data_free (MessageToWriteData *data);
536
537 static void read_message_print_transport_debug (gssize bytes_read,
538                                                 GDBusWorker *worker);
539
540 static void write_message_print_transport_debug (gssize bytes_written,
541                                                  MessageToWriteData *data);
542
543 typedef struct {
544     GDBusWorker *worker;
545     GCancellable *cancellable;
546     GSimpleAsyncResult *result;
547 } CloseData;
548
549 static void close_data_free (CloseData *close_data)
550 {
551   if (close_data->cancellable != NULL)
552     g_object_unref (close_data->cancellable);
553
554   if (close_data->result != NULL)
555     g_object_unref (close_data->result);
556
557   _g_dbus_worker_unref (close_data->worker);
558   g_slice_free (CloseData, close_data);
559 }
560
561 /* ---------------------------------------------------------------------------------------------------- */
562
563 static GDBusWorker *
564 _g_dbus_worker_ref (GDBusWorker *worker)
565 {
566   g_atomic_int_inc (&worker->ref_count);
567   return worker;
568 }
569
570 static void
571 _g_dbus_worker_unref (GDBusWorker *worker)
572 {
573   if (g_atomic_int_dec_and_test (&worker->ref_count))
574     {
575       g_assert (worker->write_pending_flushes == NULL);
576
577       _g_dbus_shared_thread_unref (worker->shared_thread_data);
578
579       g_object_unref (worker->stream);
580
581       g_mutex_clear (&worker->read_lock);
582       g_object_unref (worker->cancellable);
583       if (worker->read_fd_list != NULL)
584         g_object_unref (worker->read_fd_list);
585
586       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
587       g_mutex_clear (&worker->write_lock);
588       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
589       g_free (worker->read_buffer);
590
591       g_free (worker);
592     }
593 }
594
595 static void
596 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
597                                   gboolean      remote_peer_vanished,
598                                   GError       *error)
599 {
600   if (!g_atomic_int_get (&worker->stopped))
601     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
602 }
603
604 static void
605 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
606                                       GDBusMessage *message)
607 {
608   if (!g_atomic_int_get (&worker->stopped))
609     worker->message_received_callback (worker, message, worker->user_data);
610 }
611
612 static GDBusMessage *
613 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
614                                               GDBusMessage *message)
615 {
616   GDBusMessage *ret;
617   if (!g_atomic_int_get (&worker->stopped))
618     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
619   else
620     ret = message;
621   return ret;
622 }
623
624 /* can only be called from private thread with read-lock held - takes ownership of @message */
625 void
626 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
627                                                   GDBusMessage *message)
628 {
629   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
630     {
631       /* queue up */
632       g_queue_push_tail (worker->received_messages_while_frozen, message);
633     }
634   else
635     {
636       /* not frozen, nor anything in queue */
637       _g_dbus_worker_emit_message_received (worker, message);
638       g_object_unref (message);
639     }
640 }
641
642 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
643 static gboolean
644 unfreeze_in_idle_cb (gpointer user_data)
645 {
646   GDBusWorker *worker = user_data;
647   GDBusMessage *message;
648
649   g_mutex_lock (&worker->read_lock);
650   if (worker->frozen)
651     {
652       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
653         {
654           _g_dbus_worker_emit_message_received (worker, message);
655           g_object_unref (message);
656         }
657       worker->frozen = FALSE;
658     }
659   else
660     {
661       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
662     }
663   g_mutex_unlock (&worker->read_lock);
664   return FALSE;
665 }
666
667 /* can be called from any thread */
668 void
669 _g_dbus_worker_unfreeze (GDBusWorker *worker)
670 {
671   GSource *idle_source;
672   idle_source = g_idle_source_new ();
673   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
674   g_source_set_callback (idle_source,
675                          unfreeze_in_idle_cb,
676                          _g_dbus_worker_ref (worker),
677                          (GDestroyNotify) _g_dbus_worker_unref);
678   g_source_attach (idle_source, worker->shared_thread_data->context);
679   g_source_unref (idle_source);
680 }
681
682 /* ---------------------------------------------------------------------------------------------------- */
683
684 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
685
686 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
687 static void
688 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
689                            GAsyncResult  *res,
690                            gpointer       user_data)
691 {
692   GDBusWorker *worker = user_data;
693   GError *error;
694   gssize bytes_read;
695
696   g_mutex_lock (&worker->read_lock);
697
698   /* If already stopped, don't even process the reply */
699   if (g_atomic_int_get (&worker->stopped))
700     goto out;
701
702   error = NULL;
703   if (G_IS_KDBUS_CONNECTION (worker->stream))
704     {
705       bytes_read = _g_kdbus_read_finish (worker->kdbus,
706                                          res,
707                                          &error);
708   } else if (worker->socket == NULL)
709     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
710                                              res,
711                                              &error);
712   else
713     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
714                                                               res,
715                                                               &error);
716   if (worker->read_num_ancillary_messages > 0)
717     {
718       gint n;
719       for (n = 0; n < worker->read_num_ancillary_messages; n++)
720         {
721           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
722
723           if (FALSE)
724             {
725             }
726 #ifdef G_OS_UNIX
727           else if (G_IS_UNIX_FD_MESSAGE (control_message))
728             {
729               GUnixFDMessage *fd_message;
730               gint *fds;
731               gint num_fds;
732
733               fd_message = G_UNIX_FD_MESSAGE (control_message);
734               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
735               if (worker->read_fd_list == NULL)
736                 {
737                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
738                 }
739               else
740                 {
741                   gint n;
742                   for (n = 0; n < num_fds; n++)
743                     {
744                       /* TODO: really want a append_steal() */
745                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
746                       (void) g_close (fds[n], NULL);
747                     }
748                 }
749               g_free (fds);
750             }
751           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
752             {
753               /* do nothing */
754             }
755 #endif
756           else
757             {
758               if (error == NULL)
759                 {
760                   g_set_error (&error,
761                                G_IO_ERROR,
762                                G_IO_ERROR_FAILED,
763                                "Unexpected ancillary message of type %s received from peer",
764                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
765                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
766                   g_error_free (error);
767                   g_object_unref (control_message);
768                   n++;
769                   while (n < worker->read_num_ancillary_messages)
770                     g_object_unref (worker->read_ancillary_messages[n++]);
771                   g_free (worker->read_ancillary_messages);
772                   goto out;
773                 }
774             }
775           g_object_unref (control_message);
776         }
777       g_free (worker->read_ancillary_messages);
778     }
779
780   if (bytes_read == -1)
781     {
782       if (G_UNLIKELY (_g_dbus_debug_transport ()))
783         {
784           _g_dbus_debug_print_lock ();
785           g_print ("========================================================================\n"
786                    "GDBus-debug:Transport:\n"
787                    "  ---- READ ERROR on stream of type %s:\n"
788                    "  ---- %s %d: %s\n",
789                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
790                    g_quark_to_string (error->domain), error->code,
791                    error->message);
792           _g_dbus_debug_print_unlock ();
793         }
794
795       /* Every async read that uses this callback uses worker->cancellable
796        * as its GCancellable. worker->cancellable gets cancelled if and only
797        * if the GDBusConnection tells us to close (either via
798        * _g_dbus_worker_stop, which is called on last-unref, or directly),
799        * so a cancelled read must mean our connection was closed locally.
800        *
801        * If we're closing, other errors are possible - notably,
802        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
803        * read in-flight. It seems sensible to treat all read errors during
804        * closing as an expected thing that doesn't trip exit-on-close.
805        *
806        * Because close_expected can't be set until we get into the worker
807        * thread, but the cancellable is signalled sooner (from another
808        * thread), we do still need to check the error.
809        */
810       if (worker->close_expected ||
811           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
812         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
813       else
814         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
815
816       g_error_free (error);
817       goto out;
818     }
819
820 #if 0
821   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
822            (gint) bytes_read,
823            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
824            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
825            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
826                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
827            worker->stream,
828            worker);
829 #endif
830
831   /* TODO: hmm, hmm... */
832   if (bytes_read == 0)
833     {
834       g_set_error (&error,
835                    G_IO_ERROR,
836                    G_IO_ERROR_FAILED,
837                    "Underlying GIOStream returned 0 bytes on an async read");
838       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
839       g_error_free (error);
840       goto out;
841     }
842
843   read_message_print_transport_debug (bytes_read, worker);
844
845   worker->read_buffer_cur_size += bytes_read;
846   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
847     {
848       /* OK, got what we asked for! */
849       if (worker->read_buffer_bytes_wanted == 16)
850         {
851           gssize message_len;
852           /* OK, got the header - determine how many more bytes are needed */
853           error = NULL;
854           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
855                                                      16,
856                                                      &error);
857           if (message_len == -1)
858             {
859               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
860               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
861               g_error_free (error);
862               goto out;
863             }
864
865           worker->read_buffer_bytes_wanted = message_len;
866           _g_dbus_worker_do_read_unlocked (worker);
867         }
868       else
869         {
870           GDBusMessage *message;
871           error = NULL;
872
873           /* TODO: use connection->priv->auth to decode the message */
874
875           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
876                                                   worker->read_buffer_cur_size,
877                                                   worker->capabilities,
878                                                   &error);
879           if (message == NULL)
880             {
881               gchar *s;
882               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
883               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
884                          "The error is: %s\n"
885                          "The payload is as follows:\n"
886                          "%s\n",
887                          worker->read_buffer_cur_size,
888                          error->message,
889                          s);
890               g_free (s);
891               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
892               g_error_free (error);
893               goto out;
894             }
895
896 #ifdef G_OS_UNIX
897           if (worker->read_fd_list != NULL)
898             {
899               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
900               g_object_unref (worker->read_fd_list);
901               worker->read_fd_list = NULL;
902             }
903 #endif
904
905           if (G_UNLIKELY (_g_dbus_debug_message ()))
906             {
907               gchar *s;
908               _g_dbus_debug_print_lock ();
909               g_print ("========================================================================\n"
910                        "GDBus-debug:Message:\n"
911                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
912                        worker->read_buffer_cur_size);
913               s = g_dbus_message_print (message, 2);
914               g_print ("%s", s);
915               g_free (s);
916               if (G_UNLIKELY (_g_dbus_debug_payload ()))
917                 {
918                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
919                   g_print ("%s\n", s);
920                   g_free (s);
921                 }
922               _g_dbus_debug_print_unlock ();
923             }
924
925           /* yay, got a message, go deliver it */
926           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
927
928           /* start reading another message! */
929           worker->read_buffer_bytes_wanted = 0;
930           worker->read_buffer_cur_size = 0;
931           _g_dbus_worker_do_read_unlocked (worker);
932         }
933     }
934   else
935     {
936       /* didn't get all the bytes we requested - so repeat the request... */
937       _g_dbus_worker_do_read_unlocked (worker);
938     }
939
940  out:
941   g_mutex_unlock (&worker->read_lock);
942
943   /* gives up the reference acquired when calling g_input_stream_read_async() */
944   _g_dbus_worker_unref (worker);
945 }
946
947 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
948 static void
949 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
950 {
951   /* Note that we do need to keep trying to read even if close_expected is
952    * true, because only failing a read causes us to signal 'closed'.
953    */
954
955   /* if bytes_wanted is zero, it means start reading a message */
956   if (worker->read_buffer_bytes_wanted == 0)
957     {
958       worker->read_buffer_cur_size = 0;
959       worker->read_buffer_bytes_wanted = 16;
960     }
961
962   /* ensure we have a (big enough) buffer */
963   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
964     {
965       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
966       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
967       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
968     }
969
970   if (G_IS_KDBUS_CONNECTION (worker->stream))
971     {
972       //GError *error;
973       //error = NULL;
974
975       
976       _g_kdbus_read(worker->kdbus, 
977                     worker->read_buffer,
978                     worker->read_buffer_bytes_wanted,
979                                             worker->cancellable,
980                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
981                                             _g_dbus_worker_ref (worker));
982       
983
984   } else if (worker->socket == NULL)
985     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
986                                worker->read_buffer + worker->read_buffer_cur_size,
987                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
988                                G_PRIORITY_DEFAULT,
989                                worker->cancellable,
990                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
991                                _g_dbus_worker_ref (worker));
992   else
993     {
994       worker->read_ancillary_messages = NULL;
995       worker->read_num_ancillary_messages = 0;
996       _g_socket_read_with_control_messages (worker->socket,
997                                             worker->read_buffer + worker->read_buffer_cur_size,
998                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
999                                             &worker->read_ancillary_messages,
1000                                             &worker->read_num_ancillary_messages,
1001                                             G_PRIORITY_DEFAULT,
1002                                             worker->cancellable,
1003                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
1004                                             _g_dbus_worker_ref (worker));
1005     }
1006 }
1007
1008 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
1009 static gboolean
1010 _g_dbus_worker_do_initial_read (gpointer data)
1011 {
1012   //GDBusWorker *worker = data;
1013   //g_mutex_lock (&worker->read_lock);
1014   //_g_dbus_worker_do_read_unlocked (worker);
1015   //g_mutex_unlock (&worker->read_lock);
1016   return FALSE;
1017 }
1018
1019 /* ---------------------------------------------------------------------------------------------------- */
1020
1021 struct _MessageToWriteData
1022 {
1023   GDBusWorker  *worker;
1024   GDBusMessage *message;
1025   gchar        *blob;
1026   gsize         blob_size;
1027
1028   gsize               total_written;
1029   GSimpleAsyncResult *simple;
1030
1031 };
1032
1033 static void
1034 message_to_write_data_free (MessageToWriteData *data)
1035 {
1036   _g_dbus_worker_unref (data->worker);
1037   if (data->message)
1038     g_object_unref (data->message);
1039   g_free (data->blob);
1040   g_free (data);
1041 }
1042
1043 /* ---------------------------------------------------------------------------------------------------- */
1044
1045 static void write_message_continue_writing (MessageToWriteData *data);
1046
1047 /* called in private thread shared by all GDBusConnection instances
1048  *
1049  * write-lock is not held on entry
1050  * output_pending is PENDING_WRITE on entry
1051  */
1052 static void
1053 write_message_async_cb (GObject      *source_object,
1054                         GAsyncResult *res,
1055                         gpointer      user_data)
1056 {
1057   MessageToWriteData *data = user_data;
1058   GSimpleAsyncResult *simple;
1059   gssize bytes_written;
1060   GError *error;
1061
1062   /* Note: we can't access data->simple after calling g_async_result_complete () because the
1063    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
1064    */
1065   simple = data->simple;
1066
1067   error = NULL;
1068   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
1069                                                 res,
1070                                                 &error);
1071   if (bytes_written == -1)
1072     {
1073       g_simple_async_result_take_error (simple, error);
1074       g_simple_async_result_complete (simple);
1075       g_object_unref (simple);
1076       goto out;
1077     }
1078   g_assert (bytes_written > 0); /* zero is never returned */
1079
1080   write_message_print_transport_debug (bytes_written, data);
1081
1082   data->total_written += bytes_written;
1083   g_assert (data->total_written <= data->blob_size);
1084   if (data->total_written == data->blob_size)
1085     {
1086       g_simple_async_result_complete (simple);
1087       g_object_unref (simple);
1088       goto out;
1089     }
1090
1091   write_message_continue_writing (data);
1092
1093  out:
1094   ;
1095 }
1096
/* Source callback attached by write_message_continue_writing() after a send
 * on the socket reported G_IO_ERROR_WOULD_BLOCK; retries the write.
 *
 * called in private thread shared by all GDBusConnection instances
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 */
#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  MessageToWriteData *pending_write = user_data;

  write_message_continue_writing (pending_write);

  /* one-shot: returning FALSE removes the source */
  return FALSE;
}
#endif
1113
1114 /* called in private thread shared by all GDBusConnection instances
1115  *
1116  * write-lock is not held on entry
1117  * output_pending is PENDING_WRITE on entry
1118  */
1119 static void
1120 write_message_continue_writing (MessageToWriteData *data)
1121 {
1122 #ifdef G_OS_UNIX
1123   GSimpleAsyncResult *simple;
1124   simple = data->simple;
1125 #endif
1126
1127   if (G_IS_KDBUS_CONNECTION (data->worker->stream))
1128     {
1129       GError *error;
1130       error = NULL;
1131       data->total_written = g_kdbus_send_message(data->worker, data->worker->kdbus, data->message, data->blob, data->blob_size, &error);
1132       
1133       if (data->total_written == data->blob_size)
1134         {
1135           g_simple_async_result_complete (simple);
1136           g_object_unref (simple);
1137           goto out;
1138         }
1139     }
1140   else
1141     {
1142       GOutputStream *ostream;
1143 #ifdef G_OS_UNIX
1144       GUnixFDList *fd_list;
1145 #endif
1146
1147       ostream = g_io_stream_get_output_stream (data->worker->stream);
1148 #ifdef G_OS_UNIX
1149       fd_list = g_dbus_message_get_unix_fd_list (data->message);
1150 #endif
1151
1152       g_assert (!g_output_stream_has_pending (ostream));
1153       g_assert_cmpint (data->total_written, <, data->blob_size);
1154
1155       if (FALSE)
1156         {
1157         }
1158 #ifdef G_OS_UNIX
1159       else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1160         {
1161            GOutputVector vector;
1162            GSocketControlMessage *control_message;
1163            gssize bytes_written;
1164            GError *error;
1165
1166            vector.buffer = data->blob;
1167            vector.size = data->blob_size;
1168
1169            control_message = NULL;
1170            if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1171              {
1172                if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1173                  {
1174                    g_simple_async_result_set_error (simple,
1175                                                     G_IO_ERROR,
1176                                                     G_IO_ERROR_FAILED,
1177                                                     "Tried sending a file descriptor but remote peer does not support this capability");
1178                    g_simple_async_result_complete (simple);
1179                    g_object_unref (simple);
1180                    goto out;
1181                  }
1182                 control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1183               }
1184
1185             error = NULL;
1186             bytes_written = g_socket_send_message (data->worker->socket,
1187                                                   NULL, /* address */
1188                                                   &vector,
1189                                                   1,
1190                                                   control_message != NULL ? &control_message : NULL,
1191                                                   control_message != NULL ? 1 : 0,
1192                                                   G_SOCKET_MSG_NONE,
1193                                                   data->worker->cancellable,
1194                                                   &error);
1195            if (control_message != NULL)
1196              g_object_unref (control_message);
1197
1198            if (bytes_written == -1)
1199              {
1200                /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1201                if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1202                  {
1203                    GSource *source;
1204                    source = g_socket_create_source (data->worker->socket,
1205                                                     G_IO_OUT | G_IO_HUP | G_IO_ERR,
1206                                                     data->worker->cancellable);
1207                    g_source_set_callback (source,
1208                                           (GSourceFunc) on_socket_ready,
1209                                           data,
1210                                           NULL); /* GDestroyNotify */
1211                    g_source_attach (source, g_main_context_get_thread_default ());
1212                    g_source_unref (source);
1213                    g_error_free (error);
1214                    goto out;
1215                  }
1216                g_simple_async_result_take_error (simple, error);
1217                g_simple_async_result_complete (simple);
1218                g_object_unref (simple);
1219                goto out;
1220              }
1221            g_assert (bytes_written > 0); /* zero is never returned */
1222
1223            write_message_print_transport_debug (bytes_written, data);
1224
1225            data->total_written += bytes_written;
1226            g_assert (data->total_written <= data->blob_size);
1227            if (data->total_written == data->blob_size)
1228              {
1229                g_simple_async_result_complete (simple);
1230                g_object_unref (simple);
1231                goto out;
1232              }
1233
1234            write_message_continue_writing (data);
1235          }
1236 #endif
1237        else
1238          {
1239 #ifdef G_OS_UNIX
1240            if (fd_list != NULL)
1241              {
1242                g_simple_async_result_set_error (simple,
1243                                                 G_IO_ERROR,
1244                                                 G_IO_ERROR_FAILED,
1245                                                 "Tried sending a file descriptor on unsupported stream of type %s",
1246                                                 g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1247                g_simple_async_result_complete (simple);
1248                g_object_unref (simple);
1249                goto out;
1250              }
1251 #endif
1252
1253             g_output_stream_write_async (ostream,
1254                                         (const gchar *) data->blob + data->total_written,
1255                                         data->blob_size - data->total_written,
1256                                         G_PRIORITY_DEFAULT,
1257                                         data->worker->cancellable,
1258                                         write_message_async_cb,
1259                                         data);
1260          }
1261     }
1262 #ifdef G_OS_UNIX
1263  out:
1264 #endif
1265   ;
1266 }
1267
1268 /* called in private thread shared by all GDBusConnection instances
1269  *
1270  * write-lock is not held on entry
1271  * output_pending is PENDING_WRITE on entry
1272  */
1273 static void
1274 write_message_async (GDBusWorker         *worker,
1275                      MessageToWriteData  *data,
1276                      GAsyncReadyCallback  callback,
1277                      gpointer             user_data)
1278 {
1279   data->simple = g_simple_async_result_new (NULL,
1280                                             callback,
1281                                             user_data,
1282                                             write_message_async);
1283   data->total_written = 0;
1284   write_message_continue_writing (data);
1285 }
1286
1287 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1288 static gboolean
1289 write_message_finish (GAsyncResult   *res,
1290                       GError        **error)
1291 {
1292   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1293   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1294     return FALSE;
1295   else
1296     return TRUE;
1297 }
1298 /* ---------------------------------------------------------------------------------------------------- */
1299
1300 static void continue_writing (GDBusWorker *worker);
1301
1302 typedef struct
1303 {
1304   GDBusWorker *worker;
1305   GList *flushers;
1306 } FlushAsyncData;
1307
1308 static void
1309 flush_data_list_complete (const GList  *flushers,
1310                           const GError *error)
1311 {
1312   const GList *l;
1313
1314   for (l = flushers; l != NULL; l = l->next)
1315     {
1316       FlushData *f = l->data;
1317
1318       f->error = error != NULL ? g_error_copy (error) : NULL;
1319
1320       g_mutex_lock (&f->mutex);
1321       g_cond_signal (&f->cond);
1322       g_mutex_unlock (&f->mutex);
1323     }
1324 }
1325
1326 /* called in private thread shared by all GDBusConnection instances
1327  *
1328  * write-lock is not held on entry
1329  * output_pending is PENDING_FLUSH on entry
1330  */
1331 static void
1332 ostream_flush_cb (GObject      *source_object,
1333                   GAsyncResult *res,
1334                   gpointer      user_data)
1335 {
1336   FlushAsyncData *data = user_data;
1337   GError *error;
1338
1339   error = NULL;
1340   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1341                                 res,
1342                                 &error);
1343
1344   if (error == NULL)
1345     {
1346       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1347         {
1348           _g_dbus_debug_print_lock ();
1349           g_print ("========================================================================\n"
1350                    "GDBus-debug:Transport:\n"
1351                    "  ---- FLUSHED stream of type %s\n",
1352                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1353           _g_dbus_debug_print_unlock ();
1354         }
1355     }
1356
1357   g_assert (data->flushers != NULL);
1358   flush_data_list_complete (data->flushers, error);
1359   g_list_free (data->flushers);
1360
1361   if (error != NULL)
1362     g_error_free (error);
1363
1364   /* Make sure we tell folks that we don't have additional
1365      flushes pending */
1366   g_mutex_lock (&data->worker->write_lock);
1367   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1368   g_assert (data->worker->output_pending == PENDING_FLUSH);
1369   data->worker->output_pending = PENDING_NONE;
1370   g_mutex_unlock (&data->worker->write_lock);
1371
1372   /* OK, cool, finally kick off the next write */
1373   continue_writing (data->worker);
1374
1375   _g_dbus_worker_unref (data->worker);
1376   g_free (data);
1377 }
1378
1379 /* called in private thread shared by all GDBusConnection instances
1380  *
1381  * write-lock is not held on entry
1382  * output_pending is PENDING_FLUSH on entry
1383  */
1384 static void
1385 start_flush (FlushAsyncData *data)
1386 {
1387   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1388                                G_PRIORITY_DEFAULT,
1389                                data->worker->cancellable,
1390                                ostream_flush_cb,
1391                                data);
1392 }
1393
1394 /* called in private thread shared by all GDBusConnection instances
1395  *
1396  * write-lock is held on entry
1397  * output_pending is PENDING_NONE on entry
1398  */
1399 static void
1400 message_written_unlocked (GDBusWorker *worker,
1401                           MessageToWriteData *message_data)
1402 {
1403   if (G_UNLIKELY (_g_dbus_debug_message ()))
1404     {
1405       gchar *s;
1406       _g_dbus_debug_print_lock ();
1407       g_print ("========================================================================\n"
1408                "GDBus-debug:Message:\n"
1409                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1410                message_data->blob_size);
1411       s = g_dbus_message_print (message_data->message, 2);
1412       g_print ("%s", s);
1413       g_free (s);
1414       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1415         {
1416           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1417           g_print ("%s\n", s);
1418           g_free (s);
1419         }
1420       _g_dbus_debug_print_unlock ();
1421     }
1422
1423   worker->write_num_messages_written += 1;
1424 }
1425
1426 /* called in private thread shared by all GDBusConnection instances
1427  *
1428  * write-lock is held on entry
1429  * output_pending is PENDING_NONE on entry
1430  *
1431  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1432  */
1433 static FlushAsyncData *
1434 prepare_flush_unlocked (GDBusWorker *worker)
1435 {
1436   GList *l;
1437   GList *ll;
1438   GList *flushers;
1439
1440   flushers = NULL;
1441   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1442     {
1443       FlushData *f = l->data;
1444       ll = l->next;
1445
1446       if (f->number_to_wait_for == worker->write_num_messages_written)
1447         {
1448           flushers = g_list_append (flushers, f);
1449           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1450         }
1451     }
1452   if (flushers != NULL)
1453     {
1454       g_assert (worker->output_pending == PENDING_NONE);
1455       worker->output_pending = PENDING_FLUSH;
1456     }
1457
1458   if (flushers != NULL)
1459     {
1460       FlushAsyncData *data;
1461
1462       data = g_new0 (FlushAsyncData, 1);
1463       data->worker = _g_dbus_worker_ref (worker);
1464       data->flushers = flushers;
1465       return data;
1466     }
1467
1468   return NULL;
1469 }
1470
1471 /* called in private thread shared by all GDBusConnection instances
1472  *
1473  * write-lock is not held on entry
1474  * output_pending is PENDING_WRITE on entry
1475  */
1476 static void
1477 write_message_cb (GObject       *source_object,
1478                   GAsyncResult  *res,
1479                   gpointer       user_data)
1480 {
1481   MessageToWriteData *data = user_data;
1482   GError *error;
1483
1484   g_mutex_lock (&data->worker->write_lock);
1485   g_assert (data->worker->output_pending == PENDING_WRITE);
1486   data->worker->output_pending = PENDING_NONE;
1487
1488   error = NULL;
1489   if (!write_message_finish (res, &error))
1490     {
1491       g_mutex_unlock (&data->worker->write_lock);
1492
1493       /* TODO: handle */
1494       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1495       g_error_free (error);
1496
1497       g_mutex_lock (&data->worker->write_lock);
1498     }
1499
1500   message_written_unlocked (data->worker, data);
1501
1502   g_mutex_unlock (&data->worker->write_lock);
1503
1504   continue_writing (data->worker);
1505
1506   message_to_write_data_free (data);
1507 }
1508
1509 /* called in private thread shared by all GDBusConnection instances
1510  *
1511  * write-lock is not held on entry
1512  * output_pending is PENDING_CLOSE on entry
1513  */
1514 static void
1515 iostream_close_cb (GObject      *source_object,
1516                    GAsyncResult *res,
1517                    gpointer      user_data)
1518 {
1519   GDBusWorker *worker = user_data;
1520   GError *error = NULL;
1521   GList *pending_close_attempts, *pending_flush_attempts;
1522   GQueue *send_queue;
1523
1524   g_io_stream_close_finish (worker->stream, res, &error);
1525
1526   g_mutex_lock (&worker->write_lock);
1527
1528   pending_close_attempts = worker->pending_close_attempts;
1529   worker->pending_close_attempts = NULL;
1530
1531   pending_flush_attempts = worker->write_pending_flushes;
1532   worker->write_pending_flushes = NULL;
1533
1534   send_queue = worker->write_queue;
1535   worker->write_queue = g_queue_new ();
1536
1537   g_assert (worker->output_pending == PENDING_CLOSE);
1538   worker->output_pending = PENDING_NONE;
1539
1540   g_mutex_unlock (&worker->write_lock);
1541
1542   while (pending_close_attempts != NULL)
1543     {
1544       CloseData *close_data = pending_close_attempts->data;
1545
1546       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1547                                                    pending_close_attempts);
1548
1549       if (close_data->result != NULL)
1550         {
1551           if (error != NULL)
1552             g_simple_async_result_set_from_error (close_data->result, error);
1553
1554           /* this must be in an idle because the result is likely to be
1555            * intended for another thread
1556            */
1557           g_simple_async_result_complete_in_idle (close_data->result);
1558         }
1559
1560       close_data_free (close_data);
1561     }
1562
1563   g_clear_error (&error);
1564
1565   /* all messages queued for sending are discarded */
1566   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1567   /* all queued flushes fail */
1568   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1569                        _("Operation was cancelled"));
1570   flush_data_list_complete (pending_flush_attempts, error);
1571   g_list_free (pending_flush_attempts);
1572   g_clear_error (&error);
1573
1574   _g_dbus_worker_unref (worker);
1575 }
1576
1577 /* called in private thread shared by all GDBusConnection instances
1578  *
1579  * write-lock is not held on entry
1580  * output_pending must be PENDING_NONE on entry
1581  */
1582 static void
1583 continue_writing (GDBusWorker *worker)
1584 {
1585   MessageToWriteData *data;
1586   FlushAsyncData *flush_async_data;
1587
1588  write_next:
1589   /* we mustn't try to write two things at once */
1590   g_assert (worker->output_pending == PENDING_NONE);
1591
1592   g_mutex_lock (&worker->write_lock);
1593
1594   data = NULL;
1595   flush_async_data = NULL;
1596
1597   /* if we want to close the connection, that takes precedence */
1598   if (worker->pending_close_attempts != NULL)
1599     {
1600       worker->close_expected = TRUE;
1601       worker->output_pending = PENDING_CLOSE;
1602
1603       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1604                                NULL, iostream_close_cb,
1605                                _g_dbus_worker_ref (worker));
1606     }
1607   else
1608     {
1609       flush_async_data = prepare_flush_unlocked (worker);
1610
1611       if (flush_async_data == NULL)
1612         {
1613           data = g_queue_pop_head (worker->write_queue);
1614
1615           if (data != NULL)
1616             worker->output_pending = PENDING_WRITE;
1617         }
1618     }
1619
1620   g_mutex_unlock (&worker->write_lock);
1621
1622   /* Note that write_lock is only used for protecting the @write_queue
1623    * and @output_pending fields of the GDBusWorker struct ... which we
1624    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1625    *
1626    * Therefore, it's fine to drop it here when calling back into user
1627    * code and then writing the message out onto the GIOStream since this
1628    * function only runs on the worker thread.
1629    */
1630
1631   if (flush_async_data != NULL)
1632     {
1633       start_flush (flush_async_data);
1634       g_assert (data == NULL);
1635     }
1636   else if (data != NULL)
1637     {
1638       GDBusMessage *old_message;
1639       guchar *new_blob;
1640       gsize new_blob_size;
1641       GError *error;
1642
1643       old_message = data->message;
1644       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1645       if (data->message == old_message)
1646         {
1647           /* filters had no effect - do nothing */
1648         }
1649       else if (data->message == NULL)
1650         {
1651           /* filters dropped message */
1652           g_mutex_lock (&worker->write_lock);
1653           worker->output_pending = PENDING_NONE;
1654           g_mutex_unlock (&worker->write_lock);
1655           message_to_write_data_free (data);
1656           goto write_next;
1657         }
1658       else
1659         {
1660           /* filters altered the message -> reencode */
1661           error = NULL;
1662           new_blob = g_dbus_message_to_blob (data->message,
1663                                              &new_blob_size,
1664                                              worker->capabilities,
1665                                              &error);
1666           if (new_blob == NULL)
1667             {
1668               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1669                * the old message instead
1670                */
1671               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1672                          g_dbus_message_get_serial (data->message),
1673                          error->message);
1674               g_error_free (error);
1675             }
1676           else
1677             {
1678               g_free (data->blob);
1679               data->blob = (gchar *) new_blob;
1680               data->blob_size = new_blob_size;
1681             }
1682         }
1683
1684       write_message_async (worker,
1685                            data,
1686                            write_message_cb,
1687                            data);
1688     }
1689 }
1690
1691 /* called in private thread shared by all GDBusConnection instances
1692  *
1693  * write-lock is not held on entry
1694  * output_pending may be anything
1695  */
1696 static gboolean
1697 continue_writing_in_idle_cb (gpointer user_data)
1698 {
1699   GDBusWorker *worker = user_data;
1700
1701   /* Because this is the worker thread, we can read this struct member
1702    * without holding the lock: no other thread ever modifies it.
1703    */
1704   if (worker->output_pending == PENDING_NONE)
1705     continue_writing (worker);
1706
1707   return FALSE;
1708 }
1709
1710 /*
1711  * @write_data: (transfer full) (allow-none):
1712  * @flush_data: (transfer full) (allow-none):
1713  * @close_data: (transfer full) (allow-none):
1714  *
1715  * Can be called from any thread
1716  *
1717  * write_lock is held on entry
1718  * output_pending may be anything
1719  */
1720 static void
1721 schedule_writing_unlocked (GDBusWorker        *worker,
1722                            MessageToWriteData *write_data,
1723                            FlushData          *flush_data,
1724                            CloseData          *close_data)
1725 {
1726   if (write_data != NULL)
1727     g_queue_push_tail (worker->write_queue, write_data);
1728
1729   if (flush_data != NULL)
1730     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1731
1732   if (close_data != NULL)
1733     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1734                                                      close_data);
1735
1736   /* If we had output pending, the next bit of output will happen
1737    * automatically when it finishes, so we only need to do this
1738    * if nothing was pending.
1739    *
1740    * The idle callback will re-check that output_pending is still
1741    * PENDING_NONE, to guard against output starting before the idle.
1742    */
1743   if (worker->output_pending == PENDING_NONE)
1744     {
1745       GSource *idle_source;
1746       idle_source = g_idle_source_new ();
1747       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1748       g_source_set_callback (idle_source,
1749                              continue_writing_in_idle_cb,
1750                              _g_dbus_worker_ref (worker),
1751                              (GDestroyNotify) _g_dbus_worker_unref);
1752       g_source_attach (idle_source, worker->shared_thread_data->context);
1753       g_source_unref (idle_source);
1754     }
1755 }
1756
1757 /* ---------------------------------------------------------------------------------------------------- */
1758
1759 /* can be called from any thread - steals blob
1760  *
1761  * write_lock is not held on entry
1762  * output_pending may be anything
1763  */
1764 void
1765 _g_dbus_worker_send_message (GDBusWorker    *worker,
1766                              GDBusMessage   *message,
1767                              gchar          *blob,
1768                              gsize           blob_len)
1769 {
1770   MessageToWriteData *data;
1771
1772   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1773   g_return_if_fail (blob != NULL);
1774   g_return_if_fail (blob_len > 16);
1775
1776   data = g_new0 (MessageToWriteData, 1);
1777   data->worker = _g_dbus_worker_ref (worker);
1778   data->message = g_object_ref (message);
1779   data->blob = blob; /* steal! */
1780   data->blob_size = blob_len;
1781
1782   g_mutex_lock (&worker->write_lock);
1783   schedule_writing_unlocked (worker, data, NULL, NULL);
1784   g_mutex_unlock (&worker->write_lock);
1785 }
1786
1787 /* ---------------------------------------------------------------------------------------------------- */
1788 GKdbus*        g_dbus_worker_get_kdbus    (GDBusWorker  *worker)
1789 {
1790   return worker->kdbus;
1791 }
1792
1793 GDBusWorker *
1794 _g_dbus_worker_new (GIOStream                              *stream,
1795                     GDBusCapabilityFlags                    capabilities,
1796                     gboolean                                initially_frozen,
1797                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1798                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1799                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1800                     gpointer                                user_data)
1801 {
1802   GDBusWorker *worker;
1803   GSource *idle_source;
1804
1805   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1806   g_return_val_if_fail (message_received_callback != NULL, NULL);
1807   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1808   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1809
1810   worker = g_new0 (GDBusWorker, 1);
1811   worker->ref_count = 1;
1812
1813   g_mutex_init (&worker->read_lock);
1814   worker->message_received_callback = message_received_callback;
1815   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1816   worker->disconnected_callback = disconnected_callback;
1817   worker->user_data = user_data;
1818   worker->stream = g_object_ref (stream);
1819   worker->capabilities = capabilities;
1820   worker->cancellable = g_cancellable_new ();
1821   worker->output_pending = PENDING_NONE;
1822
1823   worker->frozen = initially_frozen;
1824   worker->received_messages_while_frozen = g_queue_new ();
1825
1826   g_mutex_init (&worker->write_lock);
1827   worker->write_queue = g_queue_new ();
1828
1829   if (G_IS_SOCKET_CONNECTION (worker->stream))
1830     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1831
1832   if (G_IS_KDBUS_CONNECTION (worker->stream))
1833     worker->kdbus = g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
1834
1835   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1836
1837   /* begin reading */
1838   idle_source = g_idle_source_new ();
1839   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1840   g_source_set_callback (idle_source,
1841                          _g_dbus_worker_do_initial_read,
1842                          _g_dbus_worker_ref (worker),
1843                          (GDestroyNotify) _g_dbus_worker_unref);
1844   g_source_attach (idle_source, worker->shared_thread_data->context);
1845   g_source_unref (idle_source);
1846
1847   return worker;
1848 }
1849
1850 /* ---------------------------------------------------------------------------------------------------- */
1851
1852 /* can be called from any thread
1853  *
1854  * write_lock is not held on entry
1855  * output_pending may be anything
1856  */
1857 void
1858 _g_dbus_worker_close (GDBusWorker         *worker,
1859                       GCancellable        *cancellable,
1860                       GSimpleAsyncResult  *result)
1861 {
1862   CloseData *close_data;
1863
1864   close_data = g_slice_new0 (CloseData);
1865   close_data->worker = _g_dbus_worker_ref (worker);
1866   close_data->cancellable =
1867       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1868   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1869
1870   /* Don't set worker->close_expected here - we're in the wrong thread.
1871    * It'll be set before the actual close happens.
1872    */
1873   g_cancellable_cancel (worker->cancellable);
1874   g_mutex_lock (&worker->write_lock);
1875   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1876   g_mutex_unlock (&worker->write_lock);
1877 }
1878
1879 /* This can be called from any thread - frees worker. Note that
1880  * callbacks might still happen if called from another thread than the
1881  * worker - use your own synchronization primitive in the callbacks.
1882  *
1883  * write_lock is not held on entry
1884  * output_pending may be anything
1885  */
1886 void
1887 _g_dbus_worker_stop (GDBusWorker *worker)
1888 {
1889   g_atomic_int_set (&worker->stopped, TRUE);
1890
1891   /* Cancel any pending operations and schedule a close of the underlying I/O
1892    * stream in the worker thread
1893    */
1894   _g_dbus_worker_close (worker, NULL, NULL);
1895
1896   /* _g_dbus_worker_close holds a ref until after an idle in the worker
1897    * thread has run, so we no longer need to unref in an idle like in
1898    * commit 322e25b535
1899    */
1900   _g_dbus_worker_unref (worker);
1901 }
1902
1903 /* ---------------------------------------------------------------------------------------------------- */
1904
1905 /* can be called from any thread (except the worker thread) - blocks
1906  * calling thread until all queued outgoing messages are written and
1907  * the transport has been flushed
1908  *
1909  * write_lock is not held on entry
1910  * output_pending may be anything
1911  */
1912 gboolean
1913 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1914                            GCancellable   *cancellable,
1915                            GError        **error)
1916 {
1917   gboolean ret;
1918   FlushData *data;
1919   guint64 pending_writes;
1920
1921   data = NULL;
1922   ret = TRUE;
1923
1924   g_mutex_lock (&worker->write_lock);
1925
1926   /* if the queue is empty, no write is in-flight and we haven't written
1927    * anything since the last flush, then there's nothing to wait for
1928    */
1929   pending_writes = g_queue_get_length (worker->write_queue);
1930
1931   /* if a write is in-flight, we shouldn't be satisfied until the first
1932    * flush operation that follows it
1933    */
1934   if (worker->output_pending == PENDING_WRITE)
1935     pending_writes += 1;
1936
1937   if (pending_writes > 0 ||
1938       worker->write_num_messages_written != worker->write_num_messages_flushed)
1939     {
1940       data = g_new0 (FlushData, 1);
1941       g_mutex_init (&data->mutex);
1942       g_cond_init (&data->cond);
1943       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1944       g_mutex_lock (&data->mutex);
1945
1946       schedule_writing_unlocked (worker, NULL, data, NULL);
1947     }
1948   g_mutex_unlock (&worker->write_lock);
1949
1950   if (data != NULL)
1951     {
1952       g_cond_wait (&data->cond, &data->mutex);
1953       g_mutex_unlock (&data->mutex);
1954
1955       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1956       g_cond_clear (&data->cond);
1957       g_mutex_clear (&data->mutex);
1958       if (data->error != NULL)
1959         {
1960           ret = FALSE;
1961           g_propagate_error (error, data->error);
1962         }
1963       g_free (data);
1964     }
1965
1966   return ret;
1967 }
1968
1969 /* ---------------------------------------------------------------------------------------------------- */
1970
1971 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1972 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1973 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1974 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1975 #define G_DBUS_DEBUG_CALL           (1<<4)
1976 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1977 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1978 #define G_DBUS_DEBUG_RETURN         (1<<7)
1979 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1980 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1981
1982 static gint _gdbus_debug_flags = 0;
1983
1984 gboolean
1985 _g_dbus_debug_authentication (void)
1986 {
1987   _g_dbus_initialize ();
1988   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1989 }
1990
1991 gboolean
1992 _g_dbus_debug_transport (void)
1993 {
1994   _g_dbus_initialize ();
1995   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1996 }
1997
1998 gboolean
1999 _g_dbus_debug_message (void)
2000 {
2001   _g_dbus_initialize ();
2002   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
2003 }
2004
2005 gboolean
2006 _g_dbus_debug_payload (void)
2007 {
2008   _g_dbus_initialize ();
2009   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
2010 }
2011
2012 gboolean
2013 _g_dbus_debug_call (void)
2014 {
2015   _g_dbus_initialize ();
2016   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
2017 }
2018
2019 gboolean
2020 _g_dbus_debug_signal (void)
2021 {
2022   _g_dbus_initialize ();
2023   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
2024 }
2025
2026 gboolean
2027 _g_dbus_debug_incoming (void)
2028 {
2029   _g_dbus_initialize ();
2030   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
2031 }
2032
2033 gboolean
2034 _g_dbus_debug_return (void)
2035 {
2036   _g_dbus_initialize ();
2037   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
2038 }
2039
2040 gboolean
2041 _g_dbus_debug_emission (void)
2042 {
2043   _g_dbus_initialize ();
2044   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
2045 }
2046
2047 gboolean
2048 _g_dbus_debug_address (void)
2049 {
2050   _g_dbus_initialize ();
2051   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
2052 }
2053
2054 G_LOCK_DEFINE_STATIC (print_lock);
2055
2056 void
2057 _g_dbus_debug_print_lock (void)
2058 {
2059   G_LOCK (print_lock);
2060 }
2061
2062 void
2063 _g_dbus_debug_print_unlock (void)
2064 {
2065   G_UNLOCK (print_lock);
2066 }
2067
2068 /*
2069  * _g_dbus_initialize:
2070  *
2071  * Does various one-time init things such as
2072  *
2073  *  - registering the G_DBUS_ERROR error domain
2074  *  - parses the G_DBUS_DEBUG environment variable
2075  */
2076 void
2077 _g_dbus_initialize (void)
2078 {
2079   static volatile gsize initialized = 0;
2080
2081   if (g_once_init_enter (&initialized))
2082     {
2083       volatile GQuark g_dbus_error_domain;
2084       const gchar *debug;
2085
2086       g_dbus_error_domain = G_DBUS_ERROR;
2087       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
2088
2089       debug = g_getenv ("G_DBUS_DEBUG");
2090       if (debug != NULL)
2091         {
2092           const GDebugKey keys[] = {
2093             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
2094             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
2095             { "message",        G_DBUS_DEBUG_MESSAGE        },
2096             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
2097             { "call",           G_DBUS_DEBUG_CALL           },
2098             { "signal",         G_DBUS_DEBUG_SIGNAL         },
2099             { "incoming",       G_DBUS_DEBUG_INCOMING       },
2100             { "return",         G_DBUS_DEBUG_RETURN         },
2101             { "emission",       G_DBUS_DEBUG_EMISSION       },
2102             { "address",        G_DBUS_DEBUG_ADDRESS        }
2103           };
2104
2105           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
2106           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
2107             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
2108         }
2109
2110       g_once_init_leave (&initialized, 1);
2111     }
2112 }
2113
2114 /* ---------------------------------------------------------------------------------------------------- */
2115
2116 GVariantType *
2117 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
2118 {
2119   const GVariantType *arg_types[256];
2120   guint n;
2121
2122   if (args)
2123     for (n = 0; args[n] != NULL; n++)
2124       {
2125         /* DBus places a hard limit of 255 on signature length.
2126          * therefore number of args must be less than 256.
2127          */
2128         g_assert (n < 256);
2129
2130         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
2131
2132         if G_UNLIKELY (arg_types[n] == NULL)
2133           return NULL;
2134       }
2135   else
2136     n = 0;
2137
2138   return g_variant_type_new_tuple (arg_types, n);
2139 }
2140
2141 /* ---------------------------------------------------------------------------------------------------- */
2142
2143 #ifdef G_OS_WIN32
2144
2145 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
2146
2147 gchar *
2148 _g_dbus_win32_get_user_sid (void)
2149 {
2150   HANDLE h;
2151   TOKEN_USER *user;
2152   DWORD token_information_len;
2153   PSID psid;
2154   gchar *sid;
2155   gchar *ret;
2156
2157   ret = NULL;
2158   user = NULL;
2159   h = INVALID_HANDLE_VALUE;
2160
2161   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2162     {
2163       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2164       goto out;
2165     }
2166
2167   /* Get length of buffer */
2168   token_information_len = 0;
2169   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2170     {
2171       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2172         {
2173           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2174           goto out;
2175         }
2176     }
2177   user = g_malloc (token_information_len);
2178   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2179     {
2180       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2181       goto out;
2182     }
2183
2184   psid = user->User.Sid;
2185   if (!IsValidSid (psid))
2186     {
2187       g_warning ("Invalid SID");
2188       goto out;
2189     }
2190
2191   if (!ConvertSidToStringSidA (psid, &sid))
2192     {
2193       g_warning ("Invalid SID");
2194       goto out;
2195     }
2196
2197   ret = g_strdup (sid);
2198   LocalFree (sid);
2199
2200 out:
2201   g_free (user);
2202   if (h != INVALID_HANDLE_VALUE)
2203     CloseHandle (h);
2204   return ret;
2205 }
2206 #endif
2207
2208 /* ---------------------------------------------------------------------------------------------------- */
2209
2210 gchar *
2211 _g_dbus_get_machine_id (GError **error)
2212 {
2213 #ifdef G_OS_WIN32
2214   HW_PROFILE_INFOA info;
2215   char *src, *dest, *res;
2216   int i;
2217
2218   if (!GetCurrentHwProfileA (&info))
2219     {
2220       char *message = g_win32_error_message (GetLastError ());
2221       g_set_error (error,
2222                    G_IO_ERROR,
2223                    G_IO_ERROR_FAILED,
2224                    _("Unable to get Hardware profile: %s"), message);
2225       g_free (message);
2226       return NULL;
2227     }
2228
2229   /* Form: {12340001-4980-1920-6788-123456789012} */
2230   src = &info.szHwProfileGuid[0];
2231
2232   res = g_malloc (32+1);
2233   dest = res;
2234
2235   src++; /* Skip { */
2236   for (i = 0; i < 8; i++)
2237     *dest++ = *src++;
2238   src++; /* Skip - */
2239   for (i = 0; i < 4; i++)
2240     *dest++ = *src++;
2241   src++; /* Skip - */
2242   for (i = 0; i < 4; i++)
2243     *dest++ = *src++;
2244   src++; /* Skip - */
2245   for (i = 0; i < 4; i++)
2246     *dest++ = *src++;
2247   src++; /* Skip - */
2248   for (i = 0; i < 12; i++)
2249     *dest++ = *src++;
2250   *dest = 0;
2251
2252   return res;
2253 #else
2254   gchar *ret;
2255   GError *first_error;
2256   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2257   ret = NULL;
2258   first_error = NULL;
2259   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2260                             &ret,
2261                             NULL,
2262                             &first_error) &&
2263       !g_file_get_contents ("/etc/machine-id",
2264                             &ret,
2265                             NULL,
2266                             NULL))
2267     {
2268       g_propagate_prefixed_error (error, first_error,
2269                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2270     }
2271   else
2272     {
2273       /* ignore the error from the first try, if any */
2274       g_clear_error (&first_error);
2275       /* TODO: validate value */
2276       g_strstrip (ret);
2277     }
2278   return ret;
2279 #endif
2280 }
2281
2282 /* ---------------------------------------------------------------------------------------------------- */
2283
2284 gchar *
2285 _g_dbus_enum_to_string (GType enum_type, gint value)
2286 {
2287   gchar *ret;
2288   GEnumClass *klass;
2289   GEnumValue *enum_value;
2290
2291   klass = g_type_class_ref (enum_type);
2292   enum_value = g_enum_get_value (klass, value);
2293   if (enum_value != NULL)
2294     ret = g_strdup (enum_value->value_nick);
2295   else
2296     ret = g_strdup_printf ("unknown (value %d)", value);
2297   g_type_class_unref (klass);
2298   return ret;
2299 }
2300
2301 /* ---------------------------------------------------------------------------------------------------- */
2302
2303 static void
2304 write_message_print_transport_debug (gssize bytes_written,
2305                                      MessageToWriteData *data)
2306 {
2307   if (G_LIKELY (!_g_dbus_debug_transport ()))
2308     goto out;
2309
2310   _g_dbus_debug_print_lock ();
2311   g_print ("========================================================================\n"
2312            "GDBus-debug:Transport:\n"
2313            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2314            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2315            bytes_written,
2316            g_dbus_message_get_serial (data->message),
2317            data->blob_size,
2318            data->total_written,
2319            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2320   _g_dbus_debug_print_unlock ();
2321  out:
2322   ;
2323 }
2324
2325 /* ---------------------------------------------------------------------------------------------------- */
2326
2327 static void
2328 read_message_print_transport_debug (gssize bytes_read,
2329                                     GDBusWorker *worker)
2330 {
2331   gsize size;
2332   gint32 serial;
2333   gint32 message_length;
2334
2335   if (G_LIKELY (!_g_dbus_debug_transport ()))
2336     goto out;
2337
2338   size = bytes_read + worker->read_buffer_cur_size;
2339   serial = 0;
2340   message_length = 0;
2341   if (size >= 16)
2342     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2343   if (size >= 1)
2344     {
2345       switch (worker->read_buffer[0])
2346         {
2347         case 'l':
2348           if (size >= 12)
2349             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2350           break;
2351         case 'B':
2352           if (size >= 12)
2353             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2354           break;
2355         default:
2356           /* an error will be set elsewhere if this happens */
2357           goto out;
2358         }
2359     }
2360
2361     _g_dbus_debug_print_lock ();
2362   g_print ("========================================================================\n"
2363            "GDBus-debug:Transport:\n"
2364            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2365            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2366            bytes_read,
2367            serial,
2368            message_length,
2369            worker->read_buffer_cur_size,
2370            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2371   _g_dbus_debug_print_unlock ();
2372  out:
2373   ;
2374 }
2375
2376 /* ---------------------------------------------------------------------------------------------------- */
2377
2378 gboolean
2379 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2380                                      GValue                *return_accu,
2381                                      const GValue          *handler_return,
2382                                      gpointer               dummy)
2383 {
2384   gboolean continue_emission;
2385   gboolean signal_return;
2386
2387   signal_return = g_value_get_boolean (handler_return);
2388   g_value_set_boolean (return_accu, signal_return);
2389   continue_emission = signal_return;
2390
2391   return continue_emission;
2392 }