785a0c09091599d802da567d20abf70382b1172b
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "glib/gstdio.h"
43 #include "gsocketcontrolmessage.h"
44 #include "gsocketconnection.h"
45 #include "gsocketoutputstream.h"
46
47 #ifdef G_OS_UNIX
48 #include "gunixfdmessage.h"
49 #include "gunixconnection.h"
50 #include "gunixcredentialsmessage.h"
51 #endif
52
53 #ifdef G_OS_WIN32
54 #include <windows.h>
55 #endif
56
57 #include "glibintl.h"
58
59 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
60
61 /* ---------------------------------------------------------------------------------------------------- */
62
63 gchar *
64 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
65 {
66  guint n, m;
67  GString *ret;
68
69  ret = g_string_new (NULL);
70
71  for (n = 0; n < len; n += 16)
72    {
73      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
74
75      for (m = n; m < n + 16; m++)
76        {
77          if (m > n && (m%4) == 0)
78            g_string_append_c (ret, ' ');
79          if (m < len)
80            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
81          else
82            g_string_append (ret, "   ");
83        }
84
85      g_string_append (ret, "   ");
86
87      for (m = n; m < len && m < n + 16; m++)
88        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
89
90      g_string_append_c (ret, '\n');
91    }
92
93  return g_string_free (ret, FALSE);
94 }
95
96 /* ---------------------------------------------------------------------------------------------------- */
97
98 /* Unfortunately ancillary messages are discarded when reading from a
99  * socket using the GSocketInputStream abstraction. So we provide a
100  * very GInputStream-ish API that uses GSocket in this case (very
101  * similar to GSocketInputStream).
102  */
103
104 typedef struct
105 {
106   GSocket *socket;
107   GCancellable *cancellable;
108
109   void *buffer;
110   gsize count;
111
112   GSocketControlMessage ***messages;
113   gint *num_messages;
114
115   GSimpleAsyncResult *simple;
116
117   gboolean from_mainloop;
118 } ReadWithControlData;
119
120 static void
121 read_with_control_data_free (ReadWithControlData *data)
122 {
123   g_object_unref (data->socket);
124   if (data->cancellable != NULL)
125     g_object_unref (data->cancellable);
126   g_object_unref (data->simple);
127   g_free (data);
128 }
129
130 static gboolean
131 _g_socket_read_with_control_messages_ready (GSocket      *socket,
132                                             GIOCondition  condition,
133                                             gpointer      user_data)
134 {
135   ReadWithControlData *data = user_data;
136   GError *error;
137   gssize result;
138   GInputVector vector;
139
140   error = NULL;
141   vector.buffer = data->buffer;
142   vector.size = data->count;
143   result = g_socket_receive_message (data->socket,
144                                      NULL, /* address */
145                                      &vector,
146                                      1,
147                                      data->messages,
148                                      data->num_messages,
149                                      NULL,
150                                      data->cancellable,
151                                      &error);
152   if (result >= 0)
153     {
154       g_simple_async_result_set_op_res_gssize (data->simple, result);
155     }
156   else
157     {
158       g_assert (error != NULL);
159       g_simple_async_result_take_error (data->simple, error);
160     }
161
162   if (data->from_mainloop)
163     g_simple_async_result_complete (data->simple);
164   else
165     g_simple_async_result_complete_in_idle (data->simple);
166
167   return FALSE;
168 }
169
170 static void
171 _g_socket_read_with_control_messages (GSocket                 *socket,
172                                       void                    *buffer,
173                                       gsize                    count,
174                                       GSocketControlMessage ***messages,
175                                       gint                    *num_messages,
176                                       gint                     io_priority,
177                                       GCancellable            *cancellable,
178                                       GAsyncReadyCallback      callback,
179                                       gpointer                 user_data)
180 {
181   ReadWithControlData *data;
182
183   data = g_new0 (ReadWithControlData, 1);
184   data->socket = g_object_ref (socket);
185   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
186   data->buffer = buffer;
187   data->count = count;
188   data->messages = messages;
189   data->num_messages = num_messages;
190
191   data->simple = g_simple_async_result_new (G_OBJECT (socket),
192                                             callback,
193                                             user_data,
194                                             _g_socket_read_with_control_messages);
195   g_simple_async_result_set_check_cancellable (data->simple, cancellable);
196
197   if (!g_socket_condition_check (socket, G_IO_IN))
198     {
199       GSource *source;
200       data->from_mainloop = TRUE;
201       source = g_socket_create_source (data->socket,
202                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
203                                        cancellable);
204       g_source_set_callback (source,
205                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
206                              data,
207                              (GDestroyNotify) read_with_control_data_free);
208       g_source_attach (source, g_main_context_get_thread_default ());
209       g_source_unref (source);
210     }
211   else
212     {
213       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
214       read_with_control_data_free (data);
215     }
216 }
217
218 static gssize
219 _g_socket_read_with_control_messages_finish (GSocket       *socket,
220                                              GAsyncResult  *result,
221                                              GError       **error)
222 {
223   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
224
225   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
226   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
227
228   if (g_simple_async_result_propagate_error (simple, error))
229       return -1;
230   else
231     return g_simple_async_result_get_op_res_gssize (simple);
232 }
233
234 /* ---------------------------------------------------------------------------------------------------- */
235
236 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
237
238 static GPtrArray *ensured_classes = NULL;
239
240 static void
241 ensure_type (GType gtype)
242 {
243   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
244 }
245
246 static void
247 release_required_types (void)
248 {
249   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
250   g_ptr_array_unref (ensured_classes);
251   ensured_classes = NULL;
252 }
253
254 static void
255 ensure_required_types (void)
256 {
257   g_assert (ensured_classes == NULL);
258   ensured_classes = g_ptr_array_new ();
259   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
260   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
261 }
262 /* ---------------------------------------------------------------------------------------------------- */
263
264 typedef struct
265 {
266   volatile gint refcount;
267   GThread *thread;
268   GMainContext *context;
269   GMainLoop *loop;
270 } SharedThreadData;
271
272 static gpointer
273 gdbus_shared_thread_func (gpointer user_data)
274 {
275   SharedThreadData *data = user_data;
276
277   g_main_context_push_thread_default (data->context);
278   g_main_loop_run (data->loop);
279   g_main_context_pop_thread_default (data->context);
280
281   release_required_types ();
282
283   return NULL;
284 }
285
286 /* ---------------------------------------------------------------------------------------------------- */
287
288 static SharedThreadData *
289 _g_dbus_shared_thread_ref (void)
290 {
291   static gsize shared_thread_data = 0;
292   SharedThreadData *ret;
293
294   if (g_once_init_enter (&shared_thread_data))
295     {
296       SharedThreadData *data;
297
298       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
299       ensure_required_types ();
300
301       data = g_new0 (SharedThreadData, 1);
302       data->refcount = 0;
303       
304       data->context = g_main_context_new ();
305       data->loop = g_main_loop_new (data->context, FALSE);
306       data->thread = g_thread_new ("gdbus",
307                                    gdbus_shared_thread_func,
308                                    data);
309       /* We can cast between gsize and gpointer safely */
310       g_once_init_leave (&shared_thread_data, (gsize) data);
311     }
312
313   ret = (SharedThreadData*) shared_thread_data;
314   g_atomic_int_inc (&ret->refcount);
315   return ret;
316 }
317
318 static void
319 _g_dbus_shared_thread_unref (SharedThreadData *data)
320 {
321   /* TODO: actually destroy the shared thread here */
322 #if 0
323   g_assert (data != NULL);
324   if (g_atomic_int_dec_and_test (&data->refcount))
325     {
326       g_main_loop_quit (data->loop);
327       //g_thread_join (data->thread);
328       g_main_loop_unref (data->loop);
329       g_main_context_unref (data->context);
330     }
331 #endif
332 }
333
334 /* ---------------------------------------------------------------------------------------------------- */
335
/* Which asynchronous output operation, if any, is pending on a worker. */
typedef enum
{
  PENDING_NONE = 0,  /* no output operation pending */
  PENDING_WRITE,     /* an async write is pending */
  PENDING_FLUSH,     /* an async flush is pending */
  PENDING_CLOSE      /* an async close is pending */
} OutputPending;
342
343 struct GDBusWorker
344 {
345   volatile gint                       ref_count;
346
347   SharedThreadData                   *shared_thread_data;
348
349   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
350   volatile gint                       stopped;
351
352   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
353    * only affects messages received from the other peer (since GDBusServer is the
354    * only user) - we might want it to affect messages sent to the other peer too?
355    */
356   gboolean                            frozen;
357   GDBusCapabilityFlags                capabilities;
358   GQueue                             *received_messages_while_frozen;
359
360   GIOStream                          *stream;
361   GCancellable                       *cancellable;
362   GDBusWorkerMessageReceivedCallback  message_received_callback;
363   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
364   GDBusWorkerDisconnectedCallback     disconnected_callback;
365   gpointer                            user_data;
366
367   /* if not NULL, stream is GSocketConnection */
368   GSocket *socket;
369
370   /* used for reading */
371   GMutex                              read_lock;
372   gchar                              *read_buffer;
373   gsize                               read_buffer_allocated_size;
374   gsize                               read_buffer_cur_size;
375   gsize                               read_buffer_bytes_wanted;
376   GUnixFDList                        *read_fd_list;
377   GSocketControlMessage             **read_ancillary_messages;
378   gint                                read_num_ancillary_messages;
379
380   /* Whether an async write, flush or close, or none of those, is pending.
381    * Only the worker thread may change its value, and only with the write_lock.
382    * Other threads may read its value when holding the write_lock.
383    * The worker thread may read its value at any time.
384    */
385   OutputPending                       output_pending;
386   /* used for writing */
387   GMutex                              write_lock;
388   /* queue of MessageToWriteData, protected by write_lock */
389   GQueue                             *write_queue;
390   /* protected by write_lock */
391   guint64                             write_num_messages_written;
392   /* number of messages we'd written out last time we flushed;
393    * protected by write_lock
394    */
395   guint64                             write_num_messages_flushed;
396   /* list of FlushData, protected by write_lock */
397   GList                              *write_pending_flushes;
398   /* list of CloseData, protected by write_lock */
399   GList                              *pending_close_attempts;
400   /* no lock - only used from the worker thread */
401   gboolean                            close_expected;
402 };
403
404 static void _g_dbus_worker_unref (GDBusWorker *worker);
405
406 /* ---------------------------------------------------------------------------------------------------- */
407
408 typedef struct
409 {
410   GMutex  mutex;
411   GCond   cond;
412   guint64 number_to_wait_for;
413   GError *error;
414 } FlushData;
415
416 struct _MessageToWriteData ;
417 typedef struct _MessageToWriteData MessageToWriteData;
418
419 static void message_to_write_data_free (MessageToWriteData *data);
420
421 static void read_message_print_transport_debug (gssize bytes_read,
422                                                 GDBusWorker *worker);
423
424 static void write_message_print_transport_debug (gssize bytes_written,
425                                                  MessageToWriteData *data);
426
427 typedef struct {
428     GDBusWorker *worker;
429     GCancellable *cancellable;
430     GSimpleAsyncResult *result;
431 } CloseData;
432
433 static void close_data_free (CloseData *close_data)
434 {
435   if (close_data->cancellable != NULL)
436     g_object_unref (close_data->cancellable);
437
438   if (close_data->result != NULL)
439     g_object_unref (close_data->result);
440
441   _g_dbus_worker_unref (close_data->worker);
442   g_slice_free (CloseData, close_data);
443 }
444
445 /* ---------------------------------------------------------------------------------------------------- */
446
447 static GDBusWorker *
448 _g_dbus_worker_ref (GDBusWorker *worker)
449 {
450   g_atomic_int_inc (&worker->ref_count);
451   return worker;
452 }
453
454 static void
455 _g_dbus_worker_unref (GDBusWorker *worker)
456 {
457   if (g_atomic_int_dec_and_test (&worker->ref_count))
458     {
459       g_assert (worker->write_pending_flushes == NULL);
460
461       _g_dbus_shared_thread_unref (worker->shared_thread_data);
462
463       g_object_unref (worker->stream);
464
465       g_mutex_clear (&worker->read_lock);
466       g_object_unref (worker->cancellable);
467       if (worker->read_fd_list != NULL)
468         g_object_unref (worker->read_fd_list);
469
470       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
471       g_mutex_clear (&worker->write_lock);
472       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
473       g_free (worker->read_buffer);
474
475       g_free (worker);
476     }
477 }
478
479 static void
480 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
481                                   gboolean      remote_peer_vanished,
482                                   GError       *error)
483 {
484   if (!g_atomic_int_get (&worker->stopped))
485     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
486 }
487
488 static void
489 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
490                                       GDBusMessage *message)
491 {
492   if (!g_atomic_int_get (&worker->stopped))
493     worker->message_received_callback (worker, message, worker->user_data);
494 }
495
496 static GDBusMessage *
497 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
498                                               GDBusMessage *message)
499 {
500   GDBusMessage *ret;
501   if (!g_atomic_int_get (&worker->stopped))
502     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
503   else
504     ret = message;
505   return ret;
506 }
507
508 /* can only be called from private thread with read-lock held - takes ownership of @message */
509 static void
510 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
511                                                   GDBusMessage *message)
512 {
513   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
514     {
515       /* queue up */
516       g_queue_push_tail (worker->received_messages_while_frozen, message);
517     }
518   else
519     {
520       /* not frozen, nor anything in queue */
521       _g_dbus_worker_emit_message_received (worker, message);
522       g_object_unref (message);
523     }
524 }
525
526 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
527 static gboolean
528 unfreeze_in_idle_cb (gpointer user_data)
529 {
530   GDBusWorker *worker = user_data;
531   GDBusMessage *message;
532
533   g_mutex_lock (&worker->read_lock);
534   if (worker->frozen)
535     {
536       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
537         {
538           _g_dbus_worker_emit_message_received (worker, message);
539           g_object_unref (message);
540         }
541       worker->frozen = FALSE;
542     }
543   else
544     {
545       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
546     }
547   g_mutex_unlock (&worker->read_lock);
548   return FALSE;
549 }
550
551 /* can be called from any thread */
552 void
553 _g_dbus_worker_unfreeze (GDBusWorker *worker)
554 {
555   GSource *idle_source;
556   idle_source = g_idle_source_new ();
557   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
558   g_source_set_callback (idle_source,
559                          unfreeze_in_idle_cb,
560                          _g_dbus_worker_ref (worker),
561                          (GDestroyNotify) _g_dbus_worker_unref);
562   g_source_attach (idle_source, worker->shared_thread_data->context);
563   g_source_unref (idle_source);
564 }
565
566 /* ---------------------------------------------------------------------------------------------------- */
567
568 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
569
570 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
571 static void
572 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
573                            GAsyncResult  *res,
574                            gpointer       user_data)
575 {
576   GDBusWorker *worker = user_data;
577   GError *error;
578   gssize bytes_read;
579
580   g_mutex_lock (&worker->read_lock);
581
582   /* If already stopped, don't even process the reply */
583   if (g_atomic_int_get (&worker->stopped))
584     goto out;
585
586   error = NULL;
587   if (worker->socket == NULL)
588     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
589                                              res,
590                                              &error);
591   else
592     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
593                                                               res,
594                                                               &error);
595   if (worker->read_num_ancillary_messages > 0)
596     {
597       gint n;
598       for (n = 0; n < worker->read_num_ancillary_messages; n++)
599         {
600           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
601
602           if (FALSE)
603             {
604             }
605 #ifdef G_OS_UNIX
606           else if (G_IS_UNIX_FD_MESSAGE (control_message))
607             {
608               GUnixFDMessage *fd_message;
609               gint *fds;
610               gint num_fds;
611
612               fd_message = G_UNIX_FD_MESSAGE (control_message);
613               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
614               if (worker->read_fd_list == NULL)
615                 {
616                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
617                 }
618               else
619                 {
620                   gint n;
621                   for (n = 0; n < num_fds; n++)
622                     {
623                       /* TODO: really want a append_steal() */
624                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
625                       (void) g_close (fds[n], NULL);
626                     }
627                 }
628               g_free (fds);
629             }
630           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
631             {
632               /* do nothing */
633             }
634 #endif
635           else
636             {
637               if (error == NULL)
638                 {
639                   g_set_error (&error,
640                                G_IO_ERROR,
641                                G_IO_ERROR_FAILED,
642                                "Unexpected ancillary message of type %s received from peer",
643                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
644                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
645                   g_error_free (error);
646                   g_object_unref (control_message);
647                   n++;
648                   while (n < worker->read_num_ancillary_messages)
649                     g_object_unref (worker->read_ancillary_messages[n++]);
650                   g_free (worker->read_ancillary_messages);
651                   goto out;
652                 }
653             }
654           g_object_unref (control_message);
655         }
656       g_free (worker->read_ancillary_messages);
657     }
658
659   if (bytes_read == -1)
660     {
661       if (G_UNLIKELY (_g_dbus_debug_transport ()))
662         {
663           _g_dbus_debug_print_lock ();
664           g_print ("========================================================================\n"
665                    "GDBus-debug:Transport:\n"
666                    "  ---- READ ERROR on stream of type %s:\n"
667                    "  ---- %s %d: %s\n",
668                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
669                    g_quark_to_string (error->domain), error->code,
670                    error->message);
671           _g_dbus_debug_print_unlock ();
672         }
673
674       /* Every async read that uses this callback uses worker->cancellable
675        * as its GCancellable. worker->cancellable gets cancelled if and only
676        * if the GDBusConnection tells us to close (either via
677        * _g_dbus_worker_stop, which is called on last-unref, or directly),
678        * so a cancelled read must mean our connection was closed locally.
679        *
680        * If we're closing, other errors are possible - notably,
681        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
682        * read in-flight. It seems sensible to treat all read errors during
683        * closing as an expected thing that doesn't trip exit-on-close.
684        *
685        * Because close_expected can't be set until we get into the worker
686        * thread, but the cancellable is signalled sooner (from another
687        * thread), we do still need to check the error.
688        */
689       if (worker->close_expected ||
690           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
691         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
692       else
693         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
694
695       g_error_free (error);
696       goto out;
697     }
698
699 #if 0
700   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
701            (gint) bytes_read,
702            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
703            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
704            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
705                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
706            worker->stream,
707            worker);
708 #endif
709
710   /* TODO: hmm, hmm... */
711   if (bytes_read == 0)
712     {
713       g_set_error (&error,
714                    G_IO_ERROR,
715                    G_IO_ERROR_FAILED,
716                    "Underlying GIOStream returned 0 bytes on an async read");
717       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
718       g_error_free (error);
719       goto out;
720     }
721
722   read_message_print_transport_debug (bytes_read, worker);
723
724   worker->read_buffer_cur_size += bytes_read;
725   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
726     {
727       /* OK, got what we asked for! */
728       if (worker->read_buffer_bytes_wanted == 16)
729         {
730           gssize message_len;
731           /* OK, got the header - determine how many more bytes are needed */
732           error = NULL;
733           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
734                                                      16,
735                                                      &error);
736           if (message_len == -1)
737             {
738               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
739               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
740               g_error_free (error);
741               goto out;
742             }
743
744           worker->read_buffer_bytes_wanted = message_len;
745           _g_dbus_worker_do_read_unlocked (worker);
746         }
747       else
748         {
749           GDBusMessage *message;
750           error = NULL;
751
752           /* TODO: use connection->priv->auth to decode the message */
753
754           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
755                                                   worker->read_buffer_cur_size,
756                                                   worker->capabilities,
757                                                   &error);
758           if (message == NULL)
759             {
760               gchar *s;
761               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
762               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
763                          "The error is: %s\n"
764                          "The payload is as follows:\n"
765                          "%s\n",
766                          worker->read_buffer_cur_size,
767                          error->message,
768                          s);
769               g_free (s);
770               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
771               g_error_free (error);
772               goto out;
773             }
774
775 #ifdef G_OS_UNIX
776           if (worker->read_fd_list != NULL)
777             {
778               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
779               g_object_unref (worker->read_fd_list);
780               worker->read_fd_list = NULL;
781             }
782 #endif
783
784           if (G_UNLIKELY (_g_dbus_debug_message ()))
785             {
786               gchar *s;
787               _g_dbus_debug_print_lock ();
788               g_print ("========================================================================\n"
789                        "GDBus-debug:Message:\n"
790                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
791                        worker->read_buffer_cur_size);
792               s = g_dbus_message_print (message, 2);
793               g_print ("%s", s);
794               g_free (s);
795               if (G_UNLIKELY (_g_dbus_debug_payload ()))
796                 {
797                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
798                   g_print ("%s\n", s);
799                   g_free (s);
800                 }
801               _g_dbus_debug_print_unlock ();
802             }
803
804           /* yay, got a message, go deliver it */
805           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
806
807           /* start reading another message! */
808           worker->read_buffer_bytes_wanted = 0;
809           worker->read_buffer_cur_size = 0;
810           _g_dbus_worker_do_read_unlocked (worker);
811         }
812     }
813   else
814     {
815       /* didn't get all the bytes we requested - so repeat the request... */
816       _g_dbus_worker_do_read_unlocked (worker);
817     }
818
819  out:
820   g_mutex_unlock (&worker->read_lock);
821
822   /* gives up the reference acquired when calling g_input_stream_read_async() */
823   _g_dbus_worker_unref (worker);
824 }
825
826 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
827 static void
828 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
829 {
830   /* Note that we do need to keep trying to read even if close_expected is
831    * true, because only failing a read causes us to signal 'closed'.
832    */
833
834   /* if bytes_wanted is zero, it means start reading a message */
835   if (worker->read_buffer_bytes_wanted == 0)
836     {
837       worker->read_buffer_cur_size = 0;
838       worker->read_buffer_bytes_wanted = 16;
839     }
840
841   /* ensure we have a (big enough) buffer */
842   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
843     {
844       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
845       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
846       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
847     }
848
849   if (worker->socket == NULL)
850     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
851                                worker->read_buffer + worker->read_buffer_cur_size,
852                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
853                                G_PRIORITY_DEFAULT,
854                                worker->cancellable,
855                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
856                                _g_dbus_worker_ref (worker));
857   else
858     {
859       worker->read_ancillary_messages = NULL;
860       worker->read_num_ancillary_messages = 0;
861       _g_socket_read_with_control_messages (worker->socket,
862                                             worker->read_buffer + worker->read_buffer_cur_size,
863                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
864                                             &worker->read_ancillary_messages,
865                                             &worker->read_num_ancillary_messages,
866                                             G_PRIORITY_DEFAULT,
867                                             worker->cancellable,
868                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
869                                             _g_dbus_worker_ref (worker));
870     }
871 }
872
873 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
874 static gboolean
875 _g_dbus_worker_do_initial_read (gpointer data)
876 {
877   GDBusWorker *worker = data;
878   g_mutex_lock (&worker->read_lock);
879   _g_dbus_worker_do_read_unlocked (worker);
880   g_mutex_unlock (&worker->read_lock);
881   return FALSE;
882 }
883
884 /* ---------------------------------------------------------------------------------------------------- */
885
886 struct _MessageToWriteData
887 {
888   GDBusWorker  *worker;
889   GDBusMessage *message;
890   gchar        *blob;
891   gsize         blob_size;
892
893   gsize               total_written;
894   GSimpleAsyncResult *simple;
895
896 };
897
898 static void
899 message_to_write_data_free (MessageToWriteData *data)
900 {
901   _g_dbus_worker_unref (data->worker);
902   if (data->message)
903     g_object_unref (data->message);
904   g_free (data->blob);
905   g_free (data);
906 }
907
908 /* ---------------------------------------------------------------------------------------------------- */
909
910 static void write_message_continue_writing (MessageToWriteData *data);
911
912 /* called in private thread shared by all GDBusConnection instances
913  *
914  * write-lock is not held on entry
915  * output_pending is PENDING_WRITE on entry
916  */
917 static void
918 write_message_async_cb (GObject      *source_object,
919                         GAsyncResult *res,
920                         gpointer      user_data)
921 {
922   MessageToWriteData *data = user_data;
923   GSimpleAsyncResult *simple;
924   gssize bytes_written;
925   GError *error;
926
927   /* Note: we can't access data->simple after calling g_async_result_complete () because the
928    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
929    */
930   simple = data->simple;
931
932   error = NULL;
933   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
934                                                 res,
935                                                 &error);
936   if (bytes_written == -1)
937     {
938       g_simple_async_result_take_error (simple, error);
939       g_simple_async_result_complete (simple);
940       g_object_unref (simple);
941       goto out;
942     }
943   g_assert (bytes_written > 0); /* zero is never returned */
944
945   write_message_print_transport_debug (bytes_written, data);
946
947   data->total_written += bytes_written;
948   g_assert (data->total_written <= data->blob_size);
949   if (data->total_written == data->blob_size)
950     {
951       g_simple_async_result_complete (simple);
952       g_object_unref (simple);
953       goto out;
954     }
955
956   write_message_continue_writing (data);
957
958  out:
959   ;
960 }
961
/* called in private thread shared by all GDBusConnection instances
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 *
 * GSource callback attached by write_message_continue_writing() after a
 * would-block send: simply retries the write.
 */
#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  write_message_continue_writing ((MessageToWriteData *) user_data);

  /* remove source - a new one is created if the retry blocks again */
  return FALSE;
}
#endif
978
979 /* called in private thread shared by all GDBusConnection instances
980  *
981  * write-lock is not held on entry
982  * output_pending is PENDING_WRITE on entry
983  */
984 static void
985 write_message_continue_writing (MessageToWriteData *data)
986 {
987   GOutputStream *ostream;
988 #ifdef G_OS_UNIX
989   GSimpleAsyncResult *simple;
990   GUnixFDList *fd_list;
991 #endif
992
993 #ifdef G_OS_UNIX
994   /* Note: we can't access data->simple after calling g_async_result_complete () because the
995    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
996    */
997   simple = data->simple;
998 #endif
999
1000   ostream = g_io_stream_get_output_stream (data->worker->stream);
1001 #ifdef G_OS_UNIX
1002   fd_list = g_dbus_message_get_unix_fd_list (data->message);
1003 #endif
1004
1005   g_assert (!g_output_stream_has_pending (ostream));
1006   g_assert_cmpint (data->total_written, <, data->blob_size);
1007
1008   if (FALSE)
1009     {
1010     }
1011 #ifdef G_OS_UNIX
1012   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1013     {
1014       GOutputVector vector;
1015       GSocketControlMessage *control_message;
1016       gssize bytes_written;
1017       GError *error;
1018
1019       vector.buffer = data->blob;
1020       vector.size = data->blob_size;
1021
1022       control_message = NULL;
1023       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1024         {
1025           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1026             {
1027               g_simple_async_result_set_error (simple,
1028                                                G_IO_ERROR,
1029                                                G_IO_ERROR_FAILED,
1030                                                "Tried sending a file descriptor but remote peer does not support this capability");
1031               g_simple_async_result_complete (simple);
1032               g_object_unref (simple);
1033               goto out;
1034             }
1035           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1036         }
1037
1038       error = NULL;
1039       bytes_written = g_socket_send_message (data->worker->socket,
1040                                              NULL, /* address */
1041                                              &vector,
1042                                              1,
1043                                              control_message != NULL ? &control_message : NULL,
1044                                              control_message != NULL ? 1 : 0,
1045                                              G_SOCKET_MSG_NONE,
1046                                              data->worker->cancellable,
1047                                              &error);
1048       if (control_message != NULL)
1049         g_object_unref (control_message);
1050
1051       if (bytes_written == -1)
1052         {
1053           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1054           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1055             {
1056               GSource *source;
1057               source = g_socket_create_source (data->worker->socket,
1058                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1059                                                data->worker->cancellable);
1060               g_source_set_callback (source,
1061                                      (GSourceFunc) on_socket_ready,
1062                                      data,
1063                                      NULL); /* GDestroyNotify */
1064               g_source_attach (source, g_main_context_get_thread_default ());
1065               g_source_unref (source);
1066               g_error_free (error);
1067               goto out;
1068             }
1069           g_simple_async_result_take_error (simple, error);
1070           g_simple_async_result_complete (simple);
1071           g_object_unref (simple);
1072           goto out;
1073         }
1074       g_assert (bytes_written > 0); /* zero is never returned */
1075
1076       write_message_print_transport_debug (bytes_written, data);
1077
1078       data->total_written += bytes_written;
1079       g_assert (data->total_written <= data->blob_size);
1080       if (data->total_written == data->blob_size)
1081         {
1082           g_simple_async_result_complete (simple);
1083           g_object_unref (simple);
1084           goto out;
1085         }
1086
1087       write_message_continue_writing (data);
1088     }
1089 #endif
1090   else
1091     {
1092 #ifdef G_OS_UNIX
1093       if (fd_list != NULL)
1094         {
1095           g_simple_async_result_set_error (simple,
1096                                            G_IO_ERROR,
1097                                            G_IO_ERROR_FAILED,
1098                                            "Tried sending a file descriptor on unsupported stream of type %s",
1099                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1100           g_simple_async_result_complete (simple);
1101           g_object_unref (simple);
1102           goto out;
1103         }
1104 #endif
1105
1106       g_output_stream_write_async (ostream,
1107                                    (const gchar *) data->blob + data->total_written,
1108                                    data->blob_size - data->total_written,
1109                                    G_PRIORITY_DEFAULT,
1110                                    data->worker->cancellable,
1111                                    write_message_async_cb,
1112                                    data);
1113     }
1114 #ifdef G_OS_UNIX
1115  out:
1116 #endif
1117   ;
1118 }
1119
1120 /* called in private thread shared by all GDBusConnection instances
1121  *
1122  * write-lock is not held on entry
1123  * output_pending is PENDING_WRITE on entry
1124  */
1125 static void
1126 write_message_async (GDBusWorker         *worker,
1127                      MessageToWriteData  *data,
1128                      GAsyncReadyCallback  callback,
1129                      gpointer             user_data)
1130 {
1131   data->simple = g_simple_async_result_new (NULL,
1132                                             callback,
1133                                             user_data,
1134                                             write_message_async);
1135   data->total_written = 0;
1136   write_message_continue_writing (data);
1137 }
1138
1139 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1140 static gboolean
1141 write_message_finish (GAsyncResult   *res,
1142                       GError        **error)
1143 {
1144   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1145   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1146     return FALSE;
1147   else
1148     return TRUE;
1149 }
1150 /* ---------------------------------------------------------------------------------------------------- */
1151
1152 static void continue_writing (GDBusWorker *worker);
1153
1154 typedef struct
1155 {
1156   GDBusWorker *worker;
1157   GList *flushers;
1158 } FlushAsyncData;
1159
1160 static void
1161 flush_data_list_complete (const GList  *flushers,
1162                           const GError *error)
1163 {
1164   const GList *l;
1165
1166   for (l = flushers; l != NULL; l = l->next)
1167     {
1168       FlushData *f = l->data;
1169
1170       f->error = error != NULL ? g_error_copy (error) : NULL;
1171
1172       g_mutex_lock (&f->mutex);
1173       g_cond_signal (&f->cond);
1174       g_mutex_unlock (&f->mutex);
1175     }
1176 }
1177
1178 /* called in private thread shared by all GDBusConnection instances
1179  *
1180  * write-lock is not held on entry
1181  * output_pending is PENDING_FLUSH on entry
1182  */
1183 static void
1184 ostream_flush_cb (GObject      *source_object,
1185                   GAsyncResult *res,
1186                   gpointer      user_data)
1187 {
1188   FlushAsyncData *data = user_data;
1189   GError *error;
1190
1191   error = NULL;
1192   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1193                                 res,
1194                                 &error);
1195
1196   if (error == NULL)
1197     {
1198       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1199         {
1200           _g_dbus_debug_print_lock ();
1201           g_print ("========================================================================\n"
1202                    "GDBus-debug:Transport:\n"
1203                    "  ---- FLUSHED stream of type %s\n",
1204                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1205           _g_dbus_debug_print_unlock ();
1206         }
1207     }
1208
1209   g_assert (data->flushers != NULL);
1210   flush_data_list_complete (data->flushers, error);
1211   g_list_free (data->flushers);
1212
1213   if (error != NULL)
1214     g_error_free (error);
1215
1216   /* Make sure we tell folks that we don't have additional
1217      flushes pending */
1218   g_mutex_lock (&data->worker->write_lock);
1219   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1220   g_assert (data->worker->output_pending == PENDING_FLUSH);
1221   data->worker->output_pending = PENDING_NONE;
1222   g_mutex_unlock (&data->worker->write_lock);
1223
1224   /* OK, cool, finally kick off the next write */
1225   continue_writing (data->worker);
1226
1227   _g_dbus_worker_unref (data->worker);
1228   g_free (data);
1229 }
1230
1231 /* called in private thread shared by all GDBusConnection instances
1232  *
1233  * write-lock is not held on entry
1234  * output_pending is PENDING_FLUSH on entry
1235  */
1236 static void
1237 start_flush (FlushAsyncData *data)
1238 {
1239   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1240                                G_PRIORITY_DEFAULT,
1241                                data->worker->cancellable,
1242                                ostream_flush_cb,
1243                                data);
1244 }
1245
1246 /* called in private thread shared by all GDBusConnection instances
1247  *
1248  * write-lock is held on entry
1249  * output_pending is PENDING_NONE on entry
1250  */
1251 static void
1252 message_written_unlocked (GDBusWorker *worker,
1253                           MessageToWriteData *message_data)
1254 {
1255   if (G_UNLIKELY (_g_dbus_debug_message ()))
1256     {
1257       gchar *s;
1258       _g_dbus_debug_print_lock ();
1259       g_print ("========================================================================\n"
1260                "GDBus-debug:Message:\n"
1261                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1262                message_data->blob_size);
1263       s = g_dbus_message_print (message_data->message, 2);
1264       g_print ("%s", s);
1265       g_free (s);
1266       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1267         {
1268           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1269           g_print ("%s\n", s);
1270           g_free (s);
1271         }
1272       _g_dbus_debug_print_unlock ();
1273     }
1274
1275   worker->write_num_messages_written += 1;
1276 }
1277
1278 /* called in private thread shared by all GDBusConnection instances
1279  *
1280  * write-lock is held on entry
1281  * output_pending is PENDING_NONE on entry
1282  *
1283  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1284  */
1285 static FlushAsyncData *
1286 prepare_flush_unlocked (GDBusWorker *worker)
1287 {
1288   GList *l;
1289   GList *ll;
1290   GList *flushers;
1291
1292   flushers = NULL;
1293   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1294     {
1295       FlushData *f = l->data;
1296       ll = l->next;
1297
1298       if (f->number_to_wait_for == worker->write_num_messages_written)
1299         {
1300           flushers = g_list_append (flushers, f);
1301           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1302         }
1303     }
1304   if (flushers != NULL)
1305     {
1306       g_assert (worker->output_pending == PENDING_NONE);
1307       worker->output_pending = PENDING_FLUSH;
1308     }
1309
1310   if (flushers != NULL)
1311     {
1312       FlushAsyncData *data;
1313
1314       data = g_new0 (FlushAsyncData, 1);
1315       data->worker = _g_dbus_worker_ref (worker);
1316       data->flushers = flushers;
1317       return data;
1318     }
1319
1320   return NULL;
1321 }
1322
1323 /* called in private thread shared by all GDBusConnection instances
1324  *
1325  * write-lock is not held on entry
1326  * output_pending is PENDING_WRITE on entry
1327  */
1328 static void
1329 write_message_cb (GObject       *source_object,
1330                   GAsyncResult  *res,
1331                   gpointer       user_data)
1332 {
1333   MessageToWriteData *data = user_data;
1334   GError *error;
1335
1336   g_mutex_lock (&data->worker->write_lock);
1337   g_assert (data->worker->output_pending == PENDING_WRITE);
1338   data->worker->output_pending = PENDING_NONE;
1339
1340   error = NULL;
1341   if (!write_message_finish (res, &error))
1342     {
1343       g_mutex_unlock (&data->worker->write_lock);
1344
1345       /* TODO: handle */
1346       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1347       g_error_free (error);
1348
1349       g_mutex_lock (&data->worker->write_lock);
1350     }
1351
1352   message_written_unlocked (data->worker, data);
1353
1354   g_mutex_unlock (&data->worker->write_lock);
1355
1356   continue_writing (data->worker);
1357
1358   message_to_write_data_free (data);
1359 }
1360
1361 /* called in private thread shared by all GDBusConnection instances
1362  *
1363  * write-lock is not held on entry
1364  * output_pending is PENDING_CLOSE on entry
1365  */
1366 static void
1367 iostream_close_cb (GObject      *source_object,
1368                    GAsyncResult *res,
1369                    gpointer      user_data)
1370 {
1371   GDBusWorker *worker = user_data;
1372   GError *error = NULL;
1373   GList *pending_close_attempts, *pending_flush_attempts;
1374   GQueue *send_queue;
1375
1376   g_io_stream_close_finish (worker->stream, res, &error);
1377
1378   g_mutex_lock (&worker->write_lock);
1379
1380   pending_close_attempts = worker->pending_close_attempts;
1381   worker->pending_close_attempts = NULL;
1382
1383   pending_flush_attempts = worker->write_pending_flushes;
1384   worker->write_pending_flushes = NULL;
1385
1386   send_queue = worker->write_queue;
1387   worker->write_queue = g_queue_new ();
1388
1389   g_assert (worker->output_pending == PENDING_CLOSE);
1390   worker->output_pending = PENDING_NONE;
1391
1392   g_mutex_unlock (&worker->write_lock);
1393
1394   while (pending_close_attempts != NULL)
1395     {
1396       CloseData *close_data = pending_close_attempts->data;
1397
1398       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1399                                                    pending_close_attempts);
1400
1401       if (close_data->result != NULL)
1402         {
1403           if (error != NULL)
1404             g_simple_async_result_set_from_error (close_data->result, error);
1405
1406           /* this must be in an idle because the result is likely to be
1407            * intended for another thread
1408            */
1409           g_simple_async_result_complete_in_idle (close_data->result);
1410         }
1411
1412       close_data_free (close_data);
1413     }
1414
1415   g_clear_error (&error);
1416
1417   /* all messages queued for sending are discarded */
1418   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1419   /* all queued flushes fail */
1420   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1421                        _("Operation was cancelled"));
1422   flush_data_list_complete (pending_flush_attempts, error);
1423   g_list_free (pending_flush_attempts);
1424   g_clear_error (&error);
1425
1426   _g_dbus_worker_unref (worker);
1427 }
1428
1429 /* called in private thread shared by all GDBusConnection instances
1430  *
1431  * write-lock is not held on entry
1432  * output_pending must be PENDING_NONE on entry
1433  */
1434 static void
1435 continue_writing (GDBusWorker *worker)
1436 {
1437   MessageToWriteData *data;
1438   FlushAsyncData *flush_async_data;
1439
1440  write_next:
1441   /* we mustn't try to write two things at once */
1442   g_assert (worker->output_pending == PENDING_NONE);
1443
1444   g_mutex_lock (&worker->write_lock);
1445
1446   data = NULL;
1447   flush_async_data = NULL;
1448
1449   /* if we want to close the connection, that takes precedence */
1450   if (worker->pending_close_attempts != NULL)
1451     {
1452       worker->close_expected = TRUE;
1453       worker->output_pending = PENDING_CLOSE;
1454
1455       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1456                                NULL, iostream_close_cb,
1457                                _g_dbus_worker_ref (worker));
1458     }
1459   else
1460     {
1461       flush_async_data = prepare_flush_unlocked (worker);
1462
1463       if (flush_async_data == NULL)
1464         {
1465           data = g_queue_pop_head (worker->write_queue);
1466
1467           if (data != NULL)
1468             worker->output_pending = PENDING_WRITE;
1469         }
1470     }
1471
1472   g_mutex_unlock (&worker->write_lock);
1473
1474   /* Note that write_lock is only used for protecting the @write_queue
1475    * and @output_pending fields of the GDBusWorker struct ... which we
1476    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1477    *
1478    * Therefore, it's fine to drop it here when calling back into user
1479    * code and then writing the message out onto the GIOStream since this
1480    * function only runs on the worker thread.
1481    */
1482
1483   if (flush_async_data != NULL)
1484     {
1485       start_flush (flush_async_data);
1486       g_assert (data == NULL);
1487     }
1488   else if (data != NULL)
1489     {
1490       GDBusMessage *old_message;
1491       guchar *new_blob;
1492       gsize new_blob_size;
1493       GError *error;
1494
1495       old_message = data->message;
1496       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1497       if (data->message == old_message)
1498         {
1499           /* filters had no effect - do nothing */
1500         }
1501       else if (data->message == NULL)
1502         {
1503           /* filters dropped message */
1504           g_mutex_lock (&worker->write_lock);
1505           worker->output_pending = PENDING_NONE;
1506           g_mutex_unlock (&worker->write_lock);
1507           message_to_write_data_free (data);
1508           goto write_next;
1509         }
1510       else
1511         {
1512           /* filters altered the message -> reencode */
1513           error = NULL;
1514           new_blob = g_dbus_message_to_blob (data->message,
1515                                              &new_blob_size,
1516                                              worker->capabilities,
1517                                              &error);
1518           if (new_blob == NULL)
1519             {
1520               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1521                * the old message instead
1522                */
1523               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1524                          g_dbus_message_get_serial (data->message),
1525                          error->message);
1526               g_error_free (error);
1527             }
1528           else
1529             {
1530               g_free (data->blob);
1531               data->blob = (gchar *) new_blob;
1532               data->blob_size = new_blob_size;
1533             }
1534         }
1535
1536       write_message_async (worker,
1537                            data,
1538                            write_message_cb,
1539                            data);
1540     }
1541 }
1542
1543 /* called in private thread shared by all GDBusConnection instances
1544  *
1545  * write-lock is not held on entry
1546  * output_pending may be anything
1547  */
1548 static gboolean
1549 continue_writing_in_idle_cb (gpointer user_data)
1550 {
1551   GDBusWorker *worker = user_data;
1552
1553   /* Because this is the worker thread, we can read this struct member
1554    * without holding the lock: no other thread ever modifies it.
1555    */
1556   if (worker->output_pending == PENDING_NONE)
1557     continue_writing (worker);
1558
1559   return FALSE;
1560 }
1561
1562 /*
1563  * @write_data: (transfer full) (allow-none):
1564  * @flush_data: (transfer full) (allow-none):
1565  * @close_data: (transfer full) (allow-none):
1566  *
1567  * Can be called from any thread
1568  *
1569  * write_lock is held on entry
1570  * output_pending may be anything
1571  */
1572 static void
1573 schedule_writing_unlocked (GDBusWorker        *worker,
1574                            MessageToWriteData *write_data,
1575                            FlushData          *flush_data,
1576                            CloseData          *close_data)
1577 {
1578   if (write_data != NULL)
1579     g_queue_push_tail (worker->write_queue, write_data);
1580
1581   if (flush_data != NULL)
1582     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1583
1584   if (close_data != NULL)
1585     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1586                                                      close_data);
1587
1588   /* If we had output pending, the next bit of output will happen
1589    * automatically when it finishes, so we only need to do this
1590    * if nothing was pending.
1591    *
1592    * The idle callback will re-check that output_pending is still
1593    * PENDING_NONE, to guard against output starting before the idle.
1594    */
1595   if (worker->output_pending == PENDING_NONE)
1596     {
1597       GSource *idle_source;
1598       idle_source = g_idle_source_new ();
1599       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1600       g_source_set_callback (idle_source,
1601                              continue_writing_in_idle_cb,
1602                              _g_dbus_worker_ref (worker),
1603                              (GDestroyNotify) _g_dbus_worker_unref);
1604       g_source_attach (idle_source, worker->shared_thread_data->context);
1605       g_source_unref (idle_source);
1606     }
1607 }
1608
1609 /* ---------------------------------------------------------------------------------------------------- */
1610
1611 /* can be called from any thread - steals blob
1612  *
1613  * write_lock is not held on entry
1614  * output_pending may be anything
1615  */
1616 void
1617 _g_dbus_worker_send_message (GDBusWorker    *worker,
1618                              GDBusMessage   *message,
1619                              gchar          *blob,
1620                              gsize           blob_len)
1621 {
1622   MessageToWriteData *data;
1623
1624   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1625   g_return_if_fail (blob != NULL);
1626   g_return_if_fail (blob_len > 16);
1627
1628   data = g_new0 (MessageToWriteData, 1);
1629   data->worker = _g_dbus_worker_ref (worker);
1630   data->message = g_object_ref (message);
1631   data->blob = blob; /* steal! */
1632   data->blob_size = blob_len;
1633
1634   g_mutex_lock (&worker->write_lock);
1635   schedule_writing_unlocked (worker, data, NULL, NULL);
1636   g_mutex_unlock (&worker->write_lock);
1637 }
1638
1639 /* ---------------------------------------------------------------------------------------------------- */
1640
1641 GDBusWorker *
1642 _g_dbus_worker_new (GIOStream                              *stream,
1643                     GDBusCapabilityFlags                    capabilities,
1644                     gboolean                                initially_frozen,
1645                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1646                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1647                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1648                     gpointer                                user_data)
1649 {
1650   GDBusWorker *worker;
1651   GSource *idle_source;
1652
1653   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1654   g_return_val_if_fail (message_received_callback != NULL, NULL);
1655   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1656   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1657
1658   worker = g_new0 (GDBusWorker, 1);
1659   worker->ref_count = 1;
1660
1661   g_mutex_init (&worker->read_lock);
1662   worker->message_received_callback = message_received_callback;
1663   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1664   worker->disconnected_callback = disconnected_callback;
1665   worker->user_data = user_data;
1666   worker->stream = g_object_ref (stream);
1667   worker->capabilities = capabilities;
1668   worker->cancellable = g_cancellable_new ();
1669   worker->output_pending = PENDING_NONE;
1670
1671   worker->frozen = initially_frozen;
1672   worker->received_messages_while_frozen = g_queue_new ();
1673
1674   g_mutex_init (&worker->write_lock);
1675   worker->write_queue = g_queue_new ();
1676
1677   if (G_IS_SOCKET_CONNECTION (worker->stream))
1678     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1679
1680   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1681
1682   /* begin reading */
1683   idle_source = g_idle_source_new ();
1684   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1685   g_source_set_callback (idle_source,
1686                          _g_dbus_worker_do_initial_read,
1687                          _g_dbus_worker_ref (worker),
1688                          (GDestroyNotify) _g_dbus_worker_unref);
1689   g_source_attach (idle_source, worker->shared_thread_data->context);
1690   g_source_unref (idle_source);
1691
1692   return worker;
1693 }
1694
1695 /* ---------------------------------------------------------------------------------------------------- */
1696
1697 /* can be called from any thread
1698  *
1699  * write_lock is not held on entry
1700  * output_pending may be anything
1701  */
1702 void
1703 _g_dbus_worker_close (GDBusWorker         *worker,
1704                       GCancellable        *cancellable,
1705                       GSimpleAsyncResult  *result)
1706 {
1707   CloseData *close_data;
1708
1709   close_data = g_slice_new0 (CloseData);
1710   close_data->worker = _g_dbus_worker_ref (worker);
1711   close_data->cancellable =
1712       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1713   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1714
1715   /* Don't set worker->close_expected here - we're in the wrong thread.
1716    * It'll be set before the actual close happens.
1717    */
1718   g_cancellable_cancel (worker->cancellable);
1719   g_mutex_lock (&worker->write_lock);
1720   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1721   g_mutex_unlock (&worker->write_lock);
1722 }
1723
1724 /* This can be called from any thread - frees worker. Note that
1725  * callbacks might still happen if called from another thread than the
1726  * worker - use your own synchronization primitive in the callbacks.
1727  *
1728  * write_lock is not held on entry
1729  * output_pending may be anything
1730  */
1731 void
1732 _g_dbus_worker_stop (GDBusWorker *worker)
1733 {
1734   g_atomic_int_set (&worker->stopped, TRUE);
1735
1736   /* Cancel any pending operations and schedule a close of the underlying I/O
1737    * stream in the worker thread
1738    */
1739   _g_dbus_worker_close (worker, NULL, NULL);
1740
1741   /* _g_dbus_worker_close holds a ref until after an idle in the worker
1742    * thread has run, so we no longer need to unref in an idle like in
1743    * commit 322e25b535
1744    */
1745   _g_dbus_worker_unref (worker);
1746 }
1747
1748 /* ---------------------------------------------------------------------------------------------------- */
1749
1750 /* can be called from any thread (except the worker thread) - blocks
1751  * calling thread until all queued outgoing messages are written and
1752  * the transport has been flushed
1753  *
1754  * write_lock is not held on entry
1755  * output_pending may be anything
1756  */
1757 gboolean
1758 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1759                            GCancellable   *cancellable,
1760                            GError        **error)
1761 {
1762   gboolean ret;
1763   FlushData *data;
1764   guint64 pending_writes;
1765
1766   data = NULL;
1767   ret = TRUE;
1768
1769   g_mutex_lock (&worker->write_lock);
1770
1771   /* if the queue is empty, no write is in-flight and we haven't written
1772    * anything since the last flush, then there's nothing to wait for
1773    */
1774   pending_writes = g_queue_get_length (worker->write_queue);
1775
1776   /* if a write is in-flight, we shouldn't be satisfied until the first
1777    * flush operation that follows it
1778    */
1779   if (worker->output_pending == PENDING_WRITE)
1780     pending_writes += 1;
1781
1782   if (pending_writes > 0 ||
1783       worker->write_num_messages_written != worker->write_num_messages_flushed)
1784     {
1785       data = g_new0 (FlushData, 1);
1786       g_mutex_init (&data->mutex);
1787       g_cond_init (&data->cond);
1788       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1789       g_mutex_lock (&data->mutex);
1790
1791       schedule_writing_unlocked (worker, NULL, data, NULL);
1792     }
1793   g_mutex_unlock (&worker->write_lock);
1794
1795   if (data != NULL)
1796     {
1797       g_cond_wait (&data->cond, &data->mutex);
1798       g_mutex_unlock (&data->mutex);
1799
1800       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1801       g_cond_clear (&data->cond);
1802       g_mutex_clear (&data->mutex);
1803       if (data->error != NULL)
1804         {
1805           ret = FALSE;
1806           g_propagate_error (error, data->error);
1807         }
1808       g_free (data);
1809     }
1810
1811   return ret;
1812 }
1813
1814 /* ---------------------------------------------------------------------------------------------------- */
1815
1816 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1817 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1818 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1819 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1820 #define G_DBUS_DEBUG_CALL           (1<<4)
1821 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1822 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1823 #define G_DBUS_DEBUG_RETURN         (1<<7)
1824 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1825 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1826
1827 static gint _gdbus_debug_flags = 0;
1828
1829 gboolean
1830 _g_dbus_debug_authentication (void)
1831 {
1832   _g_dbus_initialize ();
1833   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1834 }
1835
1836 gboolean
1837 _g_dbus_debug_transport (void)
1838 {
1839   _g_dbus_initialize ();
1840   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1841 }
1842
1843 gboolean
1844 _g_dbus_debug_message (void)
1845 {
1846   _g_dbus_initialize ();
1847   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1848 }
1849
1850 gboolean
1851 _g_dbus_debug_payload (void)
1852 {
1853   _g_dbus_initialize ();
1854   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1855 }
1856
1857 gboolean
1858 _g_dbus_debug_call (void)
1859 {
1860   _g_dbus_initialize ();
1861   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1862 }
1863
1864 gboolean
1865 _g_dbus_debug_signal (void)
1866 {
1867   _g_dbus_initialize ();
1868   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1869 }
1870
1871 gboolean
1872 _g_dbus_debug_incoming (void)
1873 {
1874   _g_dbus_initialize ();
1875   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1876 }
1877
1878 gboolean
1879 _g_dbus_debug_return (void)
1880 {
1881   _g_dbus_initialize ();
1882   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1883 }
1884
1885 gboolean
1886 _g_dbus_debug_emission (void)
1887 {
1888   _g_dbus_initialize ();
1889   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1890 }
1891
1892 gboolean
1893 _g_dbus_debug_address (void)
1894 {
1895   _g_dbus_initialize ();
1896   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1897 }
1898
1899 G_LOCK_DEFINE_STATIC (print_lock);
1900
1901 void
1902 _g_dbus_debug_print_lock (void)
1903 {
1904   G_LOCK (print_lock);
1905 }
1906
1907 void
1908 _g_dbus_debug_print_unlock (void)
1909 {
1910   G_UNLOCK (print_lock);
1911 }
1912
1913 /*
1914  * _g_dbus_initialize:
1915  *
1916  * Does various one-time init things such as
1917  *
1918  *  - registering the G_DBUS_ERROR error domain
1919  *  - parses the G_DBUS_DEBUG environment variable
1920  */
1921 void
1922 _g_dbus_initialize (void)
1923 {
1924   static volatile gsize initialized = 0;
1925
1926   if (g_once_init_enter (&initialized))
1927     {
1928       volatile GQuark g_dbus_error_domain;
1929       const gchar *debug;
1930
1931       g_dbus_error_domain = G_DBUS_ERROR;
1932       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1933
1934       debug = g_getenv ("G_DBUS_DEBUG");
1935       if (debug != NULL)
1936         {
1937           const GDebugKey keys[] = {
1938             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1939             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1940             { "message",        G_DBUS_DEBUG_MESSAGE        },
1941             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1942             { "call",           G_DBUS_DEBUG_CALL           },
1943             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1944             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1945             { "return",         G_DBUS_DEBUG_RETURN         },
1946             { "emission",       G_DBUS_DEBUG_EMISSION       },
1947             { "address",        G_DBUS_DEBUG_ADDRESS        }
1948           };
1949
1950           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1951           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1952             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1953         }
1954
1955       g_once_init_leave (&initialized, 1);
1956     }
1957 }
1958
1959 /* ---------------------------------------------------------------------------------------------------- */
1960
1961 GVariantType *
1962 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1963 {
1964   const GVariantType *arg_types[256];
1965   guint n;
1966
1967   if (args)
1968     for (n = 0; args[n] != NULL; n++)
1969       {
1970         /* DBus places a hard limit of 255 on signature length.
1971          * therefore number of args must be less than 256.
1972          */
1973         g_assert (n < 256);
1974
1975         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1976
1977         if G_UNLIKELY (arg_types[n] == NULL)
1978           return NULL;
1979       }
1980   else
1981     n = 0;
1982
1983   return g_variant_type_new_tuple (arg_types, n);
1984 }
1985
1986 /* ---------------------------------------------------------------------------------------------------- */
1987
#ifdef G_OS_WIN32

extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);

/* Returns the SID of the user owning the current process token, in
 * string form.
 *
 * The result is a newly allocated copy (g_strdup()) that the caller
 * releases with g_free(). On any failure a g_warning() is logged and
 * NULL is returned.
 */
gchar *
_g_dbus_win32_get_user_sid (void)
{
  HANDLE h;
  TOKEN_USER *user;
  DWORD token_information_len;
  PSID psid;
  gchar *sid;
  gchar *ret;

  ret = NULL;
  user = NULL;
  h = INVALID_HANDLE_VALUE;

  if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
    {
      g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* Get length of buffer: the first call is expected to fail with
   * ERROR_INSUFFICIENT_BUFFER and report the needed size
   */
  token_information_len = 0;
  if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
    {
      if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
        {
          g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
          goto out;
        }
    }
  user = g_malloc (token_information_len);
  if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  psid = user->User.Sid;
  if (!IsValidSid (psid))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  if (!ConvertSidToStringSidA (psid, &sid))
    {
      /* The SID passed IsValidSid() above, so report the conversion
       * failure itself rather than claiming the SID is invalid.
       */
      g_warning ("ConvertSidToStringSidA() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* hand back a GLib-owned copy; the system string needs LocalFree() */
  ret = g_strdup (sid);
  LocalFree (sid);

out:
  g_free (user);
  if (h != INVALID_HANDLE_VALUE)
    CloseHandle (h);
  return ret;
}
#endif
2052
2053 /* ---------------------------------------------------------------------------------------------------- */
2054
2055 gchar *
2056 _g_dbus_get_machine_id (GError **error)
2057 {
2058 #ifdef G_OS_WIN32
2059   HW_PROFILE_INFOA info;
2060   char *src, *dest, *res;
2061   int i;
2062
2063   if (!GetCurrentHwProfileA (&info))
2064     {
2065       char *message = g_win32_error_message (GetLastError ());
2066       g_set_error (error,
2067                    G_IO_ERROR,
2068                    G_IO_ERROR_FAILED,
2069                    _("Unable to get Hardware profile: %s"), message);
2070       g_free (message);
2071       return NULL;
2072     }
2073
2074   /* Form: {12340001-4980-1920-6788-123456789012} */
2075   src = &info.szHwProfileGuid[0];
2076
2077   res = g_malloc (32+1);
2078   dest = res;
2079
2080   src++; /* Skip { */
2081   for (i = 0; i < 8; i++)
2082     *dest++ = *src++;
2083   src++; /* Skip - */
2084   for (i = 0; i < 4; i++)
2085     *dest++ = *src++;
2086   src++; /* Skip - */
2087   for (i = 0; i < 4; i++)
2088     *dest++ = *src++;
2089   src++; /* Skip - */
2090   for (i = 0; i < 4; i++)
2091     *dest++ = *src++;
2092   src++; /* Skip - */
2093   for (i = 0; i < 12; i++)
2094     *dest++ = *src++;
2095   *dest = 0;
2096
2097   return res;
2098 #else
2099   gchar *ret;
2100   GError *first_error;
2101   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2102   ret = NULL;
2103   first_error = NULL;
2104   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2105                             &ret,
2106                             NULL,
2107                             &first_error) &&
2108       !g_file_get_contents ("/etc/machine-id",
2109                             &ret,
2110                             NULL,
2111                             NULL))
2112     {
2113       g_propagate_prefixed_error (error, first_error,
2114                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2115     }
2116   else
2117     {
2118       /* ignore the error from the first try, if any */
2119       g_clear_error (&first_error);
2120       /* TODO: validate value */
2121       g_strstrip (ret);
2122     }
2123   return ret;
2124 #endif
2125 }
2126
2127 /* ---------------------------------------------------------------------------------------------------- */
2128
2129 gchar *
2130 _g_dbus_enum_to_string (GType enum_type, gint value)
2131 {
2132   gchar *ret;
2133   GEnumClass *klass;
2134   GEnumValue *enum_value;
2135
2136   klass = g_type_class_ref (enum_type);
2137   enum_value = g_enum_get_value (klass, value);
2138   if (enum_value != NULL)
2139     ret = g_strdup (enum_value->value_nick);
2140   else
2141     ret = g_strdup_printf ("unknown (value %d)", value);
2142   g_type_class_unref (klass);
2143   return ret;
2144 }
2145
2146 /* ---------------------------------------------------------------------------------------------------- */
2147
2148 static void
2149 write_message_print_transport_debug (gssize bytes_written,
2150                                      MessageToWriteData *data)
2151 {
2152   if (G_LIKELY (!_g_dbus_debug_transport ()))
2153     goto out;
2154
2155   _g_dbus_debug_print_lock ();
2156   g_print ("========================================================================\n"
2157            "GDBus-debug:Transport:\n"
2158            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2159            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2160            bytes_written,
2161            g_dbus_message_get_serial (data->message),
2162            data->blob_size,
2163            data->total_written,
2164            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2165   _g_dbus_debug_print_unlock ();
2166  out:
2167   ;
2168 }
2169
2170 /* ---------------------------------------------------------------------------------------------------- */
2171
2172 static void
2173 read_message_print_transport_debug (gssize bytes_read,
2174                                     GDBusWorker *worker)
2175 {
2176   gsize size;
2177   gint32 serial;
2178   gint32 message_length;
2179
2180   if (G_LIKELY (!_g_dbus_debug_transport ()))
2181     goto out;
2182
2183   size = bytes_read + worker->read_buffer_cur_size;
2184   serial = 0;
2185   message_length = 0;
2186   if (size >= 16)
2187     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2188   if (size >= 1)
2189     {
2190       switch (worker->read_buffer[0])
2191         {
2192         case 'l':
2193           if (size >= 12)
2194             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2195           break;
2196         case 'B':
2197           if (size >= 12)
2198             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2199           break;
2200         default:
2201           /* an error will be set elsewhere if this happens */
2202           goto out;
2203         }
2204     }
2205
2206     _g_dbus_debug_print_lock ();
2207   g_print ("========================================================================\n"
2208            "GDBus-debug:Transport:\n"
2209            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2210            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2211            bytes_read,
2212            serial,
2213            message_length,
2214            worker->read_buffer_cur_size,
2215            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2216   _g_dbus_debug_print_unlock ();
2217  out:
2218   ;
2219 }
2220
2221 /* ---------------------------------------------------------------------------------------------------- */
2222
2223 gboolean
2224 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2225                                      GValue                *return_accu,
2226                                      const GValue          *handler_return,
2227                                      gpointer               dummy)
2228 {
2229   gboolean continue_emission;
2230   gboolean signal_return;
2231
2232   signal_return = g_value_get_boolean (handler_return);
2233   g_value_set_boolean (return_accu, signal_return);
2234   continue_emission = signal_return;
2235
2236   return continue_emission;
2237 }