Add work-around for Bug 627724
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "giostream.h"
41 #include "gsocketcontrolmessage.h"
42 #include "gsocketconnection.h"
43 #include "gsocketoutputstream.h"
44
45 #ifdef G_OS_UNIX
46 #include "gunixfdmessage.h"
47 #include "gunixconnection.h"
48 #include "gunixcredentialsmessage.h"
49 #endif
50
51 #ifdef G_OS_WIN32
52 #include <windows.h>
53 #endif
54
55 #include "glibintl.h"
56
57 /* ---------------------------------------------------------------------------------------------------- */
58
59 gchar *
60 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
61 {
62  guint n, m;
63  GString *ret;
64
65  ret = g_string_new (NULL);
66
67  for (n = 0; n < len; n += 16)
68    {
69      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
70
71      for (m = n; m < n + 16; m++)
72        {
73          if (m > n && (m%4) == 0)
74            g_string_append_c (ret, ' ');
75          if (m < len)
76            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
77          else
78            g_string_append (ret, "   ");
79        }
80
81      g_string_append (ret, "   ");
82
83      for (m = n; m < len && m < n + 16; m++)
84        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
85
86      g_string_append_c (ret, '\n');
87    }
88
89  return g_string_free (ret, FALSE);
90 }
91
92 /* ---------------------------------------------------------------------------------------------------- */
93
94 /* Unfortunately ancillary messages are discarded when reading from a
95  * socket using the GSocketInputStream abstraction. So we provide a
96  * very GInputStream-ish API that uses GSocket in this case (very
97  * similar to GSocketInputStream).
98  */
99
/* State of one pending _g_socket_read_with_control_messages() operation. */
typedef struct
{
  GSocket *socket;            /* socket to receive from; a reference is held */
  GCancellable *cancellable;  /* optional (may be NULL); a reference is held when set */

  void *buffer;               /* caller-provided destination for the received bytes */
  gsize count;                /* size of @buffer in bytes */

  /* caller-provided locations handed to g_socket_receive_message() for
   * the received control messages and their number
   */
  GSocketControlMessage ***messages;
  gint *num_messages;

  GSimpleAsyncResult *simple; /* result object completed once the receive was attempted */

  /* TRUE if the receive is performed from a GSource callback; the result
   * is then completed directly instead of in an idle handler
   */
  gboolean from_mainloop;
} ReadWithControlData;
115
116 static void
117 read_with_control_data_free (ReadWithControlData *data)
118 {
119   g_object_unref (data->socket);
120   if (data->cancellable != NULL)
121     g_object_unref (data->cancellable);
122   g_object_unref (data->simple);
123   g_free (data);
124 }
125
126 static gboolean
127 _g_socket_read_with_control_messages_ready (GSocket      *socket,
128                                             GIOCondition  condition,
129                                             gpointer      user_data)
130 {
131   ReadWithControlData *data = user_data;
132   GError *error;
133   gssize result;
134   GInputVector vector;
135
136   error = NULL;
137   vector.buffer = data->buffer;
138   vector.size = data->count;
139   result = g_socket_receive_message (data->socket,
140                                      NULL, /* address */
141                                      &vector,
142                                      1,
143                                      data->messages,
144                                      data->num_messages,
145                                      NULL,
146                                      data->cancellable,
147                                      &error);
148   if (result >= 0)
149     {
150       g_simple_async_result_set_op_res_gssize (data->simple, result);
151     }
152   else
153     {
154       g_assert (error != NULL);
155       g_simple_async_result_set_from_error (data->simple, error);
156       g_error_free (error);
157     }
158
159   if (data->from_mainloop)
160     g_simple_async_result_complete (data->simple);
161   else
162     g_simple_async_result_complete_in_idle (data->simple);
163
164   return FALSE;
165 }
166
167 static void
168 _g_socket_read_with_control_messages (GSocket                 *socket,
169                                       void                    *buffer,
170                                       gsize                    count,
171                                       GSocketControlMessage ***messages,
172                                       gint                    *num_messages,
173                                       gint                     io_priority,
174                                       GCancellable            *cancellable,
175                                       GAsyncReadyCallback      callback,
176                                       gpointer                 user_data)
177 {
178   ReadWithControlData *data;
179
180   data = g_new0 (ReadWithControlData, 1);
181   data->socket = g_object_ref (socket);
182   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
183   data->buffer = buffer;
184   data->count = count;
185   data->messages = messages;
186   data->num_messages = num_messages;
187
188   data->simple = g_simple_async_result_new (G_OBJECT (socket),
189                                             callback,
190                                             user_data,
191                                             _g_socket_read_with_control_messages);
192
193   if (!g_socket_condition_check (socket, G_IO_IN))
194     {
195       GSource *source;
196       data->from_mainloop = TRUE;
197       source = g_socket_create_source (data->socket,
198                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
199                                        cancellable);
200       g_source_set_callback (source,
201                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
202                              data,
203                              (GDestroyNotify) read_with_control_data_free);
204       g_source_attach (source, g_main_context_get_thread_default ());
205       g_source_unref (source);
206     }
207   else
208     {
209       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
210       read_with_control_data_free (data);
211     }
212 }
213
214 static gssize
215 _g_socket_read_with_control_messages_finish (GSocket       *socket,
216                                              GAsyncResult  *result,
217                                              GError       **error)
218 {
219   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
220
221   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
222   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
223
224   if (g_simple_async_result_propagate_error (simple, error))
225       return -1;
226   else
227     return g_simple_async_result_get_op_res_gssize (simple);
228 }
229
230 /* ---------------------------------------------------------------------------------------------------- */
231
232 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
233
/* Class references taken by ensure_required_types() and dropped again by
 * released_required_types(); NULL while no references are held.
 */
static GPtrArray *ensured_classes = NULL;
235
236 static void
237 ensure_type (GType gtype)
238 {
239   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
240 }
241
242 static void
243 released_required_types (void)
244 {
245   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
246   g_ptr_array_unref (ensured_classes);
247   ensured_classes = NULL;
248 }
249
250 static void
251 ensure_required_types (void)
252 {
253   g_assert (ensured_classes == NULL);
254   ensured_classes = g_ptr_array_new ();
255   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
256   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
257 }
258 /* ---------------------------------------------------------------------------------------------------- */
259
/* Lock taken by _g_dbus_shared_thread_ref() around creation and use of
 * shared_thread_data.
 */
G_LOCK_DEFINE_STATIC (shared_thread_lock);

/* Bookkeeping for the single worker thread shared by all users. */
typedef struct
{
  gint num_users;         /* incremented on every _g_dbus_shared_thread_ref() */
  GThread *thread;        /* the thread running gdbus_shared_thread_func() */
  GMainContext *context;  /* pushed as thread-default context in the shared thread */
  GMainLoop *loop;        /* main loop (on @context) run by the shared thread */
} SharedThreadData;

/* Created on the first _g_dbus_shared_thread_ref(); NULL before that. */
static SharedThreadData *shared_thread_data = NULL;
271
272 static gpointer
273 gdbus_shared_thread_func (gpointer data)
274 {
275   g_main_context_push_thread_default (shared_thread_data->context);
276   g_main_loop_run (shared_thread_data->loop);
277   g_main_context_pop_thread_default (shared_thread_data->context);
278   return NULL;
279 }
280
/* Signature of a function to be run in the shared thread. */
typedef void (*GDBusSharedThreadFunc) (gpointer user_data);

/* Request to run @func in the shared thread, see invoke_caller(). */
typedef struct
{
  GDBusSharedThreadFunc func;       /* function to invoke */
  gpointer              user_data;  /* argument passed to @func */
  gboolean              done;       /* set to TRUE by invoke_caller() after @func returned */
} CallerData;
289
290 static gboolean
291 invoke_caller (gpointer user_data)
292 {
293   CallerData *data = user_data;
294   data->func (data->user_data);
295   data->done = TRUE;
296   return FALSE;
297 }
298
299 /* ---------------------------------------------------------------------------------------------------- */
300
301 static void
302 _g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
303                            gpointer              user_data)
304 {
305   GError *error;
306   GSource *idle_source;
307   CallerData *data;
308   gboolean release_types;
309
310   G_LOCK (shared_thread_lock);
311
312   release_types = FALSE;
313
314   if (shared_thread_data != NULL)
315     {
316       shared_thread_data->num_users += 1;
317       goto have_thread;
318     }
319
320   shared_thread_data = g_new0 (SharedThreadData, 1);
321   shared_thread_data->num_users = 1;
322
323   /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
324   ensure_required_types ();
325   release_types = TRUE;
326
327   error = NULL;
328   shared_thread_data->context = g_main_context_new ();
329   shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
330   shared_thread_data->thread = g_thread_create (gdbus_shared_thread_func,
331                                                 NULL,
332                                                 TRUE,
333                                                 &error);
334   g_assert_no_error (error);
335
336  have_thread:
337
338   data = g_new0 (CallerData, 1);
339   data->func = func;
340   data->user_data = user_data;
341   data->done = FALSE;
342
343   idle_source = g_idle_source_new ();
344   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
345   g_source_set_callback (idle_source,
346                          invoke_caller,
347                          data,
348                          NULL);
349   g_source_attach (idle_source, shared_thread_data->context);
350   g_source_unref (idle_source);
351
352   /* wait for the user code to run.. hmm.. probably use a condition variable instead */
353   while (!data->done)
354     g_thread_yield ();
355
356   if (release_types)
357     released_required_types ();
358
359   g_free (data);
360
361   G_UNLOCK (shared_thread_lock);
362 }
363
/* Counterpart of _g_dbus_shared_thread_ref(). Currently a no-op: the
 * tear-down below is compiled out, so the shared thread is never
 * destroyed.
 */
static void
_g_dbus_shared_thread_unref (void)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  G_LOCK (shared_thread_lock);
  g_assert (shared_thread_data != NULL);
  shared_thread_data->num_users -= 1;
  if (shared_thread_data->num_users == 0)
    {
      g_main_loop_quit (shared_thread_data->loop);
      //g_thread_join (shared_thread_data->thread);
      g_main_loop_unref (shared_thread_data->loop);
      g_main_context_unref (shared_thread_data->context);
      g_free (shared_thread_data);
      shared_thread_data = NULL;
    }
  G_UNLOCK (shared_thread_lock);
#endif
}
388
389 /* ---------------------------------------------------------------------------------------------------- */
390
/* Per-connection worker state: reads messages from, and queues messages
 * to be written to, a GIOStream. Reference counted via
 * _g_dbus_worker_ref() / _g_dbus_worker_unref().
 */
struct GDBusWorker
{
  /* modified atomically; the worker is destroyed when this drops to zero */
  volatile gint                       ref_count;

  /* once TRUE, none of the callbacks below are invoked anymore and
   * completed reads are ignored
   */
  gboolean                            stopped;

  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
   * only affects messages received from the other peer (since GDBusServer is the
   * only user) - we might want it to affect messages sent to the other peer too?
   */
  gboolean                            frozen;
  /* GDBusMessage objects received while frozen, delivered in order on unfreeze */
  GQueue                             *received_messages_while_frozen;

  GIOStream                          *stream;        /* transport; a reference is released on destroy */
  GDBusCapabilityFlags                capabilities;  /* passed to g_dbus_message_new_from_blob() */
  GCancellable                       *cancellable;   /* passed to the asynchronous reads */
  GDBusWorkerMessageReceivedCallback  message_received_callback;
  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
  GDBusWorkerDisconnectedCallback     disconnected_callback;
  gpointer                            user_data;     /* last argument of the three callbacks */

  GThread                            *thread;

  /* if not NULL, stream is GSocketConnection */
  GSocket *socket;

  /* used for reading */
  GMutex                             *read_lock;
  gchar                              *read_buffer;                 /* grown on demand with g_realloc() */
  gsize                               read_buffer_allocated_size;  /* allocated size of read_buffer */
  gsize                               read_buffer_cur_size;        /* bytes of the current message read so far */
  gsize                               read_buffer_bytes_wanted;    /* 0: start a new message; 16: reading the fixed header */
  GUnixFDList                        *read_fd_list;                /* fds received for the message being read, or NULL */
  GSocketControlMessage             **read_ancillary_messages;     /* out-arguments of the socket read */
  gint                                read_num_ancillary_messages;

  /* used for writing */
  GMutex                             *write_lock;
  GQueue                             *write_queue;                 /* of MessageToWriteData */
  gint                                num_writes_pending;
  guint64                             write_num_messages_written;
  GList                              *write_pending_flushes;       /* must be empty when the worker is destroyed */
};
434
435 /* ---------------------------------------------------------------------------------------------------- */
436
/* NOTE(review): not used in the part of the file reviewed here. Presumably
 * one entry of GDBusWorker.write_pending_flushes, describing a caller that
 * waits for a flush - confirm against the flush code.
 */
typedef struct
{
  GMutex *mutex;
  GCond *cond;
  guint64 number_to_wait_for; /* presumably compared with write_num_messages_written - TODO confirm */
  GError *error;
} FlushData;
444
/* Forward declarations: the struct and the helpers below are defined
 * further down but are already needed by the worker functions.
 */
struct _MessageToWriteData ;
typedef struct _MessageToWriteData MessageToWriteData;

/* frees one write-queue entry */
static void message_to_write_data_free (MessageToWriteData *data);

/* transport-level debug output for a completed read */
static void read_message_print_transport_debug (gssize bytes_read,
                                                GDBusWorker *worker);

/* transport-level debug output for a completed write */
static void write_message_print_transport_debug (gssize bytes_written,
                                                 MessageToWriteData *data);
455
456 /* ---------------------------------------------------------------------------------------------------- */
457
458 static GDBusWorker *
459 _g_dbus_worker_ref (GDBusWorker *worker)
460 {
461   g_atomic_int_inc (&worker->ref_count);
462   return worker;
463 }
464
465 static void
466 _g_dbus_worker_unref (GDBusWorker *worker)
467 {
468   if (g_atomic_int_dec_and_test (&worker->ref_count))
469     {
470       g_assert (worker->write_pending_flushes == NULL);
471
472       _g_dbus_shared_thread_unref ();
473
474       g_object_unref (worker->stream);
475
476       g_mutex_free (worker->read_lock);
477       g_object_unref (worker->cancellable);
478       if (worker->read_fd_list != NULL)
479         g_object_unref (worker->read_fd_list);
480
481       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
482       g_queue_free (worker->received_messages_while_frozen);
483
484       g_mutex_free (worker->write_lock);
485       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
486       g_queue_free (worker->write_queue);
487
488       g_free (worker->read_buffer);
489
490       g_free (worker);
491     }
492 }
493
494 static void
495 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
496                                   gboolean      remote_peer_vanished,
497                                   GError       *error)
498 {
499   if (!worker->stopped)
500     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
501 }
502
503 static void
504 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
505                                       GDBusMessage *message)
506 {
507   if (!worker->stopped)
508     worker->message_received_callback (worker, message, worker->user_data);
509 }
510
511 static GDBusMessage *
512 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
513                                               GDBusMessage *message)
514 {
515   GDBusMessage *ret;
516   if (!worker->stopped)
517     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
518   else
519     ret = message;
520   return ret;
521 }
522
523 /* can only be called from private thread with read-lock held - takes ownership of @message */
524 static void
525 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
526                                                   GDBusMessage *message)
527 {
528   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
529     {
530       /* queue up */
531       g_queue_push_tail (worker->received_messages_while_frozen, message);
532     }
533   else
534     {
535       /* not frozen, nor anything in queue */
536       _g_dbus_worker_emit_message_received (worker, message);
537       g_object_unref (message);
538     }
539 }
540
541 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
542 static gboolean
543 unfreeze_in_idle_cb (gpointer user_data)
544 {
545   GDBusWorker *worker = user_data;
546   GDBusMessage *message;
547
548   g_mutex_lock (worker->read_lock);
549   if (worker->frozen)
550     {
551       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
552         {
553           _g_dbus_worker_emit_message_received (worker, message);
554           g_object_unref (message);
555         }
556       worker->frozen = FALSE;
557     }
558   else
559     {
560       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
561     }
562   g_mutex_unlock (worker->read_lock);
563   return FALSE;
564 }
565
566 /* can be called from any thread */
567 void
568 _g_dbus_worker_unfreeze (GDBusWorker *worker)
569 {
570   GSource *idle_source;
571   idle_source = g_idle_source_new ();
572   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
573   g_source_set_callback (idle_source,
574                          unfreeze_in_idle_cb,
575                          _g_dbus_worker_ref (worker),
576                          (GDestroyNotify) _g_dbus_worker_unref);
577   g_source_attach (idle_source, shared_thread_data->context);
578   g_source_unref (idle_source);
579 }
580
581 /* ---------------------------------------------------------------------------------------------------- */
582
/* forward declaration: the read callback below re-arms the read through this function */
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
584
/* called in private thread shared by all GDBusConnection instances (without read-lock held)
 *
 * Completion callback of the asynchronous read started by
 * _g_dbus_worker_do_read_unlocked(); @user_data is the GDBusWorker and
 * carries the reference taken when the read was started.
 *
 * With the read-lock held, the callback
 *  - fetches the result of the read,
 *  - consumes the ancillary messages received with it (file descriptors
 *    are collected in worker->read_fd_list),
 *  - reports a disconnect on error, on a zero-byte read, on an unexpected
 *    ancillary message and on undecodable data,
 *  - otherwise accounts for the new bytes: after the 16 byte fixed header
 *    it works out the full message length, after a complete message it
 *    decodes and delivers (or queues) it,
 *  - and starts the next read unless a disconnect was reported.
 */
static void
_g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                           GAsyncResult  *res,
                           gpointer       user_data)
{
  GDBusWorker *worker = user_data;
  GError *error;
  gssize bytes_read;

  g_mutex_lock (worker->read_lock);

  /* If already stopped, don't even process the reply */
  /* NOTE(review): on this path the finish function is not called and
   * worker->read_ancillary_messages is not released - confirm that
   * nothing can leak here.
   */
  if (worker->stopped)
    goto out;

  /* pick the finish function matching the read variant that was started */
  error = NULL;
  if (worker->socket == NULL)
    bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                             res,
                                             &error);
  else
    bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
                                                              res,
                                                              &error);
  /* Consume the ancillary messages before looking at bytes_read; each
   * message is unreffed and the array is freed on every path below.
   */
  if (worker->read_num_ancillary_messages > 0)
    {
      gint n;
      for (n = 0; n < worker->read_num_ancillary_messages; n++)
        {
          GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);

          /* empty first branch so that every real case can be an "else if",
           * whether or not the G_OS_UNIX cases are compiled in
           */
          if (FALSE)
            {
            }
#ifdef G_OS_UNIX
          else if (G_IS_UNIX_FD_MESSAGE (control_message))
            {
              GUnixFDMessage *fd_message;
              gint *fds;
              gint num_fds;

              /* take the descriptors out of the message and add them to
               * the fd list of the message currently being read
               */
              fd_message = G_UNIX_FD_MESSAGE (control_message);
              fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
              if (worker->read_fd_list == NULL)
                {
                  worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
                }
              else
                {
                  /* this n shadows the outer loop counter within this block only */
                  gint n;
                  for (n = 0; n < num_fds; n++)
                    {
                      /* TODO: really want a append_steal() */
                      /* presumably the list keeps its own copy of the
                       * descriptor, hence ours is closed afterwards
                       */
                      g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
                      close (fds[n]);
                    }
                }
              g_free (fds);
            }
          else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
            {
              /* do nothing */
            }
#endif
          else
            {
              /* unknown kind of message: only reported if the read itself
               * did not already fail
               */
              if (error == NULL)
                {
                  g_set_error (&error,
                               G_IO_ERROR,
                               G_IO_ERROR_FAILED,
                               "Unexpected ancillary message of type %s received from peer",
                               g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
                  _g_dbus_worker_emit_disconnected (worker, TRUE, error);
                  g_error_free (error);
                  /* release this message and all messages not yet looked at */
                  g_object_unref (control_message);
                  n++;
                  while (n < worker->read_num_ancillary_messages)
                    g_object_unref (worker->read_ancillary_messages[n++]);
                  g_free (worker->read_ancillary_messages);
                  goto out;
                }
            }
          g_object_unref (control_message);
        }
      g_free (worker->read_ancillary_messages);
    }

  /* the read itself failed */
  if (bytes_read == -1)
    {
      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
      g_error_free (error);
      goto out;
    }

#if 0
  g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
           (gint) bytes_read,
           g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
           g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
           g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
                                     G_IO_IN | G_IO_OUT | G_IO_HUP),
           worker->stream,
           worker);
#endif

  /* TODO: hmm, hmm... */
  /* a read of zero bytes is treated as the peer having gone away */
  if (bytes_read == 0)
    {
      g_set_error (&error,
                   G_IO_ERROR,
                   G_IO_ERROR_FAILED,
                   "Underlying GIOStream returned 0 bytes on an async read");
      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
      g_error_free (error);
      goto out;
    }

  read_message_print_transport_debug (bytes_read, worker);

  worker->read_buffer_cur_size += bytes_read;
  if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
    {
      /* OK, got what we asked for! */
      if (worker->read_buffer_bytes_wanted == 16)
        {
          gssize message_len;
          /* OK, got the header - determine how many more bytes are needed */
          error = NULL;
          message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
                                                     16,
                                                     &error);
          if (message_len == -1)
            {
              g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
              g_error_free (error);
              goto out;
            }

          /* keep the header in the buffer and read the rest of the message */
          worker->read_buffer_bytes_wanted = message_len;
          _g_dbus_worker_do_read_unlocked (worker);
        }
      else
        {
          GDBusMessage *message;
          error = NULL;

          /* TODO: use connection->priv->auth to decode the message */

          message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
                                                  worker->read_buffer_cur_size,
                                                  worker->capabilities,
                                                  &error);
          if (message == NULL)
            {
              /* undecodable data: log a hex dump and report a disconnect */
              gchar *s;
              s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
              g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
                         "The error is: %s\n"
                         "The payload is as follows:\n"
                         "%s\n",
                         worker->read_buffer_cur_size,
                         error->message,
                         s);
              g_free (s);
              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
              g_error_free (error);
              goto out;
            }

#ifdef G_OS_UNIX
          /* hand the descriptors collected for this message over to it */
          if (worker->read_fd_list != NULL)
            {
              g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
              g_object_unref (worker->read_fd_list);
              worker->read_fd_list = NULL;
            }
#endif

          if (G_UNLIKELY (_g_dbus_debug_message ()))
            {
              gchar *s;
              _g_dbus_debug_print_lock ();
              g_print ("========================================================================\n"
                       "GDBus-debug:Message:\n"
                       "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
                       worker->read_buffer_cur_size);
              s = g_dbus_message_print (message, 2);
              g_print ("%s", s);
              g_free (s);
              if (G_UNLIKELY (_g_dbus_debug_payload ()))
                {
                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
                  g_print ("%s\n", s);
                  g_free (s);
                }
              _g_dbus_debug_print_unlock ();
            }

          /* yay, got a message, go deliver it */
          _g_dbus_worker_queue_or_deliver_received_message (worker, message);

          /* start reading another message! */
          worker->read_buffer_bytes_wanted = 0;
          worker->read_buffer_cur_size = 0;
          _g_dbus_worker_do_read_unlocked (worker);
        }
    }
  else
    {
      /* didn't get all the bytes we requested - so repeat the request... */
      _g_dbus_worker_do_read_unlocked (worker);
    }

 out:
  g_mutex_unlock (worker->read_lock);

  /* gives up the reference acquired when calling g_input_stream_read_async() */
  _g_dbus_worker_unref (worker);
}
807
808 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
809 static void
810 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
811 {
812   /* if bytes_wanted is zero, it means start reading a message */
813   if (worker->read_buffer_bytes_wanted == 0)
814     {
815       worker->read_buffer_cur_size = 0;
816       worker->read_buffer_bytes_wanted = 16;
817     }
818
819   /* ensure we have a (big enough) buffer */
820   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
821     {
822       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
823       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
824       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
825     }
826
827   if (worker->socket == NULL)
828     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
829                                worker->read_buffer + worker->read_buffer_cur_size,
830                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
831                                G_PRIORITY_DEFAULT,
832                                worker->cancellable,
833                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
834                                _g_dbus_worker_ref (worker));
835   else
836     {
837       worker->read_ancillary_messages = NULL;
838       worker->read_num_ancillary_messages = 0;
839       _g_socket_read_with_control_messages (worker->socket,
840                                             worker->read_buffer + worker->read_buffer_cur_size,
841                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
842                                             &worker->read_ancillary_messages,
843                                             &worker->read_num_ancillary_messages,
844                                             G_PRIORITY_DEFAULT,
845                                             worker->cancellable,
846                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
847                                             _g_dbus_worker_ref (worker));
848     }
849 }
850
851 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
852 static void
853 _g_dbus_worker_do_read (GDBusWorker *worker)
854 {
855   g_mutex_lock (worker->read_lock);
856   _g_dbus_worker_do_read_unlocked (worker);
857   g_mutex_unlock (worker->read_lock);
858 }
859
860 /* ---------------------------------------------------------------------------------------------------- */
861
/* One message to be written; released with message_to_write_data_free(). */
struct _MessageToWriteData
{
  GDBusWorker  *worker;   /* a reference is held; dropped when the entry is freed */
  GDBusMessage *message;  /* a reference is held; dropped when the entry is freed */
  gchar        *blob;     /* owned; freed with g_free() when the entry is freed */
  gsize         blob_size; /* number of bytes in @blob that have to be written */

  gsize               total_written; /* bytes of @blob written so far */
  GSimpleAsyncResult *simple;        /* completed when the write has finished or failed */

};
873
874 static void
875 message_to_write_data_free (MessageToWriteData *data)
876 {
877   _g_dbus_worker_unref (data->worker);
878   g_object_unref (data->message);
879   g_free (data->blob);
880   g_free (data);
881 }
882
883 /* ---------------------------------------------------------------------------------------------------- */
884
885 static void write_message_continue_writing (MessageToWriteData *data);
886
887 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
888 static void
889 write_message_async_cb (GObject      *source_object,
890                         GAsyncResult *res,
891                         gpointer      user_data)
892 {
893   MessageToWriteData *data = user_data;
894   GSimpleAsyncResult *simple;
895   gssize bytes_written;
896   GError *error;
897
898   /* Note: we can't access data->simple after calling g_async_result_complete () because the
899    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
900    */
901   simple = data->simple;
902
903   error = NULL;
904   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
905                                                 res,
906                                                 &error);
907   if (bytes_written == -1)
908     {
909       g_simple_async_result_set_from_error (simple, error);
910       g_error_free (error);
911       g_simple_async_result_complete (simple);
912       g_object_unref (simple);
913       goto out;
914     }
915   g_assert (bytes_written > 0); /* zero is never returned */
916
917   write_message_print_transport_debug (bytes_written, data);
918
919   data->total_written += bytes_written;
920   g_assert (data->total_written <= data->blob_size);
921   if (data->total_written == data->blob_size)
922     {
923       g_simple_async_result_complete (simple);
924       g_object_unref (simple);
925       goto out;
926     }
927
928   write_message_continue_writing (data);
929
930  out:
931   ;
932 }
933
934 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
935 static gboolean
936 on_socket_ready (GSocket      *socket,
937                  GIOCondition  condition,
938                  gpointer      user_data)
939 {
940   MessageToWriteData *data = user_data;
941   write_message_continue_writing (data);
942   return FALSE; /* remove source */
943 }
944
945 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
946 static void
947 write_message_continue_writing (MessageToWriteData *data)
948 {
949   GOutputStream *ostream;
950   GSimpleAsyncResult *simple;
951 #ifdef G_OS_UNIX
952   GUnixFDList *fd_list;
953 #endif
954
955   /* Note: we can't access data->simple after calling g_async_result_complete () because the
956    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
957    */
958   simple = data->simple;
959
960   ostream = g_io_stream_get_output_stream (data->worker->stream);
961 #ifdef G_OS_UNIX
962   fd_list = g_dbus_message_get_unix_fd_list (data->message);
963 #endif
964
965   g_assert (!g_output_stream_has_pending (ostream));
966   g_assert_cmpint (data->total_written, <, data->blob_size);
967
968   if (FALSE)
969     {
970     }
971 #ifdef G_OS_UNIX
972   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
973     {
974       GOutputVector vector;
975       GSocketControlMessage *control_message;
976       gssize bytes_written;
977       GError *error;
978
979       vector.buffer = data->blob;
980       vector.size = data->blob_size;
981
982       control_message = NULL;
983       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
984         {
985           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
986             {
987               g_simple_async_result_set_error (simple,
988                                                G_IO_ERROR,
989                                                G_IO_ERROR_FAILED,
990                                                "Tried sending a file descriptor but remote peer does not support this capability");
991               g_simple_async_result_complete (simple);
992               g_object_unref (simple);
993               goto out;
994             }
995           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
996         }
997
998       error = NULL;
999       bytes_written = g_socket_send_message (data->worker->socket,
1000                                              NULL, /* address */
1001                                              &vector,
1002                                              1,
1003                                              control_message != NULL ? &control_message : NULL,
1004                                              control_message != NULL ? 1 : 0,
1005                                              G_SOCKET_MSG_NONE,
1006                                              data->worker->cancellable,
1007                                              &error);
1008       if (control_message != NULL)
1009         g_object_unref (control_message);
1010
1011       if (bytes_written == -1)
1012         {
1013           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1014           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1015             {
1016               GSource *source;
1017               source = g_socket_create_source (data->worker->socket,
1018                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1019                                                data->worker->cancellable);
1020               g_source_set_callback (source,
1021                                      (GSourceFunc) on_socket_ready,
1022                                      data,
1023                                      NULL); /* GDestroyNotify */
1024               g_source_attach (source, g_main_context_get_thread_default ());
1025               g_source_unref (source);
1026               g_error_free (error);
1027               goto out;
1028             }
1029           g_simple_async_result_set_from_error (simple, error);
1030           g_error_free (error);
1031           g_simple_async_result_complete (simple);
1032           g_object_unref (simple);
1033           goto out;
1034         }
1035       g_assert (bytes_written > 0); /* zero is never returned */
1036
1037       write_message_print_transport_debug (bytes_written, data);
1038
1039       data->total_written += bytes_written;
1040       g_assert (data->total_written <= data->blob_size);
1041       if (data->total_written == data->blob_size)
1042         {
1043           g_simple_async_result_complete (simple);
1044           g_object_unref (simple);
1045           goto out;
1046         }
1047
1048       write_message_continue_writing (data);
1049     }
1050 #endif
1051   else
1052     {
1053 #ifdef G_OS_UNIX
1054       if (fd_list != NULL)
1055         {
1056           g_simple_async_result_set_error (simple,
1057                                            G_IO_ERROR,
1058                                            G_IO_ERROR_FAILED,
1059                                            "Tried sending a file descriptor on unsupported stream of type %s",
1060                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1061           g_simple_async_result_complete (simple);
1062           g_object_unref (simple);
1063           goto out;
1064         }
1065 #endif
1066
1067       g_output_stream_write_async (ostream,
1068                                    (const gchar *) data->blob + data->total_written,
1069                                    data->blob_size - data->total_written,
1070                                    G_PRIORITY_DEFAULT,
1071                                    data->worker->cancellable,
1072                                    write_message_async_cb,
1073                                    data);
1074     }
1075  out:
1076   ;
1077 }
1078
1079 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1080 static void
1081 write_message_async (GDBusWorker         *worker,
1082                      MessageToWriteData  *data,
1083                      GAsyncReadyCallback  callback,
1084                      gpointer             user_data)
1085 {
1086   data->simple = g_simple_async_result_new (NULL,
1087                                             callback,
1088                                             user_data,
1089                                             write_message_async);
1090   data->total_written = 0;
1091   write_message_continue_writing (data);
1092 }
1093
1094 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1095 static gboolean
1096 write_message_finish (GAsyncResult   *res,
1097                       GError        **error)
1098 {
1099   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1100   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1101     return FALSE;
1102   else
1103     return TRUE;
1104 }
1105 /* ---------------------------------------------------------------------------------------------------- */
1106
1107 static void maybe_write_next_message (GDBusWorker *worker);
1108
1109 typedef struct
1110 {
1111   GDBusWorker *worker;
1112   GList *flushers;
1113 } FlushAsyncData;
1114
1115 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1116 static void
1117 ostream_flush_cb (GObject      *source_object,
1118                   GAsyncResult *res,
1119                   gpointer      user_data)
1120 {
1121   FlushAsyncData *data = user_data;
1122   GError *error;
1123   GList *l;
1124
1125   error = NULL;
1126   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1127                                 res,
1128                                 &error);
1129
1130   if (error == NULL)
1131     {
1132       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1133         {
1134           _g_dbus_debug_print_lock ();
1135           g_print ("========================================================================\n"
1136                    "GDBus-debug:Transport:\n"
1137                    "  ---- FLUSHED stream of type %s\n",
1138                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1139           _g_dbus_debug_print_unlock ();
1140         }
1141     }
1142
1143   g_assert (data->flushers != NULL);
1144   for (l = data->flushers; l != NULL; l = l->next)
1145     {
1146       FlushData *f = l->data;
1147
1148       f->error = error != NULL ? g_error_copy (error) : NULL;
1149
1150       g_mutex_lock (f->mutex);
1151       g_cond_signal (f->cond);
1152       g_mutex_unlock (f->mutex);
1153     }
1154   g_list_free (data->flushers);
1155
1156   if (error != NULL)
1157     g_error_free (error);
1158
1159   /* OK, cool, finally kick off the next write */
1160   maybe_write_next_message (data->worker);
1161
1162   _g_dbus_worker_unref (data->worker);
1163   g_free (data);
1164 }
1165
1166 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1167 static void
1168 message_written (GDBusWorker *worker,
1169                  MessageToWriteData *message_data)
1170 {
1171   GList *l;
1172   GList *ll;
1173   GList *flushers;
1174
1175   /* first log the fact that we wrote a message */
1176   if (G_UNLIKELY (_g_dbus_debug_message ()))
1177     {
1178       gchar *s;
1179       _g_dbus_debug_print_lock ();
1180       g_print ("========================================================================\n"
1181                "GDBus-debug:Message:\n"
1182                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1183                message_data->blob_size);
1184       s = g_dbus_message_print (message_data->message, 2);
1185       g_print ("%s", s);
1186       g_free (s);
1187       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1188         {
1189           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1190           g_print ("%s\n", s);
1191           g_free (s);
1192         }
1193       _g_dbus_debug_print_unlock ();
1194     }
1195
1196   /* then first wake up pending flushes and, if needed, flush the stream */
1197   flushers = NULL;
1198   g_mutex_lock (worker->write_lock);
1199   worker->write_num_messages_written += 1;
1200   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1201     {
1202       FlushData *f = l->data;
1203       ll = l->next;
1204
1205       if (f->number_to_wait_for == worker->write_num_messages_written)
1206         {
1207           flushers = g_list_append (flushers, f);
1208           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1209         }
1210     }
1211   g_mutex_unlock (worker->write_lock);
1212
1213   if (flushers != NULL)
1214     {
1215       FlushAsyncData *data;
1216       data = g_new0 (FlushAsyncData, 1);
1217       data->worker = _g_dbus_worker_ref (worker);
1218       data->flushers = flushers;
1219       /* flush the stream before writing the next message */
1220       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1221                                    G_PRIORITY_DEFAULT,
1222                                    worker->cancellable,
1223                                    ostream_flush_cb,
1224                                    data);
1225     }
1226   else
1227     {
1228       /* kick off the next write! */
1229       maybe_write_next_message (worker);
1230     }
1231 }
1232
1233 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1234 static void
1235 write_message_cb (GObject       *source_object,
1236                   GAsyncResult  *res,
1237                   gpointer       user_data)
1238 {
1239   MessageToWriteData *data = user_data;
1240   GError *error;
1241
1242   g_mutex_lock (data->worker->write_lock);
1243   data->worker->num_writes_pending -= 1;
1244   g_mutex_unlock (data->worker->write_lock);
1245
1246   error = NULL;
1247   if (!write_message_finish (res, &error))
1248     {
1249       /* TODO: handle */
1250       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1251       g_error_free (error);
1252     }
1253
1254   /* this function will also kick of the next write (it might need to
1255    * flush so writing the next message might happen much later
1256    * e.g. async)
1257    */
1258   message_written (data->worker, data);
1259
1260   message_to_write_data_free (data);
1261 }
1262
1263 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1264 static void
1265 maybe_write_next_message (GDBusWorker *worker)
1266 {
1267   MessageToWriteData *data;
1268
1269  write_next:
1270
1271   g_mutex_lock (worker->write_lock);
1272   data = g_queue_pop_head (worker->write_queue);
1273   if (data != NULL)
1274     worker->num_writes_pending += 1;
1275   g_mutex_unlock (worker->write_lock);
1276
1277   /* Note that write_lock is only used for protecting the @write_queue
1278    * and @num_writes_pending fields of the GDBusWorker struct ... which we
1279    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1280    *
1281    * Therefore, it's fine to drop it here when calling back into user
1282    * code and then writing the message out onto the GIOStream since this
1283    * function only runs on the worker thread.
1284    */
1285   if (data != NULL)
1286     {
1287       GDBusMessage *old_message;
1288       guchar *new_blob;
1289       gsize new_blob_size;
1290       GError *error;
1291
1292       old_message = data->message;
1293       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1294       if (data->message == old_message)
1295         {
1296           /* filters had no effect - do nothing */
1297         }
1298       else if (data->message == NULL)
1299         {
1300           /* filters dropped message */
1301           g_mutex_lock (worker->write_lock);
1302           worker->num_writes_pending -= 1;
1303           g_mutex_unlock (worker->write_lock);
1304           message_to_write_data_free (data);
1305           goto write_next;
1306         }
1307       else
1308         {
1309           /* filters altered the message -> reencode */
1310           error = NULL;
1311           new_blob = g_dbus_message_to_blob (data->message,
1312                                              &new_blob_size,
1313                                              worker->capabilities,
1314                                              &error);
1315           if (new_blob == NULL)
1316             {
1317               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1318                * the old message instead
1319                */
1320               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1321                          g_dbus_message_get_serial (data->message),
1322                          error->message);
1323               g_error_free (error);
1324             }
1325           else
1326             {
1327               g_free (data->blob);
1328               data->blob = (gchar *) new_blob;
1329               data->blob_size = new_blob_size;
1330             }
1331         }
1332
1333       write_message_async (worker,
1334                            data,
1335                            write_message_cb,
1336                            data);
1337     }
1338 }
1339
1340 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1341 static gboolean
1342 write_message_in_idle_cb (gpointer user_data)
1343 {
1344   GDBusWorker *worker = user_data;
1345   if (worker->num_writes_pending == 0)
1346     maybe_write_next_message (worker);
1347   return FALSE;
1348 }
1349
1350 /* ---------------------------------------------------------------------------------------------------- */
1351
1352 /* can be called from any thread - steals blob */
1353 void
1354 _g_dbus_worker_send_message (GDBusWorker    *worker,
1355                              GDBusMessage   *message,
1356                              gchar          *blob,
1357                              gsize           blob_len)
1358 {
1359   MessageToWriteData *data;
1360
1361   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1362   g_return_if_fail (blob != NULL);
1363   g_return_if_fail (blob_len > 16);
1364
1365   data = g_new0 (MessageToWriteData, 1);
1366   data->worker = _g_dbus_worker_ref (worker);
1367   data->message = g_object_ref (message);
1368   data->blob = blob; /* steal! */
1369   data->blob_size = blob_len;
1370
1371   g_mutex_lock (worker->write_lock);
1372   g_queue_push_tail (worker->write_queue, data);
1373   if (worker->num_writes_pending == 0)
1374     {
1375       GSource *idle_source;
1376       idle_source = g_idle_source_new ();
1377       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1378       g_source_set_callback (idle_source,
1379                              write_message_in_idle_cb,
1380                              _g_dbus_worker_ref (worker),
1381                              (GDestroyNotify) _g_dbus_worker_unref);
1382       g_source_attach (idle_source, shared_thread_data->context);
1383       g_source_unref (idle_source);
1384     }
1385   g_mutex_unlock (worker->write_lock);
1386 }
1387
1388 /* ---------------------------------------------------------------------------------------------------- */
1389
1390 static void
1391 _g_dbus_worker_thread_begin_func (gpointer user_data)
1392 {
1393   GDBusWorker *worker = user_data;
1394
1395   worker->thread = g_thread_self ();
1396
1397   /* begin reading */
1398   _g_dbus_worker_do_read (worker);
1399 }
1400
1401 GDBusWorker *
1402 _g_dbus_worker_new (GIOStream                              *stream,
1403                     GDBusCapabilityFlags                    capabilities,
1404                     gboolean                                initially_frozen,
1405                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1406                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1407                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1408                     gpointer                                user_data)
1409 {
1410   GDBusWorker *worker;
1411
1412   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1413   g_return_val_if_fail (message_received_callback != NULL, NULL);
1414   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1415   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1416
1417   worker = g_new0 (GDBusWorker, 1);
1418   worker->ref_count = 1;
1419
1420   worker->read_lock = g_mutex_new ();
1421   worker->message_received_callback = message_received_callback;
1422   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1423   worker->disconnected_callback = disconnected_callback;
1424   worker->user_data = user_data;
1425   worker->stream = g_object_ref (stream);
1426   worker->capabilities = capabilities;
1427   worker->cancellable = g_cancellable_new ();
1428
1429   worker->frozen = initially_frozen;
1430   worker->received_messages_while_frozen = g_queue_new ();
1431
1432   worker->write_lock = g_mutex_new ();
1433   worker->write_queue = g_queue_new ();
1434
1435   if (G_IS_SOCKET_CONNECTION (worker->stream))
1436     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1437
1438   _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
1439
1440   return worker;
1441 }
1442
1443 /* ---------------------------------------------------------------------------------------------------- */
1444
1445 /* This can be called from any thread - frees worker - guarantees no callbacks
1446  * will ever be issued again
1447  */
1448 void
1449 _g_dbus_worker_stop (GDBusWorker *worker)
1450 {
1451   /* If we're called in the worker thread it means we are called from
1452    * a worker callback. This is fine, we just can't lock in that case since
1453    * we're already holding the lock...
1454    */
1455   if (g_thread_self () != worker->thread)
1456     g_mutex_lock (worker->read_lock);
1457   worker->stopped = TRUE;
1458   if (g_thread_self () != worker->thread)
1459     g_mutex_unlock (worker->read_lock);
1460
1461   g_cancellable_cancel (worker->cancellable);
1462   _g_dbus_worker_unref (worker);
1463 }
1464
1465 /* ---------------------------------------------------------------------------------------------------- */
1466
1467 /* can be called from any thread (except the worker thread) - blocks
1468  * calling thread until all queued outgoing messages are written and
1469  * the transport has been flushed
1470  */
1471 gboolean
1472 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1473                            GCancellable   *cancellable,
1474                            GError        **error)
1475 {
1476   gboolean ret;
1477   FlushData *data;
1478
1479   data = NULL;
1480   ret = TRUE;
1481
1482   /* if the queue is empty, there's nothing to wait for */
1483   g_mutex_lock (worker->write_lock);
1484   if (g_queue_get_length (worker->write_queue) > 0)
1485     {
1486       data = g_new0 (FlushData, 1);
1487       data->mutex = g_mutex_new ();
1488       data->cond = g_cond_new ();
1489       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1490       g_mutex_lock (data->mutex);
1491       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1492     }
1493   g_mutex_unlock (worker->write_lock);
1494
1495   if (data != NULL)
1496     {
1497       g_cond_wait (data->cond, data->mutex);
1498       g_mutex_unlock (data->mutex);
1499
1500       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1501       g_cond_free (data->cond);
1502       g_mutex_free (data->mutex);
1503       if (data->error != NULL)
1504         {
1505           ret = FALSE;
1506           g_propagate_error (error, data->error);
1507         }
1508       g_free (data);
1509     }
1510
1511   return ret;
1512 }
1513
1514 /* ---------------------------------------------------------------------------------------------------- */
1515
/* Bit flags stored in _gdbus_debug_flags; one bit per key accepted in the
 * G_DBUS_DEBUG environment variable (see _g_dbus_initialize()).
 */
enum
{
  G_DBUS_DEBUG_AUTHENTICATION = 1 << 0,
  G_DBUS_DEBUG_TRANSPORT      = 1 << 1,
  G_DBUS_DEBUG_MESSAGE        = 1 << 2,
  G_DBUS_DEBUG_PAYLOAD        = 1 << 3,
  G_DBUS_DEBUG_CALL           = 1 << 4,
  G_DBUS_DEBUG_SIGNAL         = 1 << 5,
  G_DBUS_DEBUG_INCOMING       = 1 << 6,
  G_DBUS_DEBUG_RETURN         = 1 << 7,
  G_DBUS_DEBUG_EMISSION       = 1 << 8,
  G_DBUS_DEBUG_ADDRESS        = 1 << 9
};
1526
/* Bitmask of G_DBUS_DEBUG_* flags. Written by _g_dbus_initialize() from the
 * G_DBUS_DEBUG environment variable and read by the _g_dbus_debug_*()
 * accessors; stays 0 when the variable is unset.
 */
static gint _gdbus_debug_flags = 0;
1528
1529 gboolean
1530 _g_dbus_debug_authentication (void)
1531 {
1532   _g_dbus_initialize ();
1533   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1534 }
1535
1536 gboolean
1537 _g_dbus_debug_transport (void)
1538 {
1539   _g_dbus_initialize ();
1540   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1541 }
1542
1543 gboolean
1544 _g_dbus_debug_message (void)
1545 {
1546   _g_dbus_initialize ();
1547   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1548 }
1549
1550 gboolean
1551 _g_dbus_debug_payload (void)
1552 {
1553   _g_dbus_initialize ();
1554   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1555 }
1556
1557 gboolean
1558 _g_dbus_debug_call (void)
1559 {
1560   _g_dbus_initialize ();
1561   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1562 }
1563
1564 gboolean
1565 _g_dbus_debug_signal (void)
1566 {
1567   _g_dbus_initialize ();
1568   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1569 }
1570
1571 gboolean
1572 _g_dbus_debug_incoming (void)
1573 {
1574   _g_dbus_initialize ();
1575   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1576 }
1577
1578 gboolean
1579 _g_dbus_debug_return (void)
1580 {
1581   _g_dbus_initialize ();
1582   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1583 }
1584
1585 gboolean
1586 _g_dbus_debug_emission (void)
1587 {
1588   _g_dbus_initialize ();
1589   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1590 }
1591
1592 gboolean
1593 _g_dbus_debug_address (void)
1594 {
1595   _g_dbus_initialize ();
1596   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1597 }
1598
1599 G_LOCK_DEFINE_STATIC (print_lock);
1600
1601 void
1602 _g_dbus_debug_print_lock (void)
1603 {
1604   G_LOCK (print_lock);
1605 }
1606
1607 void
1608 _g_dbus_debug_print_unlock (void)
1609 {
1610   G_UNLOCK (print_lock);
1611 }
1612
1613 /*
1614  * _g_dbus_initialize:
1615  *
1616  * Does various one-time init things such as
1617  *
1618  *  - registering the G_DBUS_ERROR error domain
1619  *  - parses the G_DBUS_DEBUG environment variable
1620  */
1621 void
1622 _g_dbus_initialize (void)
1623 {
1624   static volatile gsize initialized = 0;
1625
1626   if (g_once_init_enter (&initialized))
1627     {
1628       volatile GQuark g_dbus_error_domain;
1629       const gchar *debug;
1630
1631       g_dbus_error_domain = G_DBUS_ERROR;
1632
1633       debug = g_getenv ("G_DBUS_DEBUG");
1634       if (debug != NULL)
1635         {
1636           const GDebugKey keys[] = {
1637             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1638             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1639             { "message",        G_DBUS_DEBUG_MESSAGE        },
1640             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1641             { "call",           G_DBUS_DEBUG_CALL           },
1642             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1643             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1644             { "return",         G_DBUS_DEBUG_RETURN         },
1645             { "emission",       G_DBUS_DEBUG_EMISSION       },
1646             { "address",        G_DBUS_DEBUG_ADDRESS        }
1647           };
1648
1649           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1650           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1651             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1652         }
1653
1654       g_once_init_leave (&initialized, 1);
1655     }
1656 }
1657
1658 /* ---------------------------------------------------------------------------------------------------- */
1659
1660 GVariantType *
1661 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1662 {
1663   const GVariantType *arg_types[256];
1664   guint n;
1665
1666   if (args)
1667     for (n = 0; args[n] != NULL; n++)
1668       {
1669         /* DBus places a hard limit of 255 on signature length.
1670          * therefore number of args must be less than 256.
1671          */
1672         g_assert (n < 256);
1673
1674         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1675
1676         if G_UNLIKELY (arg_types[n] == NULL)
1677           return NULL;
1678       }
1679   else
1680     n = 0;
1681
1682   return g_variant_type_new_tuple (arg_types, n);
1683 }
1684
1685 /* ---------------------------------------------------------------------------------------------------- */
1686
1687 #ifdef G_OS_WIN32
1688
1689 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1690
1691 gchar *
1692 _g_dbus_win32_get_user_sid (void)
1693 {
1694   HANDLE h;
1695   TOKEN_USER *user;
1696   DWORD token_information_len;
1697   PSID psid;
1698   gchar *sid;
1699   gchar *ret;
1700
1701   ret = NULL;
1702   user = NULL;
1703   h = INVALID_HANDLE_VALUE;
1704
1705   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1706     {
1707       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1708       goto out;
1709     }
1710
1711   /* Get length of buffer */
1712   token_information_len = 0;
1713   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1714     {
1715       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1716         {
1717           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1718           goto out;
1719         }
1720     }
1721   user = g_malloc (token_information_len);
1722   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1723     {
1724       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1725       goto out;
1726     }
1727
1728   psid = user->User.Sid;
1729   if (!IsValidSid (psid))
1730     {
1731       g_warning ("Invalid SID");
1732       goto out;
1733     }
1734
1735   if (!ConvertSidToStringSidA (psid, &sid))
1736     {
1737       g_warning ("Invalid SID");
1738       goto out;
1739     }
1740
1741   ret = g_strdup (sid);
1742   LocalFree (sid);
1743
1744 out:
1745   g_free (user);
1746   if (h != INVALID_HANDLE_VALUE)
1747     CloseHandle (h);
1748   return ret;
1749 }
1750 #endif
1751
1752 /* ---------------------------------------------------------------------------------------------------- */
1753
1754 gchar *
1755 _g_dbus_get_machine_id (GError **error)
1756 {
1757   gchar *ret;
1758   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1759   ret = NULL;
1760   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1761                             &ret,
1762                             NULL,
1763                             error))
1764     {
1765       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1766     }
1767   else
1768     {
1769       /* TODO: validate value */
1770       g_strstrip (ret);
1771     }
1772   return ret;
1773 }
1774
1775 /* ---------------------------------------------------------------------------------------------------- */
1776
1777 gchar *
1778 _g_dbus_enum_to_string (GType enum_type, gint value)
1779 {
1780   gchar *ret;
1781   GEnumClass *klass;
1782   GEnumValue *enum_value;
1783
1784   klass = g_type_class_ref (enum_type);
1785   enum_value = g_enum_get_value (klass, value);
1786   if (enum_value != NULL)
1787     ret = g_strdup (enum_value->value_nick);
1788   else
1789     ret = g_strdup_printf ("unknown (value %d)", value);
1790   g_type_class_unref (klass);
1791   return ret;
1792 }
1793
1794 /* ---------------------------------------------------------------------------------------------------- */
1795
1796 static void
1797 write_message_print_transport_debug (gssize bytes_written,
1798                                      MessageToWriteData *data)
1799 {
1800   if (G_LIKELY (!_g_dbus_debug_transport ()))
1801     goto out;
1802
1803   _g_dbus_debug_print_lock ();
1804   g_print ("========================================================================\n"
1805            "GDBus-debug:Transport:\n"
1806            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1807            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
1808            bytes_written,
1809            g_dbus_message_get_serial (data->message),
1810            data->blob_size,
1811            data->total_written,
1812            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1813   _g_dbus_debug_print_unlock ();
1814  out:
1815   ;
1816 }
1817
1818 /* ---------------------------------------------------------------------------------------------------- */
1819
1820 static void
1821 read_message_print_transport_debug (gssize bytes_read,
1822                                     GDBusWorker *worker)
1823 {
1824   gsize size;
1825   gint32 serial;
1826   gint32 message_length;
1827
1828   if (G_LIKELY (!_g_dbus_debug_transport ()))
1829     goto out;
1830
1831   size = bytes_read + worker->read_buffer_cur_size;
1832   serial = 0;
1833   message_length = 0;
1834   if (size >= 16)
1835     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
1836   if (size >= 1)
1837     {
1838       switch (worker->read_buffer[0])
1839         {
1840         case 'l':
1841           if (size >= 12)
1842             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
1843           break;
1844         case 'B':
1845           if (size >= 12)
1846             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
1847           break;
1848         default:
1849           /* an error will be set elsewhere if this happens */
1850           goto out;
1851         }
1852     }
1853
1854     _g_dbus_debug_print_lock ();
1855   g_print ("========================================================================\n"
1856            "GDBus-debug:Transport:\n"
1857            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1858            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
1859            bytes_read,
1860            serial,
1861            message_length,
1862            worker->read_buffer_cur_size,
1863            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
1864   _g_dbus_debug_print_unlock ();
1865  out:
1866   ;
1867 }