Merge remote branch 'gvdb/master'
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "gsocketcontrolmessage.h"
43 #include "gsocketconnection.h"
44 #include "gsocketoutputstream.h"
45
46 #ifdef G_OS_UNIX
47 #include "gunixfdmessage.h"
48 #include "gunixconnection.h"
49 #include "gunixcredentialsmessage.h"
50 #endif
51
52 #ifdef G_OS_WIN32
53 #include <windows.h>
54 #endif
55
56 #include "glibintl.h"
57
58 /* ---------------------------------------------------------------------------------------------------- */
59
60 gchar *
61 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
62 {
63  guint n, m;
64  GString *ret;
65
66  ret = g_string_new (NULL);
67
68  for (n = 0; n < len; n += 16)
69    {
70      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
71
72      for (m = n; m < n + 16; m++)
73        {
74          if (m > n && (m%4) == 0)
75            g_string_append_c (ret, ' ');
76          if (m < len)
77            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
78          else
79            g_string_append (ret, "   ");
80        }
81
82      g_string_append (ret, "   ");
83
84      for (m = n; m < len && m < n + 16; m++)
85        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
86
87      g_string_append_c (ret, '\n');
88    }
89
90  return g_string_free (ret, FALSE);
91 }
92
93 /* ---------------------------------------------------------------------------------------------------- */
94
95 /* Unfortunately ancillary messages are discarded when reading from a
96  * socket using the GSocketInputStream abstraction. So we provide a
97  * very GInputStream-ish API that uses GSocket in this case (very
98  * similar to GSocketInputStream).
99  */
100
101 typedef struct
102 {
103   GSocket *socket;
104   GCancellable *cancellable;
105
106   void *buffer;
107   gsize count;
108
109   GSocketControlMessage ***messages;
110   gint *num_messages;
111
112   GSimpleAsyncResult *simple;
113
114   gboolean from_mainloop;
115 } ReadWithControlData;
116
117 static void
118 read_with_control_data_free (ReadWithControlData *data)
119 {
120   g_object_unref (data->socket);
121   if (data->cancellable != NULL)
122     g_object_unref (data->cancellable);
123   g_object_unref (data->simple);
124   g_free (data);
125 }
126
127 static gboolean
128 _g_socket_read_with_control_messages_ready (GSocket      *socket,
129                                             GIOCondition  condition,
130                                             gpointer      user_data)
131 {
132   ReadWithControlData *data = user_data;
133   GError *error;
134   gssize result;
135   GInputVector vector;
136
137   error = NULL;
138   vector.buffer = data->buffer;
139   vector.size = data->count;
140   result = g_socket_receive_message (data->socket,
141                                      NULL, /* address */
142                                      &vector,
143                                      1,
144                                      data->messages,
145                                      data->num_messages,
146                                      NULL,
147                                      data->cancellable,
148                                      &error);
149   if (result >= 0)
150     {
151       g_simple_async_result_set_op_res_gssize (data->simple, result);
152     }
153   else
154     {
155       g_assert (error != NULL);
156       g_simple_async_result_set_from_error (data->simple, error);
157       g_error_free (error);
158     }
159
160   if (data->from_mainloop)
161     g_simple_async_result_complete (data->simple);
162   else
163     g_simple_async_result_complete_in_idle (data->simple);
164
165   return FALSE;
166 }
167
168 static void
169 _g_socket_read_with_control_messages (GSocket                 *socket,
170                                       void                    *buffer,
171                                       gsize                    count,
172                                       GSocketControlMessage ***messages,
173                                       gint                    *num_messages,
174                                       gint                     io_priority,
175                                       GCancellable            *cancellable,
176                                       GAsyncReadyCallback      callback,
177                                       gpointer                 user_data)
178 {
179   ReadWithControlData *data;
180
181   data = g_new0 (ReadWithControlData, 1);
182   data->socket = g_object_ref (socket);
183   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
184   data->buffer = buffer;
185   data->count = count;
186   data->messages = messages;
187   data->num_messages = num_messages;
188
189   data->simple = g_simple_async_result_new (G_OBJECT (socket),
190                                             callback,
191                                             user_data,
192                                             _g_socket_read_with_control_messages);
193
194   if (!g_socket_condition_check (socket, G_IO_IN))
195     {
196       GSource *source;
197       data->from_mainloop = TRUE;
198       source = g_socket_create_source (data->socket,
199                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
200                                        cancellable);
201       g_source_set_callback (source,
202                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
203                              data,
204                              (GDestroyNotify) read_with_control_data_free);
205       g_source_attach (source, g_main_context_get_thread_default ());
206       g_source_unref (source);
207     }
208   else
209     {
210       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
211       read_with_control_data_free (data);
212     }
213 }
214
215 static gssize
216 _g_socket_read_with_control_messages_finish (GSocket       *socket,
217                                              GAsyncResult  *result,
218                                              GError       **error)
219 {
220   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
221
222   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
223   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
224
225   if (g_simple_async_result_propagate_error (simple, error))
226       return -1;
227   else
228     return g_simple_async_result_get_op_res_gssize (simple);
229 }
230
231 /* ---------------------------------------------------------------------------------------------------- */
232
233 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
234
235 static GPtrArray *ensured_classes = NULL;
236
237 static void
238 ensure_type (GType gtype)
239 {
240   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
241 }
242
243 static void
244 released_required_types (void)
245 {
246   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
247   g_ptr_array_unref (ensured_classes);
248   ensured_classes = NULL;
249 }
250
251 static void
252 ensure_required_types (void)
253 {
254   g_assert (ensured_classes == NULL);
255   ensured_classes = g_ptr_array_new ();
256   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
257   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
258 }
259 /* ---------------------------------------------------------------------------------------------------- */
260
261 G_LOCK_DEFINE_STATIC (shared_thread_lock);
262
263 typedef struct
264 {
265   gint num_users;
266   GThread *thread;
267   GMainContext *context;
268   GMainLoop *loop;
269 } SharedThreadData;
270
271 static SharedThreadData *shared_thread_data = NULL;
272
273 static gpointer
274 gdbus_shared_thread_func (gpointer data)
275 {
276   g_main_context_push_thread_default (shared_thread_data->context);
277   g_main_loop_run (shared_thread_data->loop);
278   g_main_context_pop_thread_default (shared_thread_data->context);
279   return NULL;
280 }
281
282 typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
283
284 typedef struct
285 {
286   GDBusSharedThreadFunc func;
287   gpointer              user_data;
288   gboolean              done;
289 } CallerData;
290
291 static gboolean
292 invoke_caller (gpointer user_data)
293 {
294   CallerData *data = user_data;
295   data->func (data->user_data);
296   data->done = TRUE;
297   return FALSE;
298 }
299
300 /* ---------------------------------------------------------------------------------------------------- */
301
302 static void
303 _g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
304                            gpointer              user_data)
305 {
306   GError *error;
307   GSource *idle_source;
308   CallerData *data;
309   gboolean release_types;
310
311   G_LOCK (shared_thread_lock);
312
313   release_types = FALSE;
314
315   if (shared_thread_data != NULL)
316     {
317       shared_thread_data->num_users += 1;
318       goto have_thread;
319     }
320
321   shared_thread_data = g_new0 (SharedThreadData, 1);
322   shared_thread_data->num_users = 1;
323
324   /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
325   ensure_required_types ();
326   release_types = TRUE;
327
328   error = NULL;
329   shared_thread_data->context = g_main_context_new ();
330   shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
331   shared_thread_data->thread = g_thread_create (gdbus_shared_thread_func,
332                                                 NULL,
333                                                 TRUE,
334                                                 &error);
335   g_assert_no_error (error);
336
337  have_thread:
338
339   data = g_new0 (CallerData, 1);
340   data->func = func;
341   data->user_data = user_data;
342   data->done = FALSE;
343
344   idle_source = g_idle_source_new ();
345   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
346   g_source_set_callback (idle_source,
347                          invoke_caller,
348                          data,
349                          NULL);
350   g_source_attach (idle_source, shared_thread_data->context);
351   g_source_unref (idle_source);
352
353   /* wait for the user code to run.. hmm.. probably use a condition variable instead */
354   while (!data->done)
355     g_thread_yield ();
356
357   if (release_types)
358     released_required_types ();
359
360   g_free (data);
361
362   G_UNLOCK (shared_thread_lock);
363 }
364
365 static void
366 _g_dbus_shared_thread_unref (void)
367 {
368   /* TODO: actually destroy the shared thread here */
369 #if 0
370   G_LOCK (shared_thread_lock);
371   g_assert (shared_thread_data != NULL);
372   shared_thread_data->num_users -= 1;
373   if (shared_thread_data->num_users == 0)
374     {
375       g_main_loop_quit (shared_thread_data->loop);
376       //g_thread_join (shared_thread_data->thread);
377       g_main_loop_unref (shared_thread_data->loop);
378       g_main_context_unref (shared_thread_data->context);
379       g_free (shared_thread_data);
380       shared_thread_data = NULL;
381       G_UNLOCK (shared_thread_lock);
382     }
383   else
384     {
385       G_UNLOCK (shared_thread_lock);
386     }
387 #endif
388 }
389
390 /* ---------------------------------------------------------------------------------------------------- */
391
392 struct GDBusWorker
393 {
394   volatile gint                       ref_count;
395
396   gboolean                            stopped;
397
398   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
399    * only affects messages received from the other peer (since GDBusServer is the
400    * only user) - we might want it to affect messages sent to the other peer too?
401    */
402   gboolean                            frozen;
403   GQueue                             *received_messages_while_frozen;
404
405   GIOStream                          *stream;
406   GDBusCapabilityFlags                capabilities;
407   GCancellable                       *cancellable;
408   GDBusWorkerMessageReceivedCallback  message_received_callback;
409   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
410   GDBusWorkerDisconnectedCallback     disconnected_callback;
411   gpointer                            user_data;
412
413   GThread                            *thread;
414
415   /* if not NULL, stream is GSocketConnection */
416   GSocket *socket;
417
418   /* used for reading */
419   GMutex                             *read_lock;
420   gchar                              *read_buffer;
421   gsize                               read_buffer_allocated_size;
422   gsize                               read_buffer_cur_size;
423   gsize                               read_buffer_bytes_wanted;
424   GUnixFDList                        *read_fd_list;
425   GSocketControlMessage             **read_ancillary_messages;
426   gint                                read_num_ancillary_messages;
427
428   /* used for writing */
429   GMutex                             *write_lock;
430   GQueue                             *write_queue;
431   gint                                num_writes_pending;
432   guint64                             write_num_messages_written;
433   GList                              *write_pending_flushes;
434 };
435
436 /* ---------------------------------------------------------------------------------------------------- */
437
438 typedef struct
439 {
440   GMutex *mutex;
441   GCond *cond;
442   guint64 number_to_wait_for;
443   GError *error;
444 } FlushData;
445
446 struct _MessageToWriteData ;
447 typedef struct _MessageToWriteData MessageToWriteData;
448
449 static void message_to_write_data_free (MessageToWriteData *data);
450
451 static void read_message_print_transport_debug (gssize bytes_read,
452                                                 GDBusWorker *worker);
453
454 static void write_message_print_transport_debug (gssize bytes_written,
455                                                  MessageToWriteData *data);
456
457 /* ---------------------------------------------------------------------------------------------------- */
458
459 static GDBusWorker *
460 _g_dbus_worker_ref (GDBusWorker *worker)
461 {
462   g_atomic_int_inc (&worker->ref_count);
463   return worker;
464 }
465
466 static void
467 _g_dbus_worker_unref (GDBusWorker *worker)
468 {
469   if (g_atomic_int_dec_and_test (&worker->ref_count))
470     {
471       g_assert (worker->write_pending_flushes == NULL);
472
473       _g_dbus_shared_thread_unref ();
474
475       g_object_unref (worker->stream);
476
477       g_mutex_free (worker->read_lock);
478       g_object_unref (worker->cancellable);
479       if (worker->read_fd_list != NULL)
480         g_object_unref (worker->read_fd_list);
481
482       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
483       g_queue_free (worker->received_messages_while_frozen);
484
485       g_mutex_free (worker->write_lock);
486       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
487       g_queue_free (worker->write_queue);
488
489       g_free (worker->read_buffer);
490
491       g_free (worker);
492     }
493 }
494
495 static void
496 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
497                                   gboolean      remote_peer_vanished,
498                                   GError       *error)
499 {
500   if (!worker->stopped)
501     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
502 }
503
504 static void
505 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
506                                       GDBusMessage *message)
507 {
508   if (!worker->stopped)
509     worker->message_received_callback (worker, message, worker->user_data);
510 }
511
512 static GDBusMessage *
513 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
514                                               GDBusMessage *message)
515 {
516   GDBusMessage *ret;
517   if (!worker->stopped)
518     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
519   else
520     ret = message;
521   return ret;
522 }
523
524 /* can only be called from private thread with read-lock held - takes ownership of @message */
525 static void
526 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
527                                                   GDBusMessage *message)
528 {
529   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
530     {
531       /* queue up */
532       g_queue_push_tail (worker->received_messages_while_frozen, message);
533     }
534   else
535     {
536       /* not frozen, nor anything in queue */
537       _g_dbus_worker_emit_message_received (worker, message);
538       g_object_unref (message);
539     }
540 }
541
542 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
543 static gboolean
544 unfreeze_in_idle_cb (gpointer user_data)
545 {
546   GDBusWorker *worker = user_data;
547   GDBusMessage *message;
548
549   g_mutex_lock (worker->read_lock);
550   if (worker->frozen)
551     {
552       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
553         {
554           _g_dbus_worker_emit_message_received (worker, message);
555           g_object_unref (message);
556         }
557       worker->frozen = FALSE;
558     }
559   else
560     {
561       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
562     }
563   g_mutex_unlock (worker->read_lock);
564   return FALSE;
565 }
566
567 /* can be called from any thread */
568 void
569 _g_dbus_worker_unfreeze (GDBusWorker *worker)
570 {
571   GSource *idle_source;
572   idle_source = g_idle_source_new ();
573   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
574   g_source_set_callback (idle_source,
575                          unfreeze_in_idle_cb,
576                          _g_dbus_worker_ref (worker),
577                          (GDestroyNotify) _g_dbus_worker_unref);
578   g_source_attach (idle_source, shared_thread_data->context);
579   g_source_unref (idle_source);
580 }
581
582 /* ---------------------------------------------------------------------------------------------------- */
583
584 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
585
586 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
587 static void
588 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
589                            GAsyncResult  *res,
590                            gpointer       user_data)
591 {
592   GDBusWorker *worker = user_data;
593   GError *error;
594   gssize bytes_read;
595
596   g_mutex_lock (worker->read_lock);
597
598   /* If already stopped, don't even process the reply */
599   if (worker->stopped)
600     goto out;
601
602   error = NULL;
603   if (worker->socket == NULL)
604     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
605                                              res,
606                                              &error);
607   else
608     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
609                                                               res,
610                                                               &error);
611   if (worker->read_num_ancillary_messages > 0)
612     {
613       gint n;
614       for (n = 0; n < worker->read_num_ancillary_messages; n++)
615         {
616           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
617
618           if (FALSE)
619             {
620             }
621 #ifdef G_OS_UNIX
622           else if (G_IS_UNIX_FD_MESSAGE (control_message))
623             {
624               GUnixFDMessage *fd_message;
625               gint *fds;
626               gint num_fds;
627
628               fd_message = G_UNIX_FD_MESSAGE (control_message);
629               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
630               if (worker->read_fd_list == NULL)
631                 {
632                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
633                 }
634               else
635                 {
636                   gint n;
637                   for (n = 0; n < num_fds; n++)
638                     {
639                       /* TODO: really want a append_steal() */
640                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
641                       close (fds[n]);
642                     }
643                 }
644               g_free (fds);
645             }
646           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
647             {
648               /* do nothing */
649             }
650 #endif
651           else
652             {
653               if (error == NULL)
654                 {
655                   g_set_error (&error,
656                                G_IO_ERROR,
657                                G_IO_ERROR_FAILED,
658                                "Unexpected ancillary message of type %s received from peer",
659                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
660                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
661                   g_error_free (error);
662                   g_object_unref (control_message);
663                   n++;
664                   while (n < worker->read_num_ancillary_messages)
665                     g_object_unref (worker->read_ancillary_messages[n++]);
666                   g_free (worker->read_ancillary_messages);
667                   goto out;
668                 }
669             }
670           g_object_unref (control_message);
671         }
672       g_free (worker->read_ancillary_messages);
673     }
674
675   if (bytes_read == -1)
676     {
677       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
678       g_error_free (error);
679       goto out;
680     }
681
682 #if 0
683   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
684            (gint) bytes_read,
685            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
686            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
687            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
688                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
689            worker->stream,
690            worker);
691 #endif
692
693   /* TODO: hmm, hmm... */
694   if (bytes_read == 0)
695     {
696       g_set_error (&error,
697                    G_IO_ERROR,
698                    G_IO_ERROR_FAILED,
699                    "Underlying GIOStream returned 0 bytes on an async read");
700       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
701       g_error_free (error);
702       goto out;
703     }
704
705   read_message_print_transport_debug (bytes_read, worker);
706
707   worker->read_buffer_cur_size += bytes_read;
708   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
709     {
710       /* OK, got what we asked for! */
711       if (worker->read_buffer_bytes_wanted == 16)
712         {
713           gssize message_len;
714           /* OK, got the header - determine how many more bytes are needed */
715           error = NULL;
716           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
717                                                      16,
718                                                      &error);
719           if (message_len == -1)
720             {
721               g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
722               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
723               g_error_free (error);
724               goto out;
725             }
726
727           worker->read_buffer_bytes_wanted = message_len;
728           _g_dbus_worker_do_read_unlocked (worker);
729         }
730       else
731         {
732           GDBusMessage *message;
733           error = NULL;
734
735           /* TODO: use connection->priv->auth to decode the message */
736
737           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
738                                                   worker->read_buffer_cur_size,
739                                                   worker->capabilities,
740                                                   &error);
741           if (message == NULL)
742             {
743               gchar *s;
744               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
745               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
746                          "The error is: %s\n"
747                          "The payload is as follows:\n"
748                          "%s\n",
749                          worker->read_buffer_cur_size,
750                          error->message,
751                          s);
752               g_free (s);
753               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
754               g_error_free (error);
755               goto out;
756             }
757
758 #ifdef G_OS_UNIX
759           if (worker->read_fd_list != NULL)
760             {
761               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
762               g_object_unref (worker->read_fd_list);
763               worker->read_fd_list = NULL;
764             }
765 #endif
766
767           if (G_UNLIKELY (_g_dbus_debug_message ()))
768             {
769               gchar *s;
770               _g_dbus_debug_print_lock ();
771               g_print ("========================================================================\n"
772                        "GDBus-debug:Message:\n"
773                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
774                        worker->read_buffer_cur_size);
775               s = g_dbus_message_print (message, 2);
776               g_print ("%s", s);
777               g_free (s);
778               if (G_UNLIKELY (_g_dbus_debug_payload ()))
779                 {
780                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
781                   g_print ("%s\n", s);
782                   g_free (s);
783                 }
784               _g_dbus_debug_print_unlock ();
785             }
786
787           /* yay, got a message, go deliver it */
788           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
789
790           /* start reading another message! */
791           worker->read_buffer_bytes_wanted = 0;
792           worker->read_buffer_cur_size = 0;
793           _g_dbus_worker_do_read_unlocked (worker);
794         }
795     }
796   else
797     {
798       /* didn't get all the bytes we requested - so repeat the request... */
799       _g_dbus_worker_do_read_unlocked (worker);
800     }
801
802  out:
803   g_mutex_unlock (worker->read_lock);
804
805   /* gives up the reference acquired when calling g_input_stream_read_async() */
806   _g_dbus_worker_unref (worker);
807 }
808
809 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
810 static void
811 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
812 {
813   /* if bytes_wanted is zero, it means start reading a message */
814   if (worker->read_buffer_bytes_wanted == 0)
815     {
816       worker->read_buffer_cur_size = 0;
817       worker->read_buffer_bytes_wanted = 16;
818     }
819
820   /* ensure we have a (big enough) buffer */
821   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
822     {
823       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
824       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
825       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
826     }
827
828   if (worker->socket == NULL)
829     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
830                                worker->read_buffer + worker->read_buffer_cur_size,
831                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
832                                G_PRIORITY_DEFAULT,
833                                worker->cancellable,
834                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
835                                _g_dbus_worker_ref (worker));
836   else
837     {
838       worker->read_ancillary_messages = NULL;
839       worker->read_num_ancillary_messages = 0;
840       _g_socket_read_with_control_messages (worker->socket,
841                                             worker->read_buffer + worker->read_buffer_cur_size,
842                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
843                                             &worker->read_ancillary_messages,
844                                             &worker->read_num_ancillary_messages,
845                                             G_PRIORITY_DEFAULT,
846                                             worker->cancellable,
847                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
848                                             _g_dbus_worker_ref (worker));
849     }
850 }
851
852 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
853 static void
854 _g_dbus_worker_do_read (GDBusWorker *worker)
855 {
856   g_mutex_lock (worker->read_lock);
857   _g_dbus_worker_do_read_unlocked (worker);
858   g_mutex_unlock (worker->read_lock);
859 }
860
861 /* ---------------------------------------------------------------------------------------------------- */
862
863 struct _MessageToWriteData
864 {
865   GDBusWorker  *worker;
866   GDBusMessage *message;
867   gchar        *blob;
868   gsize         blob_size;
869
870   gsize               total_written;
871   GSimpleAsyncResult *simple;
872
873 };
874
875 static void
876 message_to_write_data_free (MessageToWriteData *data)
877 {
878   _g_dbus_worker_unref (data->worker);
879   if (data->message)
880     g_object_unref (data->message);
881   g_free (data->blob);
882   g_free (data);
883 }
884
885 /* ---------------------------------------------------------------------------------------------------- */
886
887 static void write_message_continue_writing (MessageToWriteData *data);
888
889 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
890 static void
891 write_message_async_cb (GObject      *source_object,
892                         GAsyncResult *res,
893                         gpointer      user_data)
894 {
895   MessageToWriteData *data = user_data;
896   GSimpleAsyncResult *simple;
897   gssize bytes_written;
898   GError *error;
899
900   /* Note: we can't access data->simple after calling g_async_result_complete () because the
901    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
902    */
903   simple = data->simple;
904
905   error = NULL;
906   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
907                                                 res,
908                                                 &error);
909   if (bytes_written == -1)
910     {
911       g_simple_async_result_set_from_error (simple, error);
912       g_error_free (error);
913       g_simple_async_result_complete (simple);
914       g_object_unref (simple);
915       goto out;
916     }
917   g_assert (bytes_written > 0); /* zero is never returned */
918
919   write_message_print_transport_debug (bytes_written, data);
920
921   data->total_written += bytes_written;
922   g_assert (data->total_written <= data->blob_size);
923   if (data->total_written == data->blob_size)
924     {
925       g_simple_async_result_complete (simple);
926       g_object_unref (simple);
927       goto out;
928     }
929
930   write_message_continue_writing (data);
931
932  out:
933   ;
934 }
935
936 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
937 static gboolean
938 on_socket_ready (GSocket      *socket,
939                  GIOCondition  condition,
940                  gpointer      user_data)
941 {
942   MessageToWriteData *data = user_data;
943   write_message_continue_writing (data);
944   return FALSE; /* remove source */
945 }
946
947 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
948 static void
949 write_message_continue_writing (MessageToWriteData *data)
950 {
951   GOutputStream *ostream;
952   GSimpleAsyncResult *simple;
953 #ifdef G_OS_UNIX
954   GUnixFDList *fd_list;
955 #endif
956
957   /* Note: we can't access data->simple after calling g_async_result_complete () because the
958    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
959    */
960   simple = data->simple;
961
962   ostream = g_io_stream_get_output_stream (data->worker->stream);
963 #ifdef G_OS_UNIX
964   fd_list = g_dbus_message_get_unix_fd_list (data->message);
965 #endif
966
967   g_assert (!g_output_stream_has_pending (ostream));
968   g_assert_cmpint (data->total_written, <, data->blob_size);
969
970   if (FALSE)
971     {
972     }
973 #ifdef G_OS_UNIX
974   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
975     {
976       GOutputVector vector;
977       GSocketControlMessage *control_message;
978       gssize bytes_written;
979       GError *error;
980
981       vector.buffer = data->blob;
982       vector.size = data->blob_size;
983
984       control_message = NULL;
985       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
986         {
987           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
988             {
989               g_simple_async_result_set_error (simple,
990                                                G_IO_ERROR,
991                                                G_IO_ERROR_FAILED,
992                                                "Tried sending a file descriptor but remote peer does not support this capability");
993               g_simple_async_result_complete (simple);
994               g_object_unref (simple);
995               goto out;
996             }
997           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
998         }
999
1000       error = NULL;
1001       bytes_written = g_socket_send_message (data->worker->socket,
1002                                              NULL, /* address */
1003                                              &vector,
1004                                              1,
1005                                              control_message != NULL ? &control_message : NULL,
1006                                              control_message != NULL ? 1 : 0,
1007                                              G_SOCKET_MSG_NONE,
1008                                              data->worker->cancellable,
1009                                              &error);
1010       if (control_message != NULL)
1011         g_object_unref (control_message);
1012
1013       if (bytes_written == -1)
1014         {
1015           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1016           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1017             {
1018               GSource *source;
1019               source = g_socket_create_source (data->worker->socket,
1020                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1021                                                data->worker->cancellable);
1022               g_source_set_callback (source,
1023                                      (GSourceFunc) on_socket_ready,
1024                                      data,
1025                                      NULL); /* GDestroyNotify */
1026               g_source_attach (source, g_main_context_get_thread_default ());
1027               g_source_unref (source);
1028               g_error_free (error);
1029               goto out;
1030             }
1031           g_simple_async_result_set_from_error (simple, error);
1032           g_error_free (error);
1033           g_simple_async_result_complete (simple);
1034           g_object_unref (simple);
1035           goto out;
1036         }
1037       g_assert (bytes_written > 0); /* zero is never returned */
1038
1039       write_message_print_transport_debug (bytes_written, data);
1040
1041       data->total_written += bytes_written;
1042       g_assert (data->total_written <= data->blob_size);
1043       if (data->total_written == data->blob_size)
1044         {
1045           g_simple_async_result_complete (simple);
1046           g_object_unref (simple);
1047           goto out;
1048         }
1049
1050       write_message_continue_writing (data);
1051     }
1052 #endif
1053   else
1054     {
1055 #ifdef G_OS_UNIX
1056       if (fd_list != NULL)
1057         {
1058           g_simple_async_result_set_error (simple,
1059                                            G_IO_ERROR,
1060                                            G_IO_ERROR_FAILED,
1061                                            "Tried sending a file descriptor on unsupported stream of type %s",
1062                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1063           g_simple_async_result_complete (simple);
1064           g_object_unref (simple);
1065           goto out;
1066         }
1067 #endif
1068
1069       g_output_stream_write_async (ostream,
1070                                    (const gchar *) data->blob + data->total_written,
1071                                    data->blob_size - data->total_written,
1072                                    G_PRIORITY_DEFAULT,
1073                                    data->worker->cancellable,
1074                                    write_message_async_cb,
1075                                    data);
1076     }
1077  out:
1078   ;
1079 }
1080
1081 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1082 static void
1083 write_message_async (GDBusWorker         *worker,
1084                      MessageToWriteData  *data,
1085                      GAsyncReadyCallback  callback,
1086                      gpointer             user_data)
1087 {
1088   data->simple = g_simple_async_result_new (NULL,
1089                                             callback,
1090                                             user_data,
1091                                             write_message_async);
1092   data->total_written = 0;
1093   write_message_continue_writing (data);
1094 }
1095
1096 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1097 static gboolean
1098 write_message_finish (GAsyncResult   *res,
1099                       GError        **error)
1100 {
1101   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1102   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1103     return FALSE;
1104   else
1105     return TRUE;
1106 }
1107 /* ---------------------------------------------------------------------------------------------------- */
1108
1109 static void maybe_write_next_message (GDBusWorker *worker);
1110
1111 typedef struct
1112 {
1113   GDBusWorker *worker;
1114   GList *flushers;
1115 } FlushAsyncData;
1116
1117 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1118 static void
1119 ostream_flush_cb (GObject      *source_object,
1120                   GAsyncResult *res,
1121                   gpointer      user_data)
1122 {
1123   FlushAsyncData *data = user_data;
1124   GError *error;
1125   GList *l;
1126
1127   error = NULL;
1128   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1129                                 res,
1130                                 &error);
1131
1132   if (error == NULL)
1133     {
1134       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1135         {
1136           _g_dbus_debug_print_lock ();
1137           g_print ("========================================================================\n"
1138                    "GDBus-debug:Transport:\n"
1139                    "  ---- FLUSHED stream of type %s\n",
1140                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1141           _g_dbus_debug_print_unlock ();
1142         }
1143     }
1144
1145   g_assert (data->flushers != NULL);
1146   for (l = data->flushers; l != NULL; l = l->next)
1147     {
1148       FlushData *f = l->data;
1149
1150       f->error = error != NULL ? g_error_copy (error) : NULL;
1151
1152       g_mutex_lock (f->mutex);
1153       g_cond_signal (f->cond);
1154       g_mutex_unlock (f->mutex);
1155     }
1156   g_list_free (data->flushers);
1157
1158   if (error != NULL)
1159     g_error_free (error);
1160
1161   /* OK, cool, finally kick off the next write */
1162   maybe_write_next_message (data->worker);
1163
1164   _g_dbus_worker_unref (data->worker);
1165   g_free (data);
1166 }
1167
1168 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1169 static void
1170 message_written (GDBusWorker *worker,
1171                  MessageToWriteData *message_data)
1172 {
1173   GList *l;
1174   GList *ll;
1175   GList *flushers;
1176
1177   /* first log the fact that we wrote a message */
1178   if (G_UNLIKELY (_g_dbus_debug_message ()))
1179     {
1180       gchar *s;
1181       _g_dbus_debug_print_lock ();
1182       g_print ("========================================================================\n"
1183                "GDBus-debug:Message:\n"
1184                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1185                message_data->blob_size);
1186       s = g_dbus_message_print (message_data->message, 2);
1187       g_print ("%s", s);
1188       g_free (s);
1189       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1190         {
1191           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1192           g_print ("%s\n", s);
1193           g_free (s);
1194         }
1195       _g_dbus_debug_print_unlock ();
1196     }
1197
1198   /* then first wake up pending flushes and, if needed, flush the stream */
1199   flushers = NULL;
1200   g_mutex_lock (worker->write_lock);
1201   worker->write_num_messages_written += 1;
1202   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1203     {
1204       FlushData *f = l->data;
1205       ll = l->next;
1206
1207       if (f->number_to_wait_for == worker->write_num_messages_written)
1208         {
1209           flushers = g_list_append (flushers, f);
1210           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1211         }
1212     }
1213   g_mutex_unlock (worker->write_lock);
1214
1215   if (flushers != NULL)
1216     {
1217       FlushAsyncData *data;
1218       data = g_new0 (FlushAsyncData, 1);
1219       data->worker = _g_dbus_worker_ref (worker);
1220       data->flushers = flushers;
1221       /* flush the stream before writing the next message */
1222       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1223                                    G_PRIORITY_DEFAULT,
1224                                    worker->cancellable,
1225                                    ostream_flush_cb,
1226                                    data);
1227     }
1228   else
1229     {
1230       /* kick off the next write! */
1231       maybe_write_next_message (worker);
1232     }
1233 }
1234
1235 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1236 static void
1237 write_message_cb (GObject       *source_object,
1238                   GAsyncResult  *res,
1239                   gpointer       user_data)
1240 {
1241   MessageToWriteData *data = user_data;
1242   GError *error;
1243
1244   g_mutex_lock (data->worker->write_lock);
1245   data->worker->num_writes_pending -= 1;
1246   g_mutex_unlock (data->worker->write_lock);
1247
1248   error = NULL;
1249   if (!write_message_finish (res, &error))
1250     {
1251       /* TODO: handle */
1252       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1253       g_error_free (error);
1254     }
1255
1256   /* this function will also kick of the next write (it might need to
1257    * flush so writing the next message might happen much later
1258    * e.g. async)
1259    */
1260   message_written (data->worker, data);
1261
1262   message_to_write_data_free (data);
1263 }
1264
1265 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1266 static void
1267 maybe_write_next_message (GDBusWorker *worker)
1268 {
1269   MessageToWriteData *data;
1270
1271  write_next:
1272
1273   g_mutex_lock (worker->write_lock);
1274   data = g_queue_pop_head (worker->write_queue);
1275   if (data != NULL)
1276     worker->num_writes_pending += 1;
1277   g_mutex_unlock (worker->write_lock);
1278
1279   /* Note that write_lock is only used for protecting the @write_queue
1280    * and @num_writes_pending fields of the GDBusWorker struct ... which we
1281    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1282    *
1283    * Therefore, it's fine to drop it here when calling back into user
1284    * code and then writing the message out onto the GIOStream since this
1285    * function only runs on the worker thread.
1286    */
1287   if (data != NULL)
1288     {
1289       GDBusMessage *old_message;
1290       guchar *new_blob;
1291       gsize new_blob_size;
1292       GError *error;
1293
1294       old_message = data->message;
1295       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1296       if (data->message == old_message)
1297         {
1298           /* filters had no effect - do nothing */
1299         }
1300       else if (data->message == NULL)
1301         {
1302           /* filters dropped message */
1303           g_mutex_lock (worker->write_lock);
1304           worker->num_writes_pending -= 1;
1305           g_mutex_unlock (worker->write_lock);
1306           message_to_write_data_free (data);
1307           goto write_next;
1308         }
1309       else
1310         {
1311           /* filters altered the message -> reencode */
1312           error = NULL;
1313           new_blob = g_dbus_message_to_blob (data->message,
1314                                              &new_blob_size,
1315                                              worker->capabilities,
1316                                              &error);
1317           if (new_blob == NULL)
1318             {
1319               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1320                * the old message instead
1321                */
1322               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1323                          g_dbus_message_get_serial (data->message),
1324                          error->message);
1325               g_error_free (error);
1326             }
1327           else
1328             {
1329               g_free (data->blob);
1330               data->blob = (gchar *) new_blob;
1331               data->blob_size = new_blob_size;
1332             }
1333         }
1334
1335       write_message_async (worker,
1336                            data,
1337                            write_message_cb,
1338                            data);
1339     }
1340 }
1341
1342 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1343 static gboolean
1344 write_message_in_idle_cb (gpointer user_data)
1345 {
1346   GDBusWorker *worker = user_data;
1347   if (worker->num_writes_pending == 0)
1348     maybe_write_next_message (worker);
1349   return FALSE;
1350 }
1351
1352 /* ---------------------------------------------------------------------------------------------------- */
1353
1354 /* can be called from any thread - steals blob */
1355 void
1356 _g_dbus_worker_send_message (GDBusWorker    *worker,
1357                              GDBusMessage   *message,
1358                              gchar          *blob,
1359                              gsize           blob_len)
1360 {
1361   MessageToWriteData *data;
1362
1363   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1364   g_return_if_fail (blob != NULL);
1365   g_return_if_fail (blob_len > 16);
1366
1367   data = g_new0 (MessageToWriteData, 1);
1368   data->worker = _g_dbus_worker_ref (worker);
1369   data->message = g_object_ref (message);
1370   data->blob = blob; /* steal! */
1371   data->blob_size = blob_len;
1372
1373   g_mutex_lock (worker->write_lock);
1374   g_queue_push_tail (worker->write_queue, data);
1375   if (worker->num_writes_pending == 0)
1376     {
1377       GSource *idle_source;
1378       idle_source = g_idle_source_new ();
1379       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1380       g_source_set_callback (idle_source,
1381                              write_message_in_idle_cb,
1382                              _g_dbus_worker_ref (worker),
1383                              (GDestroyNotify) _g_dbus_worker_unref);
1384       g_source_attach (idle_source, shared_thread_data->context);
1385       g_source_unref (idle_source);
1386     }
1387   g_mutex_unlock (worker->write_lock);
1388 }
1389
1390 /* ---------------------------------------------------------------------------------------------------- */
1391
1392 static void
1393 _g_dbus_worker_thread_begin_func (gpointer user_data)
1394 {
1395   GDBusWorker *worker = user_data;
1396
1397   worker->thread = g_thread_self ();
1398
1399   /* begin reading */
1400   _g_dbus_worker_do_read (worker);
1401 }
1402
1403 GDBusWorker *
1404 _g_dbus_worker_new (GIOStream                              *stream,
1405                     GDBusCapabilityFlags                    capabilities,
1406                     gboolean                                initially_frozen,
1407                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1408                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1409                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1410                     gpointer                                user_data)
1411 {
1412   GDBusWorker *worker;
1413
1414   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1415   g_return_val_if_fail (message_received_callback != NULL, NULL);
1416   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1417   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1418
1419   worker = g_new0 (GDBusWorker, 1);
1420   worker->ref_count = 1;
1421
1422   worker->read_lock = g_mutex_new ();
1423   worker->message_received_callback = message_received_callback;
1424   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1425   worker->disconnected_callback = disconnected_callback;
1426   worker->user_data = user_data;
1427   worker->stream = g_object_ref (stream);
1428   worker->capabilities = capabilities;
1429   worker->cancellable = g_cancellable_new ();
1430
1431   worker->frozen = initially_frozen;
1432   worker->received_messages_while_frozen = g_queue_new ();
1433
1434   worker->write_lock = g_mutex_new ();
1435   worker->write_queue = g_queue_new ();
1436
1437   if (G_IS_SOCKET_CONNECTION (worker->stream))
1438     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1439
1440   _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
1441
1442   return worker;
1443 }
1444
1445 /* ---------------------------------------------------------------------------------------------------- */
1446
1447 /* This can be called from any thread - frees worker. Note that
1448  * callbacks might still happen if called from another thread than the
1449  * worker - use your own synchronization primitive in the callbacks.
1450  */
1451 void
1452 _g_dbus_worker_stop (GDBusWorker *worker)
1453 {
1454   worker->stopped = TRUE;
1455   g_cancellable_cancel (worker->cancellable);
1456   _g_dbus_worker_unref (worker);
1457 }
1458
1459 /* ---------------------------------------------------------------------------------------------------- */
1460
1461 /* can be called from any thread (except the worker thread) - blocks
1462  * calling thread until all queued outgoing messages are written and
1463  * the transport has been flushed
1464  */
1465 gboolean
1466 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1467                            GCancellable   *cancellable,
1468                            GError        **error)
1469 {
1470   gboolean ret;
1471   FlushData *data;
1472
1473   data = NULL;
1474   ret = TRUE;
1475
1476   /* if the queue is empty, there's nothing to wait for */
1477   g_mutex_lock (worker->write_lock);
1478   if (g_queue_get_length (worker->write_queue) > 0)
1479     {
1480       data = g_new0 (FlushData, 1);
1481       data->mutex = g_mutex_new ();
1482       data->cond = g_cond_new ();
1483       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1484       g_mutex_lock (data->mutex);
1485       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1486     }
1487   g_mutex_unlock (worker->write_lock);
1488
1489   if (data != NULL)
1490     {
1491       g_cond_wait (data->cond, data->mutex);
1492       g_mutex_unlock (data->mutex);
1493
1494       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1495       g_cond_free (data->cond);
1496       g_mutex_free (data->mutex);
1497       if (data->error != NULL)
1498         {
1499           ret = FALSE;
1500           g_propagate_error (error, data->error);
1501         }
1502       g_free (data);
1503     }
1504
1505   return ret;
1506 }
1507
1508 /* ---------------------------------------------------------------------------------------------------- */
1509
1510 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1511 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1512 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1513 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1514 #define G_DBUS_DEBUG_CALL           (1<<4)
1515 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1516 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1517 #define G_DBUS_DEBUG_RETURN         (1<<7)
1518 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1519 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1520
1521 static gint _gdbus_debug_flags = 0;
1522
1523 gboolean
1524 _g_dbus_debug_authentication (void)
1525 {
1526   _g_dbus_initialize ();
1527   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1528 }
1529
1530 gboolean
1531 _g_dbus_debug_transport (void)
1532 {
1533   _g_dbus_initialize ();
1534   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1535 }
1536
1537 gboolean
1538 _g_dbus_debug_message (void)
1539 {
1540   _g_dbus_initialize ();
1541   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1542 }
1543
1544 gboolean
1545 _g_dbus_debug_payload (void)
1546 {
1547   _g_dbus_initialize ();
1548   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1549 }
1550
1551 gboolean
1552 _g_dbus_debug_call (void)
1553 {
1554   _g_dbus_initialize ();
1555   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1556 }
1557
1558 gboolean
1559 _g_dbus_debug_signal (void)
1560 {
1561   _g_dbus_initialize ();
1562   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1563 }
1564
1565 gboolean
1566 _g_dbus_debug_incoming (void)
1567 {
1568   _g_dbus_initialize ();
1569   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1570 }
1571
1572 gboolean
1573 _g_dbus_debug_return (void)
1574 {
1575   _g_dbus_initialize ();
1576   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1577 }
1578
1579 gboolean
1580 _g_dbus_debug_emission (void)
1581 {
1582   _g_dbus_initialize ();
1583   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1584 }
1585
1586 gboolean
1587 _g_dbus_debug_address (void)
1588 {
1589   _g_dbus_initialize ();
1590   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1591 }
1592
1593 G_LOCK_DEFINE_STATIC (print_lock);
1594
1595 void
1596 _g_dbus_debug_print_lock (void)
1597 {
1598   G_LOCK (print_lock);
1599 }
1600
1601 void
1602 _g_dbus_debug_print_unlock (void)
1603 {
1604   G_UNLOCK (print_lock);
1605 }
1606
1607 /*
1608  * _g_dbus_initialize:
1609  *
1610  * Does various one-time init things such as
1611  *
1612  *  - registering the G_DBUS_ERROR error domain
1613  *  - parses the G_DBUS_DEBUG environment variable
1614  */
1615 void
1616 _g_dbus_initialize (void)
1617 {
1618   static volatile gsize initialized = 0;
1619
1620   if (g_once_init_enter (&initialized))
1621     {
1622       volatile GQuark g_dbus_error_domain;
1623       const gchar *debug;
1624
1625       g_dbus_error_domain = G_DBUS_ERROR;
1626
1627       debug = g_getenv ("G_DBUS_DEBUG");
1628       if (debug != NULL)
1629         {
1630           const GDebugKey keys[] = {
1631             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1632             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1633             { "message",        G_DBUS_DEBUG_MESSAGE        },
1634             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1635             { "call",           G_DBUS_DEBUG_CALL           },
1636             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1637             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1638             { "return",         G_DBUS_DEBUG_RETURN         },
1639             { "emission",       G_DBUS_DEBUG_EMISSION       },
1640             { "address",        G_DBUS_DEBUG_ADDRESS        }
1641           };
1642
1643           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1644           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1645             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1646         }
1647
1648       g_once_init_leave (&initialized, 1);
1649     }
1650 }
1651
1652 /* ---------------------------------------------------------------------------------------------------- */
1653
1654 GVariantType *
1655 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1656 {
1657   const GVariantType *arg_types[256];
1658   guint n;
1659
1660   if (args)
1661     for (n = 0; args[n] != NULL; n++)
1662       {
1663         /* DBus places a hard limit of 255 on signature length.
1664          * therefore number of args must be less than 256.
1665          */
1666         g_assert (n < 256);
1667
1668         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1669
1670         if G_UNLIKELY (arg_types[n] == NULL)
1671           return NULL;
1672       }
1673   else
1674     n = 0;
1675
1676   return g_variant_type_new_tuple (arg_types, n);
1677 }
1678
1679 /* ---------------------------------------------------------------------------------------------------- */
1680
1681 #ifdef G_OS_WIN32
1682
1683 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1684
1685 gchar *
1686 _g_dbus_win32_get_user_sid (void)
1687 {
1688   HANDLE h;
1689   TOKEN_USER *user;
1690   DWORD token_information_len;
1691   PSID psid;
1692   gchar *sid;
1693   gchar *ret;
1694
1695   ret = NULL;
1696   user = NULL;
1697   h = INVALID_HANDLE_VALUE;
1698
1699   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1700     {
1701       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1702       goto out;
1703     }
1704
1705   /* Get length of buffer */
1706   token_information_len = 0;
1707   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1708     {
1709       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1710         {
1711           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1712           goto out;
1713         }
1714     }
1715   user = g_malloc (token_information_len);
1716   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1717     {
1718       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1719       goto out;
1720     }
1721
1722   psid = user->User.Sid;
1723   if (!IsValidSid (psid))
1724     {
1725       g_warning ("Invalid SID");
1726       goto out;
1727     }
1728
1729   if (!ConvertSidToStringSidA (psid, &sid))
1730     {
1731       g_warning ("Invalid SID");
1732       goto out;
1733     }
1734
1735   ret = g_strdup (sid);
1736   LocalFree (sid);
1737
1738 out:
1739   g_free (user);
1740   if (h != INVALID_HANDLE_VALUE)
1741     CloseHandle (h);
1742   return ret;
1743 }
1744 #endif
1745
1746 /* ---------------------------------------------------------------------------------------------------- */
1747
1748 gchar *
1749 _g_dbus_get_machine_id (GError **error)
1750 {
1751   gchar *ret;
1752   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1753   ret = NULL;
1754   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1755                             &ret,
1756                             NULL,
1757                             error))
1758     {
1759       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1760     }
1761   else
1762     {
1763       /* TODO: validate value */
1764       g_strstrip (ret);
1765     }
1766   return ret;
1767 }
1768
1769 /* ---------------------------------------------------------------------------------------------------- */
1770
1771 gchar *
1772 _g_dbus_enum_to_string (GType enum_type, gint value)
1773 {
1774   gchar *ret;
1775   GEnumClass *klass;
1776   GEnumValue *enum_value;
1777
1778   klass = g_type_class_ref (enum_type);
1779   enum_value = g_enum_get_value (klass, value);
1780   if (enum_value != NULL)
1781     ret = g_strdup (enum_value->value_nick);
1782   else
1783     ret = g_strdup_printf ("unknown (value %d)", value);
1784   g_type_class_unref (klass);
1785   return ret;
1786 }
1787
1788 /* ---------------------------------------------------------------------------------------------------- */
1789
1790 static void
1791 write_message_print_transport_debug (gssize bytes_written,
1792                                      MessageToWriteData *data)
1793 {
1794   if (G_LIKELY (!_g_dbus_debug_transport ()))
1795     goto out;
1796
1797   _g_dbus_debug_print_lock ();
1798   g_print ("========================================================================\n"
1799            "GDBus-debug:Transport:\n"
1800            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1801            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
1802            bytes_written,
1803            g_dbus_message_get_serial (data->message),
1804            data->blob_size,
1805            data->total_written,
1806            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1807   _g_dbus_debug_print_unlock ();
1808  out:
1809   ;
1810 }
1811
1812 /* ---------------------------------------------------------------------------------------------------- */
1813
1814 static void
1815 read_message_print_transport_debug (gssize bytes_read,
1816                                     GDBusWorker *worker)
1817 {
1818   gsize size;
1819   gint32 serial;
1820   gint32 message_length;
1821
1822   if (G_LIKELY (!_g_dbus_debug_transport ()))
1823     goto out;
1824
1825   size = bytes_read + worker->read_buffer_cur_size;
1826   serial = 0;
1827   message_length = 0;
1828   if (size >= 16)
1829     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
1830   if (size >= 1)
1831     {
1832       switch (worker->read_buffer[0])
1833         {
1834         case 'l':
1835           if (size >= 12)
1836             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
1837           break;
1838         case 'B':
1839           if (size >= 12)
1840             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
1841           break;
1842         default:
1843           /* an error will be set elsewhere if this happens */
1844           goto out;
1845         }
1846     }
1847
1848     _g_dbus_debug_print_lock ();
1849   g_print ("========================================================================\n"
1850            "GDBus-debug:Transport:\n"
1851            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1852            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
1853            bytes_read,
1854            serial,
1855            message_length,
1856            worker->read_buffer_cur_size,
1857            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
1858   _g_dbus_debug_print_unlock ();
1859  out:
1860   ;
1861 }