GSocketControlMessage: clean up confusing code
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "gsocketcontrolmessage.h"
43 #include "gsocketconnection.h"
44 #include "gsocketoutputstream.h"
45
46 #ifdef G_OS_UNIX
47 #include "gunixfdmessage.h"
48 #include "gunixconnection.h"
49 #include "gunixcredentialsmessage.h"
50 #endif
51
52 #ifdef G_OS_WIN32
53 #include <windows.h>
54 #endif
55
56 #include "glibintl.h"
57
58 /* ---------------------------------------------------------------------------------------------------- */
59
60 gchar *
61 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
62 {
63  guint n, m;
64  GString *ret;
65
66  ret = g_string_new (NULL);
67
68  for (n = 0; n < len; n += 16)
69    {
70      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
71
72      for (m = n; m < n + 16; m++)
73        {
74          if (m > n && (m%4) == 0)
75            g_string_append_c (ret, ' ');
76          if (m < len)
77            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
78          else
79            g_string_append (ret, "   ");
80        }
81
82      g_string_append (ret, "   ");
83
84      for (m = n; m < len && m < n + 16; m++)
85        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
86
87      g_string_append_c (ret, '\n');
88    }
89
90  return g_string_free (ret, FALSE);
91 }
92
93 /* ---------------------------------------------------------------------------------------------------- */
94
95 /* Unfortunately ancillary messages are discarded when reading from a
96  * socket using the GSocketInputStream abstraction. So we provide a
97  * very GInputStream-ish API that uses GSocket in this case (very
98  * similar to GSocketInputStream).
99  */
100
/* State of one pending _g_socket_read_with_control_messages() operation.
 * Allocated by that function and released by read_with_control_data_free().
 */
typedef struct
{
  GSocket *socket;            /* socket to read from; a reference is held */
  GCancellable *cancellable;  /* may be NULL; a reference is held otherwise */

  /* destination of the read, used as the single GInputVector */
  void *buffer;
  gsize count;

  /* caller-provided locations handed to g_socket_receive_message() */
  GSocketControlMessage ***messages;
  gint *num_messages;

  GSimpleAsyncResult *simple; /* result completed once the read has been attempted */

  /* TRUE if the read is performed from a GSource callback; selects between
   * completing the result directly and completing it in idle
   */
  gboolean from_mainloop;
} ReadWithControlData;
116
117 static void
118 read_with_control_data_free (ReadWithControlData *data)
119 {
120   g_object_unref (data->socket);
121   if (data->cancellable != NULL)
122     g_object_unref (data->cancellable);
123   g_object_unref (data->simple);
124   g_free (data);
125 }
126
127 static gboolean
128 _g_socket_read_with_control_messages_ready (GSocket      *socket,
129                                             GIOCondition  condition,
130                                             gpointer      user_data)
131 {
132   ReadWithControlData *data = user_data;
133   GError *error;
134   gssize result;
135   GInputVector vector;
136
137   error = NULL;
138   vector.buffer = data->buffer;
139   vector.size = data->count;
140   result = g_socket_receive_message (data->socket,
141                                      NULL, /* address */
142                                      &vector,
143                                      1,
144                                      data->messages,
145                                      data->num_messages,
146                                      NULL,
147                                      data->cancellable,
148                                      &error);
149   if (result >= 0)
150     {
151       g_simple_async_result_set_op_res_gssize (data->simple, result);
152     }
153   else
154     {
155       g_assert (error != NULL);
156       g_simple_async_result_set_from_error (data->simple, error);
157       g_error_free (error);
158     }
159
160   if (data->from_mainloop)
161     g_simple_async_result_complete (data->simple);
162   else
163     g_simple_async_result_complete_in_idle (data->simple);
164
165   return FALSE;
166 }
167
168 static void
169 _g_socket_read_with_control_messages (GSocket                 *socket,
170                                       void                    *buffer,
171                                       gsize                    count,
172                                       GSocketControlMessage ***messages,
173                                       gint                    *num_messages,
174                                       gint                     io_priority,
175                                       GCancellable            *cancellable,
176                                       GAsyncReadyCallback      callback,
177                                       gpointer                 user_data)
178 {
179   ReadWithControlData *data;
180
181   data = g_new0 (ReadWithControlData, 1);
182   data->socket = g_object_ref (socket);
183   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
184   data->buffer = buffer;
185   data->count = count;
186   data->messages = messages;
187   data->num_messages = num_messages;
188
189   data->simple = g_simple_async_result_new (G_OBJECT (socket),
190                                             callback,
191                                             user_data,
192                                             _g_socket_read_with_control_messages);
193
194   if (!g_socket_condition_check (socket, G_IO_IN))
195     {
196       GSource *source;
197       data->from_mainloop = TRUE;
198       source = g_socket_create_source (data->socket,
199                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
200                                        cancellable);
201       g_source_set_callback (source,
202                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
203                              data,
204                              (GDestroyNotify) read_with_control_data_free);
205       g_source_attach (source, g_main_context_get_thread_default ());
206       g_source_unref (source);
207     }
208   else
209     {
210       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
211       read_with_control_data_free (data);
212     }
213 }
214
215 static gssize
216 _g_socket_read_with_control_messages_finish (GSocket       *socket,
217                                              GAsyncResult  *result,
218                                              GError       **error)
219 {
220   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
221
222   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
223   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
224
225   if (g_simple_async_result_propagate_error (simple, error))
226       return -1;
227   else
228     return g_simple_async_result_get_op_res_gssize (simple);
229 }
230
231 /* ---------------------------------------------------------------------------------------------------- */
232
233 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
234
235 static GPtrArray *ensured_classes = NULL;
236
237 static void
238 ensure_type (GType gtype)
239 {
240   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
241 }
242
243 static void
244 released_required_types (void)
245 {
246   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
247   g_ptr_array_unref (ensured_classes);
248   ensured_classes = NULL;
249 }
250
251 static void
252 ensure_required_types (void)
253 {
254   g_assert (ensured_classes == NULL);
255   ensured_classes = g_ptr_array_new ();
256   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
257   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
258 }
259 /* ---------------------------------------------------------------------------------------------------- */
260
/* Held by _g_dbus_shared_thread_ref() while it creates and uses shared_thread_data */
G_LOCK_DEFINE_STATIC (shared_thread_lock);

/* Everything belonging to the worker thread started by
 * _g_dbus_shared_thread_ref(): the thread itself plus the main context and
 * main loop it runs.
 */
typedef struct
{
  gint num_users;        /* incremented by every _g_dbus_shared_thread_ref() call */
  GThread *thread;       /* runs gdbus_shared_thread_func() */
  GMainContext *context; /* thread-default context of the thread */
  GMainLoop *loop;       /* loop on @context that the thread runs */
} SharedThreadData;

/* NULL until the first _g_dbus_shared_thread_ref() call creates it */
static SharedThreadData *shared_thread_data = NULL;
272
273 static gpointer
274 gdbus_shared_thread_func (gpointer data)
275 {
276   g_main_context_push_thread_default (shared_thread_data->context);
277   g_main_loop_run (shared_thread_data->loop);
278   g_main_context_pop_thread_default (shared_thread_data->context);
279   return NULL;
280 }
281
/* Function that _g_dbus_shared_thread_ref() runs in the shared thread */
typedef void (*GDBusSharedThreadFunc) (gpointer user_data);

/* Request handed to invoke_caller() through an idle source */
typedef struct
{
  GDBusSharedThreadFunc func;      /* function to run */
  gpointer              user_data; /* argument passed to @func */
  gboolean              done;      /* set to TRUE by invoke_caller() once @func has returned;
                                    * polled by _g_dbus_shared_thread_ref() */
} CallerData;
290
291 static gboolean
292 invoke_caller (gpointer user_data)
293 {
294   CallerData *data = user_data;
295   data->func (data->user_data);
296   data->done = TRUE;
297   return FALSE;
298 }
299
300 /* ---------------------------------------------------------------------------------------------------- */
301
302 static void
303 _g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
304                            gpointer              user_data)
305 {
306   GError *error;
307   GSource *idle_source;
308   CallerData *data;
309   gboolean release_types;
310
311   G_LOCK (shared_thread_lock);
312
313   release_types = FALSE;
314
315   if (shared_thread_data != NULL)
316     {
317       shared_thread_data->num_users += 1;
318       goto have_thread;
319     }
320
321   shared_thread_data = g_new0 (SharedThreadData, 1);
322   shared_thread_data->num_users = 1;
323
324   /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
325   ensure_required_types ();
326   release_types = TRUE;
327
328   error = NULL;
329   shared_thread_data->context = g_main_context_new ();
330   shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
331   shared_thread_data->thread = g_thread_create (gdbus_shared_thread_func,
332                                                 NULL,
333                                                 TRUE,
334                                                 &error);
335   g_assert_no_error (error);
336
337  have_thread:
338
339   data = g_new0 (CallerData, 1);
340   data->func = func;
341   data->user_data = user_data;
342   data->done = FALSE;
343
344   idle_source = g_idle_source_new ();
345   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
346   g_source_set_callback (idle_source,
347                          invoke_caller,
348                          data,
349                          NULL);
350   g_source_attach (idle_source, shared_thread_data->context);
351   g_source_unref (idle_source);
352
353   /* wait for the user code to run.. hmm.. probably use a condition variable instead */
354   while (!data->done)
355     g_thread_yield ();
356
357   if (release_types)
358     released_required_types ();
359
360   g_free (data);
361
362   G_UNLOCK (shared_thread_lock);
363 }
364
/* Counterpart of _g_dbus_shared_thread_ref(). Currently a no-op: the
 * tear-down code below is compiled out, so the shared thread is never
 * stopped and num_users is never decremented.
 */
static void
_g_dbus_shared_thread_unref (void)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  G_LOCK (shared_thread_lock);
  g_assert (shared_thread_data != NULL);
  shared_thread_data->num_users -= 1;
  if (shared_thread_data->num_users == 0)
    {
      g_main_loop_quit (shared_thread_data->loop);
      //g_thread_join (shared_thread_data->thread);
      g_main_loop_unref (shared_thread_data->loop);
      g_main_context_unref (shared_thread_data->context);
      g_free (shared_thread_data);
      shared_thread_data = NULL;
      G_UNLOCK (shared_thread_lock);
    }
  else
    {
      G_UNLOCK (shared_thread_lock);
    }
#endif
}
389
390 /* ---------------------------------------------------------------------------------------------------- */
391
/* Per-connection worker state: the stream being served, the callbacks to
 * notify, and the bookkeeping for reading and writing messages.
 */
struct GDBusWorker
{
  /* changed atomically by _g_dbus_worker_ref() / _g_dbus_worker_unref();
   * the worker is freed when it drops to zero
   */
  volatile gint                       ref_count;

  /* once TRUE, none of the callbacks below are invoked anymore and
   * completed reads are ignored
   */
  gboolean                            stopped;

  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
   * only affects messages received from the other peer (since GDBusServer is the
   * only user) - we might want it to affect messages sent to the other peer too?
   */
  gboolean                            frozen;
  /* messages held back while frozen; delivered in order by unfreeze_in_idle_cb() */
  GQueue                             *received_messages_while_frozen;

  GIOStream                          *stream;
  /* passed to g_dbus_message_new_from_blob() when decoding received data */
  GDBusCapabilityFlags                capabilities;
  /* passed to the asynchronous read calls */
  GCancellable                       *cancellable;
  GDBusWorkerMessageReceivedCallback  message_received_callback;
  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
  GDBusWorkerDisconnectedCallback     disconnected_callback;
  /* passed as last argument to the three callbacks above */
  gpointer                            user_data;

  GThread                            *thread;

  /* if not NULL, stream is GSocketConnection */
  GSocket *socket;

  /* used for reading */
  GMutex                             *read_lock;
  gchar                              *read_buffer;
  gsize                               read_buffer_allocated_size;
  /* number of bytes of the current message received so far */
  gsize                               read_buffer_cur_size;
  /* 0: start a new message; 16: reading the header; otherwise the size of
   * the complete message as computed from the header
   */
  gsize                               read_buffer_bytes_wanted;
  /* file descriptors received so far; attached to the next decoded message */
  GUnixFDList                        *read_fd_list;
  /* filled in by _g_socket_read_with_control_messages() for the pending read */
  GSocketControlMessage             **read_ancillary_messages;
  gint                                read_num_ancillary_messages;

  /* used for writing */
  GMutex                             *write_lock;
  /* remaining elements are freed with message_to_write_data_free() */
  GQueue                             *write_queue;
  gint                                num_writes_pending;
  guint64                             write_num_messages_written;
  /* must be NULL by the time the worker is freed */
  GList                              *write_pending_flushes;
};
435
436 /* ---------------------------------------------------------------------------------------------------- */
437
/* Bookkeeping for one flush request.
 * NOTE(review): the code using this struct is outside the reviewed section;
 * presumably the waiter blocks on @cond until @number_to_wait_for messages
 * have been written - confirm against the flush code.
 */
typedef struct
{
  GMutex *mutex;
  GCond *cond;
  guint64 number_to_wait_for;
  GError *error;
} FlushData;
445
/* Forward declarations needed before the definitions further down the file */
struct _MessageToWriteData ;
typedef struct _MessageToWriteData MessageToWriteData;

/* referenced by _g_dbus_worker_unref() to free queued writes */
static void message_to_write_data_free (MessageToWriteData *data);

/* called by the read path after every successful read */
static void read_message_print_transport_debug (gssize bytes_read,
                                                GDBusWorker *worker);

/* called by the write path after every successful write */
static void write_message_print_transport_debug (gssize bytes_written,
                                                 MessageToWriteData *data);
456
457 /* ---------------------------------------------------------------------------------------------------- */
458
459 static GDBusWorker *
460 _g_dbus_worker_ref (GDBusWorker *worker)
461 {
462   g_atomic_int_inc (&worker->ref_count);
463   return worker;
464 }
465
466 static void
467 _g_dbus_worker_unref (GDBusWorker *worker)
468 {
469   if (g_atomic_int_dec_and_test (&worker->ref_count))
470     {
471       g_assert (worker->write_pending_flushes == NULL);
472
473       _g_dbus_shared_thread_unref ();
474
475       g_object_unref (worker->stream);
476
477       g_mutex_free (worker->read_lock);
478       g_object_unref (worker->cancellable);
479       if (worker->read_fd_list != NULL)
480         g_object_unref (worker->read_fd_list);
481
482       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
483       g_queue_free (worker->received_messages_while_frozen);
484
485       g_mutex_free (worker->write_lock);
486       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
487       g_queue_free (worker->write_queue);
488
489       g_free (worker->read_buffer);
490
491       g_free (worker);
492     }
493 }
494
495 static void
496 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
497                                   gboolean      remote_peer_vanished,
498                                   GError       *error)
499 {
500   if (!worker->stopped)
501     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
502 }
503
504 static void
505 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
506                                       GDBusMessage *message)
507 {
508   if (!worker->stopped)
509     worker->message_received_callback (worker, message, worker->user_data);
510 }
511
512 static GDBusMessage *
513 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
514                                               GDBusMessage *message)
515 {
516   GDBusMessage *ret;
517   if (!worker->stopped)
518     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
519   else
520     ret = message;
521   return ret;
522 }
523
524 /* can only be called from private thread with read-lock held - takes ownership of @message */
525 static void
526 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
527                                                   GDBusMessage *message)
528 {
529   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
530     {
531       /* queue up */
532       g_queue_push_tail (worker->received_messages_while_frozen, message);
533     }
534   else
535     {
536       /* not frozen, nor anything in queue */
537       _g_dbus_worker_emit_message_received (worker, message);
538       g_object_unref (message);
539     }
540 }
541
542 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
543 static gboolean
544 unfreeze_in_idle_cb (gpointer user_data)
545 {
546   GDBusWorker *worker = user_data;
547   GDBusMessage *message;
548
549   g_mutex_lock (worker->read_lock);
550   if (worker->frozen)
551     {
552       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
553         {
554           _g_dbus_worker_emit_message_received (worker, message);
555           g_object_unref (message);
556         }
557       worker->frozen = FALSE;
558     }
559   else
560     {
561       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
562     }
563   g_mutex_unlock (worker->read_lock);
564   return FALSE;
565 }
566
567 /* can be called from any thread */
568 void
569 _g_dbus_worker_unfreeze (GDBusWorker *worker)
570 {
571   GSource *idle_source;
572   idle_source = g_idle_source_new ();
573   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
574   g_source_set_callback (idle_source,
575                          unfreeze_in_idle_cb,
576                          _g_dbus_worker_ref (worker),
577                          (GDestroyNotify) _g_dbus_worker_unref);
578   g_source_attach (idle_source, shared_thread_data->context);
579   g_source_unref (idle_source);
580 }
581
582 /* ---------------------------------------------------------------------------------------------------- */
583
584 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
585
586 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
587 static void
588 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
589                            GAsyncResult  *res,
590                            gpointer       user_data)
591 {
592   GDBusWorker *worker = user_data;
593   GError *error;
594   gssize bytes_read;
595
596   g_mutex_lock (worker->read_lock);
597
598   /* If already stopped, don't even process the reply */
599   if (worker->stopped)
600     goto out;
601
602   error = NULL;
603   if (worker->socket == NULL)
604     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
605                                              res,
606                                              &error);
607   else
608     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
609                                                               res,
610                                                               &error);
611   if (worker->read_num_ancillary_messages > 0)
612     {
613       gint n;
614       for (n = 0; n < worker->read_num_ancillary_messages; n++)
615         {
616           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
617
618           if (FALSE)
619             {
620             }
621 #ifdef G_OS_UNIX
622           else if (G_IS_UNIX_FD_MESSAGE (control_message))
623             {
624               GUnixFDMessage *fd_message;
625               gint *fds;
626               gint num_fds;
627
628               fd_message = G_UNIX_FD_MESSAGE (control_message);
629               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
630               if (worker->read_fd_list == NULL)
631                 {
632                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
633                 }
634               else
635                 {
636                   gint n;
637                   for (n = 0; n < num_fds; n++)
638                     {
639                       /* TODO: really want a append_steal() */
640                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
641                       close (fds[n]);
642                     }
643                 }
644               g_free (fds);
645             }
646           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
647             {
648               /* do nothing */
649             }
650 #endif
651           else
652             {
653               if (error == NULL)
654                 {
655                   g_set_error (&error,
656                                G_IO_ERROR,
657                                G_IO_ERROR_FAILED,
658                                "Unexpected ancillary message of type %s received from peer",
659                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
660                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
661                   g_error_free (error);
662                   g_object_unref (control_message);
663                   n++;
664                   while (n < worker->read_num_ancillary_messages)
665                     g_object_unref (worker->read_ancillary_messages[n++]);
666                   g_free (worker->read_ancillary_messages);
667                   goto out;
668                 }
669             }
670           g_object_unref (control_message);
671         }
672       g_free (worker->read_ancillary_messages);
673     }
674
675   if (bytes_read == -1)
676     {
677       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
678       g_error_free (error);
679       goto out;
680     }
681
682 #if 0
683   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
684            (gint) bytes_read,
685            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
686            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
687            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
688                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
689            worker->stream,
690            worker);
691 #endif
692
693   /* TODO: hmm, hmm... */
694   if (bytes_read == 0)
695     {
696       g_set_error (&error,
697                    G_IO_ERROR,
698                    G_IO_ERROR_FAILED,
699                    "Underlying GIOStream returned 0 bytes on an async read");
700       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
701       g_error_free (error);
702       goto out;
703     }
704
705   read_message_print_transport_debug (bytes_read, worker);
706
707   worker->read_buffer_cur_size += bytes_read;
708   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
709     {
710       /* OK, got what we asked for! */
711       if (worker->read_buffer_bytes_wanted == 16)
712         {
713           gssize message_len;
714           /* OK, got the header - determine how many more bytes are needed */
715           error = NULL;
716           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
717                                                      16,
718                                                      &error);
719           if (message_len == -1)
720             {
721               g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
722               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
723               g_error_free (error);
724               goto out;
725             }
726
727           worker->read_buffer_bytes_wanted = message_len;
728           _g_dbus_worker_do_read_unlocked (worker);
729         }
730       else
731         {
732           GDBusMessage *message;
733           error = NULL;
734
735           /* TODO: use connection->priv->auth to decode the message */
736
737           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
738                                                   worker->read_buffer_cur_size,
739                                                   worker->capabilities,
740                                                   &error);
741           if (message == NULL)
742             {
743               gchar *s;
744               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
745               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
746                          "The error is: %s\n"
747                          "The payload is as follows:\n"
748                          "%s\n",
749                          worker->read_buffer_cur_size,
750                          error->message,
751                          s);
752               g_free (s);
753               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
754               g_error_free (error);
755               goto out;
756             }
757
758 #ifdef G_OS_UNIX
759           if (worker->read_fd_list != NULL)
760             {
761               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
762               g_object_unref (worker->read_fd_list);
763               worker->read_fd_list = NULL;
764             }
765 #endif
766
767           if (G_UNLIKELY (_g_dbus_debug_message ()))
768             {
769               gchar *s;
770               _g_dbus_debug_print_lock ();
771               g_print ("========================================================================\n"
772                        "GDBus-debug:Message:\n"
773                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
774                        worker->read_buffer_cur_size);
775               s = g_dbus_message_print (message, 2);
776               g_print ("%s", s);
777               g_free (s);
778               if (G_UNLIKELY (_g_dbus_debug_payload ()))
779                 {
780                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
781                   g_print ("%s\n", s);
782                   g_free (s);
783                 }
784               _g_dbus_debug_print_unlock ();
785             }
786
787           /* yay, got a message, go deliver it */
788           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
789
790           /* start reading another message! */
791           worker->read_buffer_bytes_wanted = 0;
792           worker->read_buffer_cur_size = 0;
793           _g_dbus_worker_do_read_unlocked (worker);
794         }
795     }
796   else
797     {
798       /* didn't get all the bytes we requested - so repeat the request... */
799       _g_dbus_worker_do_read_unlocked (worker);
800     }
801
802  out:
803   g_mutex_unlock (worker->read_lock);
804
805   /* gives up the reference acquired when calling g_input_stream_read_async() */
806   _g_dbus_worker_unref (worker);
807 }
808
809 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
810 static void
811 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
812 {
813   /* if bytes_wanted is zero, it means start reading a message */
814   if (worker->read_buffer_bytes_wanted == 0)
815     {
816       worker->read_buffer_cur_size = 0;
817       worker->read_buffer_bytes_wanted = 16;
818     }
819
820   /* ensure we have a (big enough) buffer */
821   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
822     {
823       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
824       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
825       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
826     }
827
828   if (worker->socket == NULL)
829     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
830                                worker->read_buffer + worker->read_buffer_cur_size,
831                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
832                                G_PRIORITY_DEFAULT,
833                                worker->cancellable,
834                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
835                                _g_dbus_worker_ref (worker));
836   else
837     {
838       worker->read_ancillary_messages = NULL;
839       worker->read_num_ancillary_messages = 0;
840       _g_socket_read_with_control_messages (worker->socket,
841                                             worker->read_buffer + worker->read_buffer_cur_size,
842                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
843                                             &worker->read_ancillary_messages,
844                                             &worker->read_num_ancillary_messages,
845                                             G_PRIORITY_DEFAULT,
846                                             worker->cancellable,
847                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
848                                             _g_dbus_worker_ref (worker));
849     }
850 }
851
852 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
853 static void
854 _g_dbus_worker_do_read (GDBusWorker *worker)
855 {
856   g_mutex_lock (worker->read_lock);
857   _g_dbus_worker_do_read_unlocked (worker);
858   g_mutex_unlock (worker->read_lock);
859 }
860
861 /* ---------------------------------------------------------------------------------------------------- */
862
/* One message queued for writing, together with the progress made so far.
 * Freed with message_to_write_data_free().
 */
struct _MessageToWriteData
{
  GDBusWorker  *worker;   /* a reference is held */
  GDBusMessage *message;  /* a reference is held */
  gchar        *blob;     /* owned; released with g_free() */
  gsize         blob_size;

  gsize               total_written; /* bytes written so far; never exceeds blob_size */
  GSimpleAsyncResult *simple;        /* not released by message_to_write_data_free() */

};
874
875 static void
876 message_to_write_data_free (MessageToWriteData *data)
877 {
878   _g_dbus_worker_unref (data->worker);
879   g_object_unref (data->message);
880   g_free (data->blob);
881   g_free (data);
882 }
883
884 /* ---------------------------------------------------------------------------------------------------- */
885
886 static void write_message_continue_writing (MessageToWriteData *data);
887
888 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
889 static void
890 write_message_async_cb (GObject      *source_object,
891                         GAsyncResult *res,
892                         gpointer      user_data)
893 {
894   MessageToWriteData *data = user_data;
895   GSimpleAsyncResult *simple;
896   gssize bytes_written;
897   GError *error;
898
899   /* Note: we can't access data->simple after calling g_async_result_complete () because the
900    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
901    */
902   simple = data->simple;
903
904   error = NULL;
905   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
906                                                 res,
907                                                 &error);
908   if (bytes_written == -1)
909     {
910       g_simple_async_result_set_from_error (simple, error);
911       g_error_free (error);
912       g_simple_async_result_complete (simple);
913       g_object_unref (simple);
914       goto out;
915     }
916   g_assert (bytes_written > 0); /* zero is never returned */
917
918   write_message_print_transport_debug (bytes_written, data);
919
920   data->total_written += bytes_written;
921   g_assert (data->total_written <= data->blob_size);
922   if (data->total_written == data->blob_size)
923     {
924       g_simple_async_result_complete (simple);
925       g_object_unref (simple);
926       goto out;
927     }
928
929   write_message_continue_writing (data);
930
931  out:
932   ;
933 }
934
935 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
936 static gboolean
937 on_socket_ready (GSocket      *socket,
938                  GIOCondition  condition,
939                  gpointer      user_data)
940 {
941   MessageToWriteData *data = user_data;
942   write_message_continue_writing (data);
943   return FALSE; /* remove source */
944 }
945
946 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
947 static void
948 write_message_continue_writing (MessageToWriteData *data)
949 {
950   GOutputStream *ostream;
951   GSimpleAsyncResult *simple;
952 #ifdef G_OS_UNIX
953   GUnixFDList *fd_list;
954 #endif
955
956   /* Note: we can't access data->simple after calling g_async_result_complete () because the
957    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
958    */
959   simple = data->simple;
960
961   ostream = g_io_stream_get_output_stream (data->worker->stream);
962 #ifdef G_OS_UNIX
963   fd_list = g_dbus_message_get_unix_fd_list (data->message);
964 #endif
965
966   g_assert (!g_output_stream_has_pending (ostream));
967   g_assert_cmpint (data->total_written, <, data->blob_size);
968
969   if (FALSE)
970     {
971     }
972 #ifdef G_OS_UNIX
973   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
974     {
975       GOutputVector vector;
976       GSocketControlMessage *control_message;
977       gssize bytes_written;
978       GError *error;
979
980       vector.buffer = data->blob;
981       vector.size = data->blob_size;
982
983       control_message = NULL;
984       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
985         {
986           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
987             {
988               g_simple_async_result_set_error (simple,
989                                                G_IO_ERROR,
990                                                G_IO_ERROR_FAILED,
991                                                "Tried sending a file descriptor but remote peer does not support this capability");
992               g_simple_async_result_complete (simple);
993               g_object_unref (simple);
994               goto out;
995             }
996           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
997         }
998
999       error = NULL;
1000       bytes_written = g_socket_send_message (data->worker->socket,
1001                                              NULL, /* address */
1002                                              &vector,
1003                                              1,
1004                                              control_message != NULL ? &control_message : NULL,
1005                                              control_message != NULL ? 1 : 0,
1006                                              G_SOCKET_MSG_NONE,
1007                                              data->worker->cancellable,
1008                                              &error);
1009       if (control_message != NULL)
1010         g_object_unref (control_message);
1011
1012       if (bytes_written == -1)
1013         {
1014           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1015           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1016             {
1017               GSource *source;
1018               source = g_socket_create_source (data->worker->socket,
1019                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1020                                                data->worker->cancellable);
1021               g_source_set_callback (source,
1022                                      (GSourceFunc) on_socket_ready,
1023                                      data,
1024                                      NULL); /* GDestroyNotify */
1025               g_source_attach (source, g_main_context_get_thread_default ());
1026               g_source_unref (source);
1027               g_error_free (error);
1028               goto out;
1029             }
1030           g_simple_async_result_set_from_error (simple, error);
1031           g_error_free (error);
1032           g_simple_async_result_complete (simple);
1033           g_object_unref (simple);
1034           goto out;
1035         }
1036       g_assert (bytes_written > 0); /* zero is never returned */
1037
1038       write_message_print_transport_debug (bytes_written, data);
1039
1040       data->total_written += bytes_written;
1041       g_assert (data->total_written <= data->blob_size);
1042       if (data->total_written == data->blob_size)
1043         {
1044           g_simple_async_result_complete (simple);
1045           g_object_unref (simple);
1046           goto out;
1047         }
1048
1049       write_message_continue_writing (data);
1050     }
1051 #endif
1052   else
1053     {
1054 #ifdef G_OS_UNIX
1055       if (fd_list != NULL)
1056         {
1057           g_simple_async_result_set_error (simple,
1058                                            G_IO_ERROR,
1059                                            G_IO_ERROR_FAILED,
1060                                            "Tried sending a file descriptor on unsupported stream of type %s",
1061                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1062           g_simple_async_result_complete (simple);
1063           g_object_unref (simple);
1064           goto out;
1065         }
1066 #endif
1067
1068       g_output_stream_write_async (ostream,
1069                                    (const gchar *) data->blob + data->total_written,
1070                                    data->blob_size - data->total_written,
1071                                    G_PRIORITY_DEFAULT,
1072                                    data->worker->cancellable,
1073                                    write_message_async_cb,
1074                                    data);
1075     }
1076  out:
1077   ;
1078 }
1079
1080 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1081 static void
1082 write_message_async (GDBusWorker         *worker,
1083                      MessageToWriteData  *data,
1084                      GAsyncReadyCallback  callback,
1085                      gpointer             user_data)
1086 {
1087   data->simple = g_simple_async_result_new (NULL,
1088                                             callback,
1089                                             user_data,
1090                                             write_message_async);
1091   data->total_written = 0;
1092   write_message_continue_writing (data);
1093 }
1094
1095 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1096 static gboolean
1097 write_message_finish (GAsyncResult   *res,
1098                       GError        **error)
1099 {
1100   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1101   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1102     return FALSE;
1103   else
1104     return TRUE;
1105 }
1106 /* ---------------------------------------------------------------------------------------------------- */
1107
1108 static void maybe_write_next_message (GDBusWorker *worker);
1109
1110 typedef struct
1111 {
1112   GDBusWorker *worker;
1113   GList *flushers;
1114 } FlushAsyncData;
1115
1116 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1117 static void
1118 ostream_flush_cb (GObject      *source_object,
1119                   GAsyncResult *res,
1120                   gpointer      user_data)
1121 {
1122   FlushAsyncData *data = user_data;
1123   GError *error;
1124   GList *l;
1125
1126   error = NULL;
1127   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1128                                 res,
1129                                 &error);
1130
1131   if (error == NULL)
1132     {
1133       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1134         {
1135           _g_dbus_debug_print_lock ();
1136           g_print ("========================================================================\n"
1137                    "GDBus-debug:Transport:\n"
1138                    "  ---- FLUSHED stream of type %s\n",
1139                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1140           _g_dbus_debug_print_unlock ();
1141         }
1142     }
1143
1144   g_assert (data->flushers != NULL);
1145   for (l = data->flushers; l != NULL; l = l->next)
1146     {
1147       FlushData *f = l->data;
1148
1149       f->error = error != NULL ? g_error_copy (error) : NULL;
1150
1151       g_mutex_lock (f->mutex);
1152       g_cond_signal (f->cond);
1153       g_mutex_unlock (f->mutex);
1154     }
1155   g_list_free (data->flushers);
1156
1157   if (error != NULL)
1158     g_error_free (error);
1159
1160   /* OK, cool, finally kick off the next write */
1161   maybe_write_next_message (data->worker);
1162
1163   _g_dbus_worker_unref (data->worker);
1164   g_free (data);
1165 }
1166
1167 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1168 static void
1169 message_written (GDBusWorker *worker,
1170                  MessageToWriteData *message_data)
1171 {
1172   GList *l;
1173   GList *ll;
1174   GList *flushers;
1175
1176   /* first log the fact that we wrote a message */
1177   if (G_UNLIKELY (_g_dbus_debug_message ()))
1178     {
1179       gchar *s;
1180       _g_dbus_debug_print_lock ();
1181       g_print ("========================================================================\n"
1182                "GDBus-debug:Message:\n"
1183                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1184                message_data->blob_size);
1185       s = g_dbus_message_print (message_data->message, 2);
1186       g_print ("%s", s);
1187       g_free (s);
1188       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1189         {
1190           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1191           g_print ("%s\n", s);
1192           g_free (s);
1193         }
1194       _g_dbus_debug_print_unlock ();
1195     }
1196
1197   /* then first wake up pending flushes and, if needed, flush the stream */
1198   flushers = NULL;
1199   g_mutex_lock (worker->write_lock);
1200   worker->write_num_messages_written += 1;
1201   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1202     {
1203       FlushData *f = l->data;
1204       ll = l->next;
1205
1206       if (f->number_to_wait_for == worker->write_num_messages_written)
1207         {
1208           flushers = g_list_append (flushers, f);
1209           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1210         }
1211     }
1212   g_mutex_unlock (worker->write_lock);
1213
1214   if (flushers != NULL)
1215     {
1216       FlushAsyncData *data;
1217       data = g_new0 (FlushAsyncData, 1);
1218       data->worker = _g_dbus_worker_ref (worker);
1219       data->flushers = flushers;
1220       /* flush the stream before writing the next message */
1221       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1222                                    G_PRIORITY_DEFAULT,
1223                                    worker->cancellable,
1224                                    ostream_flush_cb,
1225                                    data);
1226     }
1227   else
1228     {
1229       /* kick off the next write! */
1230       maybe_write_next_message (worker);
1231     }
1232 }
1233
1234 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1235 static void
1236 write_message_cb (GObject       *source_object,
1237                   GAsyncResult  *res,
1238                   gpointer       user_data)
1239 {
1240   MessageToWriteData *data = user_data;
1241   GError *error;
1242
1243   g_mutex_lock (data->worker->write_lock);
1244   data->worker->num_writes_pending -= 1;
1245   g_mutex_unlock (data->worker->write_lock);
1246
1247   error = NULL;
1248   if (!write_message_finish (res, &error))
1249     {
1250       /* TODO: handle */
1251       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1252       g_error_free (error);
1253     }
1254
1255   /* this function will also kick of the next write (it might need to
1256    * flush so writing the next message might happen much later
1257    * e.g. async)
1258    */
1259   message_written (data->worker, data);
1260
1261   message_to_write_data_free (data);
1262 }
1263
1264 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1265 static void
1266 maybe_write_next_message (GDBusWorker *worker)
1267 {
1268   MessageToWriteData *data;
1269
1270  write_next:
1271
1272   g_mutex_lock (worker->write_lock);
1273   data = g_queue_pop_head (worker->write_queue);
1274   if (data != NULL)
1275     worker->num_writes_pending += 1;
1276   g_mutex_unlock (worker->write_lock);
1277
1278   /* Note that write_lock is only used for protecting the @write_queue
1279    * and @num_writes_pending fields of the GDBusWorker struct ... which we
1280    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1281    *
1282    * Therefore, it's fine to drop it here when calling back into user
1283    * code and then writing the message out onto the GIOStream since this
1284    * function only runs on the worker thread.
1285    */
1286   if (data != NULL)
1287     {
1288       GDBusMessage *old_message;
1289       guchar *new_blob;
1290       gsize new_blob_size;
1291       GError *error;
1292
1293       old_message = data->message;
1294       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1295       if (data->message == old_message)
1296         {
1297           /* filters had no effect - do nothing */
1298         }
1299       else if (data->message == NULL)
1300         {
1301           /* filters dropped message */
1302           g_mutex_lock (worker->write_lock);
1303           worker->num_writes_pending -= 1;
1304           g_mutex_unlock (worker->write_lock);
1305           message_to_write_data_free (data);
1306           goto write_next;
1307         }
1308       else
1309         {
1310           /* filters altered the message -> reencode */
1311           error = NULL;
1312           new_blob = g_dbus_message_to_blob (data->message,
1313                                              &new_blob_size,
1314                                              worker->capabilities,
1315                                              &error);
1316           if (new_blob == NULL)
1317             {
1318               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1319                * the old message instead
1320                */
1321               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1322                          g_dbus_message_get_serial (data->message),
1323                          error->message);
1324               g_error_free (error);
1325             }
1326           else
1327             {
1328               g_free (data->blob);
1329               data->blob = (gchar *) new_blob;
1330               data->blob_size = new_blob_size;
1331             }
1332         }
1333
1334       write_message_async (worker,
1335                            data,
1336                            write_message_cb,
1337                            data);
1338     }
1339 }
1340
1341 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1342 static gboolean
1343 write_message_in_idle_cb (gpointer user_data)
1344 {
1345   GDBusWorker *worker = user_data;
1346   if (worker->num_writes_pending == 0)
1347     maybe_write_next_message (worker);
1348   return FALSE;
1349 }
1350
1351 /* ---------------------------------------------------------------------------------------------------- */
1352
1353 /* can be called from any thread - steals blob */
1354 void
1355 _g_dbus_worker_send_message (GDBusWorker    *worker,
1356                              GDBusMessage   *message,
1357                              gchar          *blob,
1358                              gsize           blob_len)
1359 {
1360   MessageToWriteData *data;
1361
1362   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1363   g_return_if_fail (blob != NULL);
1364   g_return_if_fail (blob_len > 16);
1365
1366   data = g_new0 (MessageToWriteData, 1);
1367   data->worker = _g_dbus_worker_ref (worker);
1368   data->message = g_object_ref (message);
1369   data->blob = blob; /* steal! */
1370   data->blob_size = blob_len;
1371
1372   g_mutex_lock (worker->write_lock);
1373   g_queue_push_tail (worker->write_queue, data);
1374   if (worker->num_writes_pending == 0)
1375     {
1376       GSource *idle_source;
1377       idle_source = g_idle_source_new ();
1378       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1379       g_source_set_callback (idle_source,
1380                              write_message_in_idle_cb,
1381                              _g_dbus_worker_ref (worker),
1382                              (GDestroyNotify) _g_dbus_worker_unref);
1383       g_source_attach (idle_source, shared_thread_data->context);
1384       g_source_unref (idle_source);
1385     }
1386   g_mutex_unlock (worker->write_lock);
1387 }
1388
1389 /* ---------------------------------------------------------------------------------------------------- */
1390
1391 static void
1392 _g_dbus_worker_thread_begin_func (gpointer user_data)
1393 {
1394   GDBusWorker *worker = user_data;
1395
1396   worker->thread = g_thread_self ();
1397
1398   /* begin reading */
1399   _g_dbus_worker_do_read (worker);
1400 }
1401
1402 GDBusWorker *
1403 _g_dbus_worker_new (GIOStream                              *stream,
1404                     GDBusCapabilityFlags                    capabilities,
1405                     gboolean                                initially_frozen,
1406                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1407                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1408                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1409                     gpointer                                user_data)
1410 {
1411   GDBusWorker *worker;
1412
1413   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1414   g_return_val_if_fail (message_received_callback != NULL, NULL);
1415   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1416   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1417
1418   worker = g_new0 (GDBusWorker, 1);
1419   worker->ref_count = 1;
1420
1421   worker->read_lock = g_mutex_new ();
1422   worker->message_received_callback = message_received_callback;
1423   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1424   worker->disconnected_callback = disconnected_callback;
1425   worker->user_data = user_data;
1426   worker->stream = g_object_ref (stream);
1427   worker->capabilities = capabilities;
1428   worker->cancellable = g_cancellable_new ();
1429
1430   worker->frozen = initially_frozen;
1431   worker->received_messages_while_frozen = g_queue_new ();
1432
1433   worker->write_lock = g_mutex_new ();
1434   worker->write_queue = g_queue_new ();
1435
1436   if (G_IS_SOCKET_CONNECTION (worker->stream))
1437     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1438
1439   _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
1440
1441   return worker;
1442 }
1443
1444 /* ---------------------------------------------------------------------------------------------------- */
1445
1446 /* This can be called from any thread - frees worker - guarantees no callbacks
1447  * will ever be issued again
1448  */
1449 void
1450 _g_dbus_worker_stop (GDBusWorker *worker)
1451 {
1452   /* If we're called in the worker thread it means we are called from
1453    * a worker callback. This is fine, we just can't lock in that case since
1454    * we're already holding the lock...
1455    */
1456   if (g_thread_self () != worker->thread)
1457     g_mutex_lock (worker->read_lock);
1458   worker->stopped = TRUE;
1459   if (g_thread_self () != worker->thread)
1460     g_mutex_unlock (worker->read_lock);
1461
1462   g_cancellable_cancel (worker->cancellable);
1463   _g_dbus_worker_unref (worker);
1464 }
1465
1466 /* ---------------------------------------------------------------------------------------------------- */
1467
1468 /* can be called from any thread (except the worker thread) - blocks
1469  * calling thread until all queued outgoing messages are written and
1470  * the transport has been flushed
1471  */
1472 gboolean
1473 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1474                            GCancellable   *cancellable,
1475                            GError        **error)
1476 {
1477   gboolean ret;
1478   FlushData *data;
1479
1480   data = NULL;
1481   ret = TRUE;
1482
1483   /* if the queue is empty, there's nothing to wait for */
1484   g_mutex_lock (worker->write_lock);
1485   if (g_queue_get_length (worker->write_queue) > 0)
1486     {
1487       data = g_new0 (FlushData, 1);
1488       data->mutex = g_mutex_new ();
1489       data->cond = g_cond_new ();
1490       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1491       g_mutex_lock (data->mutex);
1492       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1493     }
1494   g_mutex_unlock (worker->write_lock);
1495
1496   if (data != NULL)
1497     {
1498       g_cond_wait (data->cond, data->mutex);
1499       g_mutex_unlock (data->mutex);
1500
1501       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1502       g_cond_free (data->cond);
1503       g_mutex_free (data->mutex);
1504       if (data->error != NULL)
1505         {
1506           ret = FALSE;
1507           g_propagate_error (error, data->error);
1508         }
1509       g_free (data);
1510     }
1511
1512   return ret;
1513 }
1514
1515 /* ---------------------------------------------------------------------------------------------------- */
1516
1517 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1518 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1519 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1520 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1521 #define G_DBUS_DEBUG_CALL           (1<<4)
1522 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1523 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1524 #define G_DBUS_DEBUG_RETURN         (1<<7)
1525 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1526 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1527
1528 static gint _gdbus_debug_flags = 0;
1529
1530 gboolean
1531 _g_dbus_debug_authentication (void)
1532 {
1533   _g_dbus_initialize ();
1534   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1535 }
1536
1537 gboolean
1538 _g_dbus_debug_transport (void)
1539 {
1540   _g_dbus_initialize ();
1541   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1542 }
1543
1544 gboolean
1545 _g_dbus_debug_message (void)
1546 {
1547   _g_dbus_initialize ();
1548   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1549 }
1550
1551 gboolean
1552 _g_dbus_debug_payload (void)
1553 {
1554   _g_dbus_initialize ();
1555   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1556 }
1557
1558 gboolean
1559 _g_dbus_debug_call (void)
1560 {
1561   _g_dbus_initialize ();
1562   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1563 }
1564
1565 gboolean
1566 _g_dbus_debug_signal (void)
1567 {
1568   _g_dbus_initialize ();
1569   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1570 }
1571
1572 gboolean
1573 _g_dbus_debug_incoming (void)
1574 {
1575   _g_dbus_initialize ();
1576   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1577 }
1578
1579 gboolean
1580 _g_dbus_debug_return (void)
1581 {
1582   _g_dbus_initialize ();
1583   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1584 }
1585
1586 gboolean
1587 _g_dbus_debug_emission (void)
1588 {
1589   _g_dbus_initialize ();
1590   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1591 }
1592
1593 gboolean
1594 _g_dbus_debug_address (void)
1595 {
1596   _g_dbus_initialize ();
1597   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1598 }
1599
1600 G_LOCK_DEFINE_STATIC (print_lock);
1601
1602 void
1603 _g_dbus_debug_print_lock (void)
1604 {
1605   G_LOCK (print_lock);
1606 }
1607
1608 void
1609 _g_dbus_debug_print_unlock (void)
1610 {
1611   G_UNLOCK (print_lock);
1612 }
1613
1614 /*
1615  * _g_dbus_initialize:
1616  *
1617  * Does various one-time init things such as
1618  *
1619  *  - registering the G_DBUS_ERROR error domain
1620  *  - parses the G_DBUS_DEBUG environment variable
1621  */
1622 void
1623 _g_dbus_initialize (void)
1624 {
1625   static volatile gsize initialized = 0;
1626
1627   if (g_once_init_enter (&initialized))
1628     {
1629       volatile GQuark g_dbus_error_domain;
1630       const gchar *debug;
1631
1632       g_dbus_error_domain = G_DBUS_ERROR;
1633
1634       debug = g_getenv ("G_DBUS_DEBUG");
1635       if (debug != NULL)
1636         {
1637           const GDebugKey keys[] = {
1638             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1639             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1640             { "message",        G_DBUS_DEBUG_MESSAGE        },
1641             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1642             { "call",           G_DBUS_DEBUG_CALL           },
1643             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1644             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1645             { "return",         G_DBUS_DEBUG_RETURN         },
1646             { "emission",       G_DBUS_DEBUG_EMISSION       },
1647             { "address",        G_DBUS_DEBUG_ADDRESS        }
1648           };
1649
1650           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1651           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1652             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1653         }
1654
1655       g_once_init_leave (&initialized, 1);
1656     }
1657 }
1658
1659 /* ---------------------------------------------------------------------------------------------------- */
1660
1661 GVariantType *
1662 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1663 {
1664   const GVariantType *arg_types[256];
1665   guint n;
1666
1667   if (args)
1668     for (n = 0; args[n] != NULL; n++)
1669       {
1670         /* DBus places a hard limit of 255 on signature length.
1671          * therefore number of args must be less than 256.
1672          */
1673         g_assert (n < 256);
1674
1675         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1676
1677         if G_UNLIKELY (arg_types[n] == NULL)
1678           return NULL;
1679       }
1680   else
1681     n = 0;
1682
1683   return g_variant_type_new_tuple (arg_types, n);
1684 }
1685
1686 /* ---------------------------------------------------------------------------------------------------- */
1687
1688 #ifdef G_OS_WIN32
1689
1690 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1691
1692 gchar *
1693 _g_dbus_win32_get_user_sid (void)
1694 {
1695   HANDLE h;
1696   TOKEN_USER *user;
1697   DWORD token_information_len;
1698   PSID psid;
1699   gchar *sid;
1700   gchar *ret;
1701
1702   ret = NULL;
1703   user = NULL;
1704   h = INVALID_HANDLE_VALUE;
1705
1706   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1707     {
1708       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1709       goto out;
1710     }
1711
1712   /* Get length of buffer */
1713   token_information_len = 0;
1714   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1715     {
1716       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1717         {
1718           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1719           goto out;
1720         }
1721     }
1722   user = g_malloc (token_information_len);
1723   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1724     {
1725       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1726       goto out;
1727     }
1728
1729   psid = user->User.Sid;
1730   if (!IsValidSid (psid))
1731     {
1732       g_warning ("Invalid SID");
1733       goto out;
1734     }
1735
1736   if (!ConvertSidToStringSidA (psid, &sid))
1737     {
1738       g_warning ("Invalid SID");
1739       goto out;
1740     }
1741
1742   ret = g_strdup (sid);
1743   LocalFree (sid);
1744
1745 out:
1746   g_free (user);
1747   if (h != INVALID_HANDLE_VALUE)
1748     CloseHandle (h);
1749   return ret;
1750 }
1751 #endif
1752
1753 /* ---------------------------------------------------------------------------------------------------- */
1754
1755 gchar *
1756 _g_dbus_get_machine_id (GError **error)
1757 {
1758   gchar *ret;
1759   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1760   ret = NULL;
1761   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1762                             &ret,
1763                             NULL,
1764                             error))
1765     {
1766       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1767     }
1768   else
1769     {
1770       /* TODO: validate value */
1771       g_strstrip (ret);
1772     }
1773   return ret;
1774 }
1775
1776 /* ---------------------------------------------------------------------------------------------------- */
1777
1778 gchar *
1779 _g_dbus_enum_to_string (GType enum_type, gint value)
1780 {
1781   gchar *ret;
1782   GEnumClass *klass;
1783   GEnumValue *enum_value;
1784
1785   klass = g_type_class_ref (enum_type);
1786   enum_value = g_enum_get_value (klass, value);
1787   if (enum_value != NULL)
1788     ret = g_strdup (enum_value->value_nick);
1789   else
1790     ret = g_strdup_printf ("unknown (value %d)", value);
1791   g_type_class_unref (klass);
1792   return ret;
1793 }
1794
1795 /* ---------------------------------------------------------------------------------------------------- */
1796
1797 static void
1798 write_message_print_transport_debug (gssize bytes_written,
1799                                      MessageToWriteData *data)
1800 {
1801   if (G_LIKELY (!_g_dbus_debug_transport ()))
1802     goto out;
1803
1804   _g_dbus_debug_print_lock ();
1805   g_print ("========================================================================\n"
1806            "GDBus-debug:Transport:\n"
1807            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1808            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
1809            bytes_written,
1810            g_dbus_message_get_serial (data->message),
1811            data->blob_size,
1812            data->total_written,
1813            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1814   _g_dbus_debug_print_unlock ();
1815  out:
1816   ;
1817 }
1818
1819 /* ---------------------------------------------------------------------------------------------------- */
1820
1821 static void
1822 read_message_print_transport_debug (gssize bytes_read,
1823                                     GDBusWorker *worker)
1824 {
1825   gsize size;
1826   gint32 serial;
1827   gint32 message_length;
1828
1829   if (G_LIKELY (!_g_dbus_debug_transport ()))
1830     goto out;
1831
1832   size = bytes_read + worker->read_buffer_cur_size;
1833   serial = 0;
1834   message_length = 0;
1835   if (size >= 16)
1836     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
1837   if (size >= 1)
1838     {
1839       switch (worker->read_buffer[0])
1840         {
1841         case 'l':
1842           if (size >= 12)
1843             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
1844           break;
1845         case 'B':
1846           if (size >= 12)
1847             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
1848           break;
1849         default:
1850           /* an error will be set elsewhere if this happens */
1851           goto out;
1852         }
1853     }
1854
1855     _g_dbus_debug_print_lock ();
1856   g_print ("========================================================================\n"
1857            "GDBus-debug:Transport:\n"
1858            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1859            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
1860            bytes_read,
1861            serial,
1862            message_length,
1863            worker->read_buffer_cur_size,
1864            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
1865   _g_dbus_debug_print_unlock ();
1866  out:
1867   ;
1868 }