gdbus: Avoid blocking on worker thread in connection initialization
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "gsocketcontrolmessage.h"
43 #include "gsocketconnection.h"
44 #include "gsocketoutputstream.h"
45
46 #ifdef G_OS_UNIX
47 #include "gunixfdmessage.h"
48 #include "gunixconnection.h"
49 #include "gunixcredentialsmessage.h"
50 #endif
51
52 #ifdef G_OS_WIN32
53 #include <windows.h>
54 #endif
55
56 #include "glibintl.h"
57
58 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
59
60 /* ---------------------------------------------------------------------------------------------------- */
61
62 gchar *
63 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
64 {
65  guint n, m;
66  GString *ret;
67
68  ret = g_string_new (NULL);
69
70  for (n = 0; n < len; n += 16)
71    {
72      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
73
74      for (m = n; m < n + 16; m++)
75        {
76          if (m > n && (m%4) == 0)
77            g_string_append_c (ret, ' ');
78          if (m < len)
79            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
80          else
81            g_string_append (ret, "   ");
82        }
83
84      g_string_append (ret, "   ");
85
86      for (m = n; m < len && m < n + 16; m++)
87        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
88
89      g_string_append_c (ret, '\n');
90    }
91
92  return g_string_free (ret, FALSE);
93 }
94
95 /* ---------------------------------------------------------------------------------------------------- */
96
97 /* Unfortunately ancillary messages are discarded when reading from a
98  * socket using the GSocketInputStream abstraction. So we provide a
99  * very GInputStream-ish API that uses GSocket in this case (very
100  * similar to GSocketInputStream).
101  */
102
103 typedef struct
104 {
105   GSocket *socket;
106   GCancellable *cancellable;
107
108   void *buffer;
109   gsize count;
110
111   GSocketControlMessage ***messages;
112   gint *num_messages;
113
114   GSimpleAsyncResult *simple;
115
116   gboolean from_mainloop;
117 } ReadWithControlData;
118
119 static void
120 read_with_control_data_free (ReadWithControlData *data)
121 {
122   g_object_unref (data->socket);
123   if (data->cancellable != NULL)
124     g_object_unref (data->cancellable);
125   g_object_unref (data->simple);
126   g_free (data);
127 }
128
129 static gboolean
130 _g_socket_read_with_control_messages_ready (GSocket      *socket,
131                                             GIOCondition  condition,
132                                             gpointer      user_data)
133 {
134   ReadWithControlData *data = user_data;
135   GError *error;
136   gssize result;
137   GInputVector vector;
138
139   error = NULL;
140   vector.buffer = data->buffer;
141   vector.size = data->count;
142   result = g_socket_receive_message (data->socket,
143                                      NULL, /* address */
144                                      &vector,
145                                      1,
146                                      data->messages,
147                                      data->num_messages,
148                                      NULL,
149                                      data->cancellable,
150                                      &error);
151   if (result >= 0)
152     {
153       g_simple_async_result_set_op_res_gssize (data->simple, result);
154     }
155   else
156     {
157       g_assert (error != NULL);
158       g_simple_async_result_take_error (data->simple, error);
159     }
160
161   if (data->from_mainloop)
162     g_simple_async_result_complete (data->simple);
163   else
164     g_simple_async_result_complete_in_idle (data->simple);
165
166   return FALSE;
167 }
168
169 static void
170 _g_socket_read_with_control_messages (GSocket                 *socket,
171                                       void                    *buffer,
172                                       gsize                    count,
173                                       GSocketControlMessage ***messages,
174                                       gint                    *num_messages,
175                                       gint                     io_priority,
176                                       GCancellable            *cancellable,
177                                       GAsyncReadyCallback      callback,
178                                       gpointer                 user_data)
179 {
180   ReadWithControlData *data;
181
182   data = g_new0 (ReadWithControlData, 1);
183   data->socket = g_object_ref (socket);
184   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
185   data->buffer = buffer;
186   data->count = count;
187   data->messages = messages;
188   data->num_messages = num_messages;
189
190   data->simple = g_simple_async_result_new (G_OBJECT (socket),
191                                             callback,
192                                             user_data,
193                                             _g_socket_read_with_control_messages);
194
195   if (!g_socket_condition_check (socket, G_IO_IN))
196     {
197       GSource *source;
198       data->from_mainloop = TRUE;
199       source = g_socket_create_source (data->socket,
200                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
201                                        cancellable);
202       g_source_set_callback (source,
203                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
204                              data,
205                              (GDestroyNotify) read_with_control_data_free);
206       g_source_attach (source, g_main_context_get_thread_default ());
207       g_source_unref (source);
208     }
209   else
210     {
211       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
212       read_with_control_data_free (data);
213     }
214 }
215
216 static gssize
217 _g_socket_read_with_control_messages_finish (GSocket       *socket,
218                                              GAsyncResult  *result,
219                                              GError       **error)
220 {
221   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
222
223   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
224   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
225
226   if (g_simple_async_result_propagate_error (simple, error))
227       return -1;
228   else
229     return g_simple_async_result_get_op_res_gssize (simple);
230 }
231
232 /* ---------------------------------------------------------------------------------------------------- */
233
234 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
235
236 static GPtrArray *ensured_classes = NULL;
237
238 static void
239 ensure_type (GType gtype)
240 {
241   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
242 }
243
244 static void
245 release_required_types (void)
246 {
247   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
248   g_ptr_array_unref (ensured_classes);
249   ensured_classes = NULL;
250 }
251
252 static void
253 ensure_required_types (void)
254 {
255   g_assert (ensured_classes == NULL);
256   ensured_classes = g_ptr_array_new ();
257   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
258   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
259 }
260 /* ---------------------------------------------------------------------------------------------------- */
261
262 typedef struct
263 {
264   volatile gint refcount;
265   GThread *thread;
266   GMainContext *context;
267   GMainLoop *loop;
268 } SharedThreadData;
269
270 static gpointer
271 gdbus_shared_thread_func (gpointer user_data)
272 {
273   SharedThreadData *data = user_data;
274
275   g_main_context_push_thread_default (data->context);
276   g_main_loop_run (data->loop);
277   g_main_context_pop_thread_default (data->context);
278
279   release_required_types ();
280
281   return NULL;
282 }
283
284 /* ---------------------------------------------------------------------------------------------------- */
285
286 static SharedThreadData *
287 _g_dbus_shared_thread_ref (void)
288 {
289   static gsize shared_thread_data = 0;
290   GError *error = NULL;
291   SharedThreadData *ret;
292
293   if (g_once_init_enter (&shared_thread_data))
294     {
295       SharedThreadData *data;
296
297       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
298       ensure_required_types ();
299
300       data = g_new0 (SharedThreadData, 1);
301       data->refcount = 0;
302       
303       data->context = g_main_context_new ();
304       data->loop = g_main_loop_new (data->context, FALSE);
305       data->thread = g_thread_create (gdbus_shared_thread_func,
306                                       data,
307                                       TRUE,
308                                       &error);
309       g_assert_no_error (error);
310       /* We can cast between gsize and gpointer safely */
311       g_once_init_leave (&shared_thread_data, (gsize) data);
312     }
313
314   ret = (SharedThreadData*) shared_thread_data;
315   g_atomic_int_inc (&ret->refcount);
316   return ret;
317 }
318
319 static void
320 _g_dbus_shared_thread_unref (SharedThreadData *data)
321 {
322   /* TODO: actually destroy the shared thread here */
323 #if 0
324   g_assert (data != NULL);
325   if (g_atomic_int_dec_and_test (&data->refcount))
326     {
327       g_main_loop_quit (data->loop);
328       //g_thread_join (data->thread);
329       g_main_loop_unref (data->loop);
330       g_main_context_unref (data->context);
331     }
332 #endif
333 }
334
335 /* ---------------------------------------------------------------------------------------------------- */
336
337 struct GDBusWorker
338 {
339   volatile gint                       ref_count;
340
341   SharedThreadData                   *shared_thread_data;
342
343   gboolean                            stopped;
344
345   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
346    * only affects messages received from the other peer (since GDBusServer is the
347    * only user) - we might want it to affect messages sent to the other peer too?
348    */
349   gboolean                            frozen;
350   GDBusCapabilityFlags                capabilities;
351   GQueue                             *received_messages_while_frozen;
352
353   GIOStream                          *stream;
354   GCancellable                       *cancellable;
355   GDBusWorkerMessageReceivedCallback  message_received_callback;
356   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
357   GDBusWorkerDisconnectedCallback     disconnected_callback;
358   gpointer                            user_data;
359
360   /* if not NULL, stream is GSocketConnection */
361   GSocket *socket;
362
363   /* used for reading */
364   GMutex                             *read_lock;
365   gchar                              *read_buffer;
366   gsize                               read_buffer_allocated_size;
367   gsize                               read_buffer_cur_size;
368   gsize                               read_buffer_bytes_wanted;
369   GUnixFDList                        *read_fd_list;
370   GSocketControlMessage             **read_ancillary_messages;
371   gint                                read_num_ancillary_messages;
372
373   /* used for writing */
374   gint                                num_writes_pending;
375   GMutex                             *write_lock;
376   GQueue                             *write_queue;
377   guint64                             write_num_messages_written;
378   GList                              *write_pending_flushes;
379   gboolean                            flush_pending;
380 };
381
382 /* ---------------------------------------------------------------------------------------------------- */
383
384 typedef struct
385 {
386   GMutex *mutex;
387   GCond *cond;
388   guint64 number_to_wait_for;
389   GError *error;
390 } FlushData;
391
392 struct _MessageToWriteData ;
393 typedef struct _MessageToWriteData MessageToWriteData;
394
395 static void message_to_write_data_free (MessageToWriteData *data);
396
397 static void read_message_print_transport_debug (gssize bytes_read,
398                                                 GDBusWorker *worker);
399
400 static void write_message_print_transport_debug (gssize bytes_written,
401                                                  MessageToWriteData *data);
402
403 /* ---------------------------------------------------------------------------------------------------- */
404
405 static GDBusWorker *
406 _g_dbus_worker_ref (GDBusWorker *worker)
407 {
408   g_atomic_int_inc (&worker->ref_count);
409   return worker;
410 }
411
412 static void
413 _g_dbus_worker_unref (GDBusWorker *worker)
414 {
415   if (g_atomic_int_dec_and_test (&worker->ref_count))
416     {
417       g_assert (worker->write_pending_flushes == NULL);
418
419       _g_dbus_shared_thread_unref (worker->shared_thread_data);
420
421       g_object_unref (worker->stream);
422
423       g_mutex_free (worker->read_lock);
424       g_object_unref (worker->cancellable);
425       if (worker->read_fd_list != NULL)
426         g_object_unref (worker->read_fd_list);
427
428       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
429       g_queue_free (worker->received_messages_while_frozen);
430
431       g_mutex_free (worker->write_lock);
432       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
433       g_queue_free (worker->write_queue);
434
435       g_free (worker->read_buffer);
436
437       g_free (worker);
438     }
439 }
440
441 static void
442 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
443                                   gboolean      remote_peer_vanished,
444                                   GError       *error)
445 {
446   if (!worker->stopped)
447     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
448 }
449
450 static void
451 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
452                                       GDBusMessage *message)
453 {
454   if (!worker->stopped)
455     worker->message_received_callback (worker, message, worker->user_data);
456 }
457
458 static GDBusMessage *
459 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
460                                               GDBusMessage *message)
461 {
462   GDBusMessage *ret;
463   if (!worker->stopped)
464     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
465   else
466     ret = message;
467   return ret;
468 }
469
470 /* can only be called from private thread with read-lock held - takes ownership of @message */
471 static void
472 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
473                                                   GDBusMessage *message)
474 {
475   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
476     {
477       /* queue up */
478       g_queue_push_tail (worker->received_messages_while_frozen, message);
479     }
480   else
481     {
482       /* not frozen, nor anything in queue */
483       _g_dbus_worker_emit_message_received (worker, message);
484       g_object_unref (message);
485     }
486 }
487
488 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
489 static gboolean
490 unfreeze_in_idle_cb (gpointer user_data)
491 {
492   GDBusWorker *worker = user_data;
493   GDBusMessage *message;
494
495   g_mutex_lock (worker->read_lock);
496   if (worker->frozen)
497     {
498       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
499         {
500           _g_dbus_worker_emit_message_received (worker, message);
501           g_object_unref (message);
502         }
503       worker->frozen = FALSE;
504     }
505   else
506     {
507       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
508     }
509   g_mutex_unlock (worker->read_lock);
510   return FALSE;
511 }
512
513 /* can be called from any thread */
514 void
515 _g_dbus_worker_unfreeze (GDBusWorker *worker)
516 {
517   GSource *idle_source;
518   idle_source = g_idle_source_new ();
519   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
520   g_source_set_callback (idle_source,
521                          unfreeze_in_idle_cb,
522                          _g_dbus_worker_ref (worker),
523                          (GDestroyNotify) _g_dbus_worker_unref);
524   g_source_attach (idle_source, worker->shared_thread_data->context);
525   g_source_unref (idle_source);
526 }
527
528 /* ---------------------------------------------------------------------------------------------------- */
529
530 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
531
532 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
533 static void
534 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
535                            GAsyncResult  *res,
536                            gpointer       user_data)
537 {
538   GDBusWorker *worker = user_data;
539   GError *error;
540   gssize bytes_read;
541
542   g_mutex_lock (worker->read_lock);
543
544   /* If already stopped, don't even process the reply */
545   if (worker->stopped)
546     goto out;
547
548   error = NULL;
549   if (worker->socket == NULL)
550     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
551                                              res,
552                                              &error);
553   else
554     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
555                                                               res,
556                                                               &error);
557   if (worker->read_num_ancillary_messages > 0)
558     {
559       gint n;
560       for (n = 0; n < worker->read_num_ancillary_messages; n++)
561         {
562           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
563
564           if (FALSE)
565             {
566             }
567 #ifdef G_OS_UNIX
568           else if (G_IS_UNIX_FD_MESSAGE (control_message))
569             {
570               GUnixFDMessage *fd_message;
571               gint *fds;
572               gint num_fds;
573
574               fd_message = G_UNIX_FD_MESSAGE (control_message);
575               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
576               if (worker->read_fd_list == NULL)
577                 {
578                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
579                 }
580               else
581                 {
582                   gint n;
583                   for (n = 0; n < num_fds; n++)
584                     {
585                       /* TODO: really want a append_steal() */
586                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
587                       close (fds[n]);
588                     }
589                 }
590               g_free (fds);
591             }
592           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
593             {
594               /* do nothing */
595             }
596 #endif
597           else
598             {
599               if (error == NULL)
600                 {
601                   g_set_error (&error,
602                                G_IO_ERROR,
603                                G_IO_ERROR_FAILED,
604                                "Unexpected ancillary message of type %s received from peer",
605                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
606                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
607                   g_error_free (error);
608                   g_object_unref (control_message);
609                   n++;
610                   while (n < worker->read_num_ancillary_messages)
611                     g_object_unref (worker->read_ancillary_messages[n++]);
612                   g_free (worker->read_ancillary_messages);
613                   goto out;
614                 }
615             }
616           g_object_unref (control_message);
617         }
618       g_free (worker->read_ancillary_messages);
619     }
620
621   if (bytes_read == -1)
622     {
623       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
624       g_error_free (error);
625       goto out;
626     }
627
628 #if 0
629   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
630            (gint) bytes_read,
631            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
632            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
633            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
634                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
635            worker->stream,
636            worker);
637 #endif
638
639   /* TODO: hmm, hmm... */
640   if (bytes_read == 0)
641     {
642       g_set_error (&error,
643                    G_IO_ERROR,
644                    G_IO_ERROR_FAILED,
645                    "Underlying GIOStream returned 0 bytes on an async read");
646       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
647       g_error_free (error);
648       goto out;
649     }
650
651   read_message_print_transport_debug (bytes_read, worker);
652
653   worker->read_buffer_cur_size += bytes_read;
654   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
655     {
656       /* OK, got what we asked for! */
657       if (worker->read_buffer_bytes_wanted == 16)
658         {
659           gssize message_len;
660           /* OK, got the header - determine how many more bytes are needed */
661           error = NULL;
662           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
663                                                      16,
664                                                      &error);
665           if (message_len == -1)
666             {
667               g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
668               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
669               g_error_free (error);
670               goto out;
671             }
672
673           worker->read_buffer_bytes_wanted = message_len;
674           _g_dbus_worker_do_read_unlocked (worker);
675         }
676       else
677         {
678           GDBusMessage *message;
679           error = NULL;
680
681           /* TODO: use connection->priv->auth to decode the message */
682
683           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
684                                                   worker->read_buffer_cur_size,
685                                                   worker->capabilities,
686                                                   &error);
687           if (message == NULL)
688             {
689               gchar *s;
690               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
691               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
692                          "The error is: %s\n"
693                          "The payload is as follows:\n"
694                          "%s\n",
695                          worker->read_buffer_cur_size,
696                          error->message,
697                          s);
698               g_free (s);
699               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
700               g_error_free (error);
701               goto out;
702             }
703
704 #ifdef G_OS_UNIX
705           if (worker->read_fd_list != NULL)
706             {
707               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
708               g_object_unref (worker->read_fd_list);
709               worker->read_fd_list = NULL;
710             }
711 #endif
712
713           if (G_UNLIKELY (_g_dbus_debug_message ()))
714             {
715               gchar *s;
716               _g_dbus_debug_print_lock ();
717               g_print ("========================================================================\n"
718                        "GDBus-debug:Message:\n"
719                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
720                        worker->read_buffer_cur_size);
721               s = g_dbus_message_print (message, 2);
722               g_print ("%s", s);
723               g_free (s);
724               if (G_UNLIKELY (_g_dbus_debug_payload ()))
725                 {
726                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
727                   g_print ("%s\n", s);
728                   g_free (s);
729                 }
730               _g_dbus_debug_print_unlock ();
731             }
732
733           /* yay, got a message, go deliver it */
734           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
735
736           /* start reading another message! */
737           worker->read_buffer_bytes_wanted = 0;
738           worker->read_buffer_cur_size = 0;
739           _g_dbus_worker_do_read_unlocked (worker);
740         }
741     }
742   else
743     {
744       /* didn't get all the bytes we requested - so repeat the request... */
745       _g_dbus_worker_do_read_unlocked (worker);
746     }
747
748  out:
749   g_mutex_unlock (worker->read_lock);
750
751   /* gives up the reference acquired when calling g_input_stream_read_async() */
752   _g_dbus_worker_unref (worker);
753 }
754
755 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
756 static void
757 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
758 {
759   /* if bytes_wanted is zero, it means start reading a message */
760   if (worker->read_buffer_bytes_wanted == 0)
761     {
762       worker->read_buffer_cur_size = 0;
763       worker->read_buffer_bytes_wanted = 16;
764     }
765
766   /* ensure we have a (big enough) buffer */
767   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
768     {
769       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
770       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
771       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
772     }
773
774   if (worker->socket == NULL)
775     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
776                                worker->read_buffer + worker->read_buffer_cur_size,
777                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
778                                G_PRIORITY_DEFAULT,
779                                worker->cancellable,
780                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
781                                _g_dbus_worker_ref (worker));
782   else
783     {
784       worker->read_ancillary_messages = NULL;
785       worker->read_num_ancillary_messages = 0;
786       _g_socket_read_with_control_messages (worker->socket,
787                                             worker->read_buffer + worker->read_buffer_cur_size,
788                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
789                                             &worker->read_ancillary_messages,
790                                             &worker->read_num_ancillary_messages,
791                                             G_PRIORITY_DEFAULT,
792                                             worker->cancellable,
793                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
794                                             _g_dbus_worker_ref (worker));
795     }
796 }
797
798 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
799 static gboolean
800 _g_dbus_worker_do_initial_read (gpointer data)
801 {
802   GDBusWorker *worker = data;
803   g_mutex_lock (worker->read_lock);
804   _g_dbus_worker_do_read_unlocked (worker);
805   g_mutex_unlock (worker->read_lock);
806   return FALSE;
807 }
808
809 /* ---------------------------------------------------------------------------------------------------- */
810
811 struct _MessageToWriteData
812 {
813   GDBusWorker  *worker;
814   GDBusMessage *message;
815   gchar        *blob;
816   gsize         blob_size;
817
818   gsize               total_written;
819   GSimpleAsyncResult *simple;
820
821 };
822
823 static void
824 message_to_write_data_free (MessageToWriteData *data)
825 {
826   _g_dbus_worker_unref (data->worker);
827   if (data->message)
828     g_object_unref (data->message);
829   g_free (data->blob);
830   g_free (data);
831 }
832
833 /* ---------------------------------------------------------------------------------------------------- */
834
835 static void write_message_continue_writing (MessageToWriteData *data);
836
837 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
838 static void
839 write_message_async_cb (GObject      *source_object,
840                         GAsyncResult *res,
841                         gpointer      user_data)
842 {
843   MessageToWriteData *data = user_data;
844   GSimpleAsyncResult *simple;
845   gssize bytes_written;
846   GError *error;
847
848   /* Note: we can't access data->simple after calling g_async_result_complete () because the
849    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
850    */
851   simple = data->simple;
852
853   error = NULL;
854   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
855                                                 res,
856                                                 &error);
857   if (bytes_written == -1)
858     {
859       g_simple_async_result_take_error (simple, error);
860       g_simple_async_result_complete (simple);
861       g_object_unref (simple);
862       goto out;
863     }
864   g_assert (bytes_written > 0); /* zero is never returned */
865
866   write_message_print_transport_debug (bytes_written, data);
867
868   data->total_written += bytes_written;
869   g_assert (data->total_written <= data->blob_size);
870   if (data->total_written == data->blob_size)
871     {
872       g_simple_async_result_complete (simple);
873       g_object_unref (simple);
874       goto out;
875     }
876
877   write_message_continue_writing (data);
878
879  out:
880   ;
881 }
882
883 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
884 static gboolean
885 on_socket_ready (GSocket      *socket,
886                  GIOCondition  condition,
887                  gpointer      user_data)
888 {
889   MessageToWriteData *data = user_data;
890   write_message_continue_writing (data);
891   return FALSE; /* remove source */
892 }
893
894 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
895 static void
896 write_message_continue_writing (MessageToWriteData *data)
897 {
898   GOutputStream *ostream;
899   GSimpleAsyncResult *simple;
900 #ifdef G_OS_UNIX
901   GUnixFDList *fd_list;
902 #endif
903
904   /* Note: we can't access data->simple after calling g_async_result_complete () because the
905    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
906    */
907   simple = data->simple;
908
909   ostream = g_io_stream_get_output_stream (data->worker->stream);
910 #ifdef G_OS_UNIX
911   fd_list = g_dbus_message_get_unix_fd_list (data->message);
912 #endif
913
914   g_assert (!g_output_stream_has_pending (ostream));
915   g_assert_cmpint (data->total_written, <, data->blob_size);
916
917   if (FALSE)
918     {
919     }
920 #ifdef G_OS_UNIX
921   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
922     {
923       GOutputVector vector;
924       GSocketControlMessage *control_message;
925       gssize bytes_written;
926       GError *error;
927
928       vector.buffer = data->blob;
929       vector.size = data->blob_size;
930
931       control_message = NULL;
932       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
933         {
934           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
935             {
936               g_simple_async_result_set_error (simple,
937                                                G_IO_ERROR,
938                                                G_IO_ERROR_FAILED,
939                                                "Tried sending a file descriptor but remote peer does not support this capability");
940               g_simple_async_result_complete (simple);
941               g_object_unref (simple);
942               goto out;
943             }
944           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
945         }
946
947       error = NULL;
948       bytes_written = g_socket_send_message (data->worker->socket,
949                                              NULL, /* address */
950                                              &vector,
951                                              1,
952                                              control_message != NULL ? &control_message : NULL,
953                                              control_message != NULL ? 1 : 0,
954                                              G_SOCKET_MSG_NONE,
955                                              data->worker->cancellable,
956                                              &error);
957       if (control_message != NULL)
958         g_object_unref (control_message);
959
960       if (bytes_written == -1)
961         {
962           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
963           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
964             {
965               GSource *source;
966               source = g_socket_create_source (data->worker->socket,
967                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
968                                                data->worker->cancellable);
969               g_source_set_callback (source,
970                                      (GSourceFunc) on_socket_ready,
971                                      data,
972                                      NULL); /* GDestroyNotify */
973               g_source_attach (source, g_main_context_get_thread_default ());
974               g_source_unref (source);
975               g_error_free (error);
976               goto out;
977             }
978           g_simple_async_result_take_error (simple, error);
979           g_simple_async_result_complete (simple);
980           g_object_unref (simple);
981           goto out;
982         }
983       g_assert (bytes_written > 0); /* zero is never returned */
984
985       write_message_print_transport_debug (bytes_written, data);
986
987       data->total_written += bytes_written;
988       g_assert (data->total_written <= data->blob_size);
989       if (data->total_written == data->blob_size)
990         {
991           g_simple_async_result_complete (simple);
992           g_object_unref (simple);
993           goto out;
994         }
995
996       write_message_continue_writing (data);
997     }
998 #endif
999   else
1000     {
1001 #ifdef G_OS_UNIX
1002       if (fd_list != NULL)
1003         {
1004           g_simple_async_result_set_error (simple,
1005                                            G_IO_ERROR,
1006                                            G_IO_ERROR_FAILED,
1007                                            "Tried sending a file descriptor on unsupported stream of type %s",
1008                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1009           g_simple_async_result_complete (simple);
1010           g_object_unref (simple);
1011           goto out;
1012         }
1013 #endif
1014
1015       g_output_stream_write_async (ostream,
1016                                    (const gchar *) data->blob + data->total_written,
1017                                    data->blob_size - data->total_written,
1018                                    G_PRIORITY_DEFAULT,
1019                                    data->worker->cancellable,
1020                                    write_message_async_cb,
1021                                    data);
1022     }
1023  out:
1024   ;
1025 }
1026
1027 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1028 static void
1029 write_message_async (GDBusWorker         *worker,
1030                      MessageToWriteData  *data,
1031                      GAsyncReadyCallback  callback,
1032                      gpointer             user_data)
1033 {
1034   data->simple = g_simple_async_result_new (NULL,
1035                                             callback,
1036                                             user_data,
1037                                             write_message_async);
1038   data->total_written = 0;
1039   write_message_continue_writing (data);
1040 }
1041
1042 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1043 static gboolean
1044 write_message_finish (GAsyncResult   *res,
1045                       GError        **error)
1046 {
1047   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1048   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1049     return FALSE;
1050   else
1051     return TRUE;
1052 }
1053 /* ---------------------------------------------------------------------------------------------------- */
1054
1055 static void maybe_write_next_message (GDBusWorker *worker);
1056
1057 typedef struct
1058 {
1059   GDBusWorker *worker;
1060   GList *flushers;
1061 } FlushAsyncData;
1062
1063 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1064 static void
1065 ostream_flush_cb (GObject      *source_object,
1066                   GAsyncResult *res,
1067                   gpointer      user_data)
1068 {
1069   FlushAsyncData *data = user_data;
1070   GError *error;
1071   GList *l;
1072
1073   error = NULL;
1074   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1075                                 res,
1076                                 &error);
1077
1078   if (error == NULL)
1079     {
1080       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1081         {
1082           _g_dbus_debug_print_lock ();
1083           g_print ("========================================================================\n"
1084                    "GDBus-debug:Transport:\n"
1085                    "  ---- FLUSHED stream of type %s\n",
1086                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1087           _g_dbus_debug_print_unlock ();
1088         }
1089     }
1090
1091   g_assert (data->flushers != NULL);
1092   for (l = data->flushers; l != NULL; l = l->next)
1093     {
1094       FlushData *f = l->data;
1095
1096       f->error = error != NULL ? g_error_copy (error) : NULL;
1097
1098       g_mutex_lock (f->mutex);
1099       g_cond_signal (f->cond);
1100       g_mutex_unlock (f->mutex);
1101     }
1102   g_list_free (data->flushers);
1103
1104   if (error != NULL)
1105     g_error_free (error);
1106
1107   /* Make sure we tell folks that we don't have additional
1108      flushes pending */
1109   g_mutex_lock (data->worker->write_lock);
1110   data->worker->flush_pending = FALSE;
1111   g_mutex_unlock (data->worker->write_lock);
1112
1113   /* OK, cool, finally kick off the next write */
1114   maybe_write_next_message (data->worker);
1115
1116   _g_dbus_worker_unref (data->worker);
1117   g_free (data);
1118 }
1119
1120 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1121 static void
1122 message_written (GDBusWorker *worker,
1123                  MessageToWriteData *message_data)
1124 {
1125   GList *l;
1126   GList *ll;
1127   GList *flushers;
1128
1129   /* first log the fact that we wrote a message */
1130   if (G_UNLIKELY (_g_dbus_debug_message ()))
1131     {
1132       gchar *s;
1133       _g_dbus_debug_print_lock ();
1134       g_print ("========================================================================\n"
1135                "GDBus-debug:Message:\n"
1136                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1137                message_data->blob_size);
1138       s = g_dbus_message_print (message_data->message, 2);
1139       g_print ("%s", s);
1140       g_free (s);
1141       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1142         {
1143           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1144           g_print ("%s\n", s);
1145           g_free (s);
1146         }
1147       _g_dbus_debug_print_unlock ();
1148     }
1149
1150   /* then first wake up pending flushes and, if needed, flush the stream */
1151   flushers = NULL;
1152   g_mutex_lock (worker->write_lock);
1153   worker->write_num_messages_written += 1;
1154   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1155     {
1156       FlushData *f = l->data;
1157       ll = l->next;
1158
1159       if (f->number_to_wait_for == worker->write_num_messages_written)
1160         {
1161           flushers = g_list_append (flushers, f);
1162           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1163         }
1164     }
1165   if (flushers != NULL)
1166     {
1167       worker->flush_pending = TRUE;
1168     }
1169   g_mutex_unlock (worker->write_lock);
1170
1171   if (flushers != NULL)
1172     {
1173       FlushAsyncData *data;
1174       data = g_new0 (FlushAsyncData, 1);
1175       data->worker = _g_dbus_worker_ref (worker);
1176       data->flushers = flushers;
1177       /* flush the stream before writing the next message */
1178       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1179                                    G_PRIORITY_DEFAULT,
1180                                    worker->cancellable,
1181                                    ostream_flush_cb,
1182                                    data);
1183     }
1184   else
1185     {
1186       /* kick off the next write! */
1187       maybe_write_next_message (worker);
1188     }
1189 }
1190
1191 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1192 static void
1193 write_message_cb (GObject       *source_object,
1194                   GAsyncResult  *res,
1195                   gpointer       user_data)
1196 {
1197   MessageToWriteData *data = user_data;
1198   GError *error;
1199
1200   g_mutex_lock (data->worker->write_lock);
1201   data->worker->num_writes_pending -= 1;
1202   g_mutex_unlock (data->worker->write_lock);
1203
1204   error = NULL;
1205   if (!write_message_finish (res, &error))
1206     {
1207       /* TODO: handle */
1208       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1209       g_error_free (error);
1210     }
1211
1212   /* this function will also kick of the next write (it might need to
1213    * flush so writing the next message might happen much later
1214    * e.g. async)
1215    */
1216   message_written (data->worker, data);
1217
1218   message_to_write_data_free (data);
1219 }
1220
1221 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1222 static void
1223 maybe_write_next_message (GDBusWorker *worker)
1224 {
1225   MessageToWriteData *data;
1226
1227  write_next:
1228
1229   g_mutex_lock (worker->write_lock);
1230   data = g_queue_pop_head (worker->write_queue);
1231   if (data != NULL)
1232     worker->num_writes_pending += 1;
1233   g_mutex_unlock (worker->write_lock);
1234
1235   /* Note that write_lock is only used for protecting the @write_queue
1236    * and @num_writes_pending fields of the GDBusWorker struct ... which we
1237    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1238    *
1239    * Therefore, it's fine to drop it here when calling back into user
1240    * code and then writing the message out onto the GIOStream since this
1241    * function only runs on the worker thread.
1242    */
1243   if (data != NULL)
1244     {
1245       GDBusMessage *old_message;
1246       guchar *new_blob;
1247       gsize new_blob_size;
1248       GError *error;
1249
1250       old_message = data->message;
1251       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1252       if (data->message == old_message)
1253         {
1254           /* filters had no effect - do nothing */
1255         }
1256       else if (data->message == NULL)
1257         {
1258           /* filters dropped message */
1259           g_mutex_lock (worker->write_lock);
1260           worker->num_writes_pending -= 1;
1261           g_mutex_unlock (worker->write_lock);
1262           message_to_write_data_free (data);
1263           goto write_next;
1264         }
1265       else
1266         {
1267           /* filters altered the message -> reencode */
1268           error = NULL;
1269           new_blob = g_dbus_message_to_blob (data->message,
1270                                              &new_blob_size,
1271                                              worker->capabilities,
1272                                              &error);
1273           if (new_blob == NULL)
1274             {
1275               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1276                * the old message instead
1277                */
1278               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1279                          g_dbus_message_get_serial (data->message),
1280                          error->message);
1281               g_error_free (error);
1282             }
1283           else
1284             {
1285               g_free (data->blob);
1286               data->blob = (gchar *) new_blob;
1287               data->blob_size = new_blob_size;
1288             }
1289         }
1290
1291       write_message_async (worker,
1292                            data,
1293                            write_message_cb,
1294                            data);
1295     }
1296 }
1297
1298 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1299 static gboolean
1300 write_message_in_idle_cb (gpointer user_data)
1301 {
1302   GDBusWorker *worker = user_data;
1303   if (worker->num_writes_pending == 0 && !worker->flush_pending)
1304     maybe_write_next_message (worker);
1305   return FALSE;
1306 }
1307
1308 /* ---------------------------------------------------------------------------------------------------- */
1309
1310 /* can be called from any thread - steals blob */
1311 void
1312 _g_dbus_worker_send_message (GDBusWorker    *worker,
1313                              GDBusMessage   *message,
1314                              gchar          *blob,
1315                              gsize           blob_len)
1316 {
1317   MessageToWriteData *data;
1318
1319   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1320   g_return_if_fail (blob != NULL);
1321   g_return_if_fail (blob_len > 16);
1322
1323   data = g_new0 (MessageToWriteData, 1);
1324   data->worker = _g_dbus_worker_ref (worker);
1325   data->message = g_object_ref (message);
1326   data->blob = blob; /* steal! */
1327   data->blob_size = blob_len;
1328
1329   g_mutex_lock (worker->write_lock);
1330   g_queue_push_tail (worker->write_queue, data);
1331   if (worker->num_writes_pending == 0)
1332     {
1333       GSource *idle_source;
1334       idle_source = g_idle_source_new ();
1335       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1336       g_source_set_callback (idle_source,
1337                              write_message_in_idle_cb,
1338                              _g_dbus_worker_ref (worker),
1339                              (GDestroyNotify) _g_dbus_worker_unref);
1340       g_source_attach (idle_source, worker->shared_thread_data->context);
1341       g_source_unref (idle_source);
1342     }
1343   g_mutex_unlock (worker->write_lock);
1344 }
1345
1346 /* ---------------------------------------------------------------------------------------------------- */
1347
1348 GDBusWorker *
1349 _g_dbus_worker_new (GIOStream                              *stream,
1350                     GDBusCapabilityFlags                    capabilities,
1351                     gboolean                                initially_frozen,
1352                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1353                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1354                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1355                     gpointer                                user_data)
1356 {
1357   GDBusWorker *worker;
1358   GSource *idle_source;
1359
1360   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1361   g_return_val_if_fail (message_received_callback != NULL, NULL);
1362   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1363   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1364
1365   worker = g_new0 (GDBusWorker, 1);
1366   worker->ref_count = 1;
1367
1368   worker->read_lock = g_mutex_new ();
1369   worker->message_received_callback = message_received_callback;
1370   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1371   worker->disconnected_callback = disconnected_callback;
1372   worker->user_data = user_data;
1373   worker->stream = g_object_ref (stream);
1374   worker->capabilities = capabilities;
1375   worker->cancellable = g_cancellable_new ();
1376   worker->flush_pending = FALSE;
1377
1378   worker->frozen = initially_frozen;
1379   worker->received_messages_while_frozen = g_queue_new ();
1380
1381   worker->write_lock = g_mutex_new ();
1382   worker->write_queue = g_queue_new ();
1383
1384   if (G_IS_SOCKET_CONNECTION (worker->stream))
1385     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1386
1387   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1388
1389   /* begin reading */
1390   idle_source = g_idle_source_new ();
1391   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1392   g_source_set_callback (idle_source,
1393                          _g_dbus_worker_do_initial_read,
1394                          worker,
1395                          NULL);
1396   g_source_attach (idle_source, worker->shared_thread_data->context);
1397   g_source_unref (idle_source);
1398
1399   return worker;
1400 }
1401
1402 /* ---------------------------------------------------------------------------------------------------- */
1403
1404 /* This can be called from any thread - frees worker. Note that
1405  * callbacks might still happen if called from another thread than the
1406  * worker - use your own synchronization primitive in the callbacks.
1407  */
1408 void
1409 _g_dbus_worker_stop (GDBusWorker *worker)
1410 {
1411   worker->stopped = TRUE;
1412   g_cancellable_cancel (worker->cancellable);
1413   _g_dbus_worker_unref (worker);
1414 }
1415
1416 /* ---------------------------------------------------------------------------------------------------- */
1417
1418 /* can be called from any thread (except the worker thread) - blocks
1419  * calling thread until all queued outgoing messages are written and
1420  * the transport has been flushed
1421  */
1422 gboolean
1423 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1424                            GCancellable   *cancellable,
1425                            GError        **error)
1426 {
1427   gboolean ret;
1428   FlushData *data;
1429
1430   data = NULL;
1431   ret = TRUE;
1432
1433   /* if the queue is empty, there's nothing to wait for */
1434   g_mutex_lock (worker->write_lock);
1435   if (g_queue_get_length (worker->write_queue) > 0)
1436     {
1437       data = g_new0 (FlushData, 1);
1438       data->mutex = g_mutex_new ();
1439       data->cond = g_cond_new ();
1440       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1441       g_mutex_lock (data->mutex);
1442       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1443     }
1444   g_mutex_unlock (worker->write_lock);
1445
1446   if (data != NULL)
1447     {
1448       g_cond_wait (data->cond, data->mutex);
1449       g_mutex_unlock (data->mutex);
1450
1451       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1452       g_cond_free (data->cond);
1453       g_mutex_free (data->mutex);
1454       if (data->error != NULL)
1455         {
1456           ret = FALSE;
1457           g_propagate_error (error, data->error);
1458         }
1459       g_free (data);
1460     }
1461
1462   return ret;
1463 }
1464
1465 /* ---------------------------------------------------------------------------------------------------- */
1466
1467 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1468 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1469 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1470 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1471 #define G_DBUS_DEBUG_CALL           (1<<4)
1472 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1473 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1474 #define G_DBUS_DEBUG_RETURN         (1<<7)
1475 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1476 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1477
1478 static gint _gdbus_debug_flags = 0;
1479
1480 gboolean
1481 _g_dbus_debug_authentication (void)
1482 {
1483   _g_dbus_initialize ();
1484   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1485 }
1486
1487 gboolean
1488 _g_dbus_debug_transport (void)
1489 {
1490   _g_dbus_initialize ();
1491   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1492 }
1493
1494 gboolean
1495 _g_dbus_debug_message (void)
1496 {
1497   _g_dbus_initialize ();
1498   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1499 }
1500
1501 gboolean
1502 _g_dbus_debug_payload (void)
1503 {
1504   _g_dbus_initialize ();
1505   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1506 }
1507
1508 gboolean
1509 _g_dbus_debug_call (void)
1510 {
1511   _g_dbus_initialize ();
1512   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1513 }
1514
1515 gboolean
1516 _g_dbus_debug_signal (void)
1517 {
1518   _g_dbus_initialize ();
1519   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1520 }
1521
1522 gboolean
1523 _g_dbus_debug_incoming (void)
1524 {
1525   _g_dbus_initialize ();
1526   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1527 }
1528
1529 gboolean
1530 _g_dbus_debug_return (void)
1531 {
1532   _g_dbus_initialize ();
1533   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1534 }
1535
1536 gboolean
1537 _g_dbus_debug_emission (void)
1538 {
1539   _g_dbus_initialize ();
1540   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1541 }
1542
1543 gboolean
1544 _g_dbus_debug_address (void)
1545 {
1546   _g_dbus_initialize ();
1547   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1548 }
1549
1550 G_LOCK_DEFINE_STATIC (print_lock);
1551
1552 void
1553 _g_dbus_debug_print_lock (void)
1554 {
1555   G_LOCK (print_lock);
1556 }
1557
1558 void
1559 _g_dbus_debug_print_unlock (void)
1560 {
1561   G_UNLOCK (print_lock);
1562 }
1563
1564 /*
1565  * _g_dbus_initialize:
1566  *
1567  * Does various one-time init things such as
1568  *
1569  *  - registering the G_DBUS_ERROR error domain
1570  *  - parses the G_DBUS_DEBUG environment variable
1571  */
1572 void
1573 _g_dbus_initialize (void)
1574 {
1575   static volatile gsize initialized = 0;
1576
1577   if (g_once_init_enter (&initialized))
1578     {
1579       volatile GQuark g_dbus_error_domain;
1580       const gchar *debug;
1581
1582       g_dbus_error_domain = G_DBUS_ERROR;
1583       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1584
1585       debug = g_getenv ("G_DBUS_DEBUG");
1586       if (debug != NULL)
1587         {
1588           const GDebugKey keys[] = {
1589             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1590             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1591             { "message",        G_DBUS_DEBUG_MESSAGE        },
1592             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1593             { "call",           G_DBUS_DEBUG_CALL           },
1594             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1595             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1596             { "return",         G_DBUS_DEBUG_RETURN         },
1597             { "emission",       G_DBUS_DEBUG_EMISSION       },
1598             { "address",        G_DBUS_DEBUG_ADDRESS        }
1599           };
1600
1601           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1602           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1603             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1604         }
1605
1606       g_once_init_leave (&initialized, 1);
1607     }
1608 }
1609
1610 /* ---------------------------------------------------------------------------------------------------- */
1611
1612 GVariantType *
1613 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1614 {
1615   const GVariantType *arg_types[256];
1616   guint n;
1617
1618   if (args)
1619     for (n = 0; args[n] != NULL; n++)
1620       {
1621         /* DBus places a hard limit of 255 on signature length.
1622          * therefore number of args must be less than 256.
1623          */
1624         g_assert (n < 256);
1625
1626         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1627
1628         if G_UNLIKELY (arg_types[n] == NULL)
1629           return NULL;
1630       }
1631   else
1632     n = 0;
1633
1634   return g_variant_type_new_tuple (arg_types, n);
1635 }
1636
1637 /* ---------------------------------------------------------------------------------------------------- */
1638
1639 #ifdef G_OS_WIN32
1640
1641 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1642
1643 gchar *
1644 _g_dbus_win32_get_user_sid (void)
1645 {
1646   HANDLE h;
1647   TOKEN_USER *user;
1648   DWORD token_information_len;
1649   PSID psid;
1650   gchar *sid;
1651   gchar *ret;
1652
1653   ret = NULL;
1654   user = NULL;
1655   h = INVALID_HANDLE_VALUE;
1656
1657   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1658     {
1659       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1660       goto out;
1661     }
1662
1663   /* Get length of buffer */
1664   token_information_len = 0;
1665   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1666     {
1667       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1668         {
1669           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1670           goto out;
1671         }
1672     }
1673   user = g_malloc (token_information_len);
1674   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1675     {
1676       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1677       goto out;
1678     }
1679
1680   psid = user->User.Sid;
1681   if (!IsValidSid (psid))
1682     {
1683       g_warning ("Invalid SID");
1684       goto out;
1685     }
1686
1687   if (!ConvertSidToStringSidA (psid, &sid))
1688     {
1689       g_warning ("Invalid SID");
1690       goto out;
1691     }
1692
1693   ret = g_strdup (sid);
1694   LocalFree (sid);
1695
1696 out:
1697   g_free (user);
1698   if (h != INVALID_HANDLE_VALUE)
1699     CloseHandle (h);
1700   return ret;
1701 }
1702 #endif
1703
1704 /* ---------------------------------------------------------------------------------------------------- */
1705
1706 gchar *
1707 _g_dbus_get_machine_id (GError **error)
1708 {
1709   gchar *ret;
1710   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1711   ret = NULL;
1712   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1713                             &ret,
1714                             NULL,
1715                             error))
1716     {
1717       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1718     }
1719   else
1720     {
1721       /* TODO: validate value */
1722       g_strstrip (ret);
1723     }
1724   return ret;
1725 }
1726
1727 /* ---------------------------------------------------------------------------------------------------- */
1728
1729 gchar *
1730 _g_dbus_enum_to_string (GType enum_type, gint value)
1731 {
1732   gchar *ret;
1733   GEnumClass *klass;
1734   GEnumValue *enum_value;
1735
1736   klass = g_type_class_ref (enum_type);
1737   enum_value = g_enum_get_value (klass, value);
1738   if (enum_value != NULL)
1739     ret = g_strdup (enum_value->value_nick);
1740   else
1741     ret = g_strdup_printf ("unknown (value %d)", value);
1742   g_type_class_unref (klass);
1743   return ret;
1744 }
1745
1746 /* ---------------------------------------------------------------------------------------------------- */
1747
1748 static void
1749 write_message_print_transport_debug (gssize bytes_written,
1750                                      MessageToWriteData *data)
1751 {
1752   if (G_LIKELY (!_g_dbus_debug_transport ()))
1753     goto out;
1754
1755   _g_dbus_debug_print_lock ();
1756   g_print ("========================================================================\n"
1757            "GDBus-debug:Transport:\n"
1758            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1759            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
1760            bytes_written,
1761            g_dbus_message_get_serial (data->message),
1762            data->blob_size,
1763            data->total_written,
1764            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1765   _g_dbus_debug_print_unlock ();
1766  out:
1767   ;
1768 }
1769
1770 /* ---------------------------------------------------------------------------------------------------- */
1771
1772 static void
1773 read_message_print_transport_debug (gssize bytes_read,
1774                                     GDBusWorker *worker)
1775 {
1776   gsize size;
1777   gint32 serial;
1778   gint32 message_length;
1779
1780   if (G_LIKELY (!_g_dbus_debug_transport ()))
1781     goto out;
1782
1783   size = bytes_read + worker->read_buffer_cur_size;
1784   serial = 0;
1785   message_length = 0;
1786   if (size >= 16)
1787     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
1788   if (size >= 1)
1789     {
1790       switch (worker->read_buffer[0])
1791         {
1792         case 'l':
1793           if (size >= 12)
1794             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
1795           break;
1796         case 'B':
1797           if (size >= 12)
1798             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
1799           break;
1800         default:
1801           /* an error will be set elsewhere if this happens */
1802           goto out;
1803         }
1804     }
1805
1806     _g_dbus_debug_print_lock ();
1807   g_print ("========================================================================\n"
1808            "GDBus-debug:Transport:\n"
1809            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1810            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
1811            bytes_read,
1812            serial,
1813            message_length,
1814            worker->read_buffer_cur_size,
1815            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
1816   _g_dbus_debug_print_unlock ();
1817  out:
1818   ;
1819 }
1820
1821 /* ---------------------------------------------------------------------------------------------------- */
1822
1823 gboolean
1824 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
1825                                      GValue                *return_accu,
1826                                      const GValue          *handler_return,
1827                                      gpointer               dummy)
1828 {
1829   gboolean continue_emission;
1830   gboolean signal_return;
1831
1832   signal_return = g_value_get_boolean (handler_return);
1833   g_value_set_boolean (return_accu, signal_return);
1834   continue_emission = signal_return;
1835
1836   return continue_emission;
1837 }