Merge 2.53.2 into tizen
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: David Zeuthen <davidz@redhat.com>
19  */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "giotypes.h"
27 #include "gsocket.h"
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbusconnection.h"
31 #include "gdbusproxy.h"
32 #include "gdbuserror.h"
33 #include "gdbusintrospection.h"
34 #include "gtask.h"
35 #include "ginputstream.h"
36 #include "gmemoryinputstream.h"
37 #include "giostream.h"
38 #include "glib/gstdio.h"
39 #include "gsocketcontrolmessage.h"
40 #include "gsocketconnection.h"
41 #include "gsocketoutputstream.h"
42
43 #ifdef G_OS_UNIX
44 #include "gunixfdmessage.h"
45 #include "gunixconnection.h"
46 #include "gunixcredentialsmessage.h"
47 #endif
48
49 #ifdef G_OS_WIN32
50 #include <windows.h>
51 #endif
52
53 #include "glibintl.h"
54
55 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
56 static void schedule_pending_close (GDBusWorker *worker);
57
58 /* ---------------------------------------------------------------------------------------------------- */
59
60 gchar *
61 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
62 {
63  guint n, m;
64  GString *ret;
65
66  ret = g_string_new (NULL);
67
68  for (n = 0; n < len; n += 16)
69    {
70      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
71
72      for (m = n; m < n + 16; m++)
73        {
74          if (m > n && (m%4) == 0)
75            g_string_append_c (ret, ' ');
76          if (m < len)
77            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
78          else
79            g_string_append (ret, "   ");
80        }
81
82      g_string_append (ret, "   ");
83
84      for (m = n; m < len && m < n + 16; m++)
85        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
86
87      g_string_append_c (ret, '\n');
88    }
89
90  return g_string_free (ret, FALSE);
91 }
92
93 /* ---------------------------------------------------------------------------------------------------- */
94
95 /* Unfortunately ancillary messages are discarded when reading from a
96  * socket using the GSocketInputStream abstraction. So we provide a
97  * very GInputStream-ish API that uses GSocket in this case (very
98  * similar to GSocketInputStream).
99  */
100
101 typedef struct
102 {
103   void *buffer;
104   gsize count;
105
106   GSocketControlMessage ***messages;
107   gint *num_messages;
108 } ReadWithControlData;
109
110 static void
111 read_with_control_data_free (ReadWithControlData *data)
112 {
113   g_slice_free (ReadWithControlData, data);
114 }
115
116 static gboolean
117 _g_socket_read_with_control_messages_ready (GSocket      *socket,
118                                             GIOCondition  condition,
119                                             gpointer      user_data)
120 {
121   GTask *task = user_data;
122   ReadWithControlData *data = g_task_get_task_data (task);
123   GError *error;
124   gssize result;
125   GInputVector vector;
126
127   error = NULL;
128   vector.buffer = data->buffer;
129   vector.size = data->count;
130   result = g_socket_receive_message (socket,
131                                      NULL, /* address */
132                                      &vector,
133                                      1,
134                                      data->messages,
135                                      data->num_messages,
136                                      NULL,
137                                      g_task_get_cancellable (task),
138                                      &error);
139
140   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
141     {
142       g_error_free (error);
143       return TRUE;
144     }
145
146   g_assert (result >= 0 || error != NULL);
147   if (result >= 0)
148     g_task_return_int (task, result);
149   else
150     g_task_return_error (task, error);
151   g_object_unref (task);
152
153   return FALSE;
154 }
155
156 static void
157 _g_socket_read_with_control_messages (GSocket                 *socket,
158                                       void                    *buffer,
159                                       gsize                    count,
160                                       GSocketControlMessage ***messages,
161                                       gint                    *num_messages,
162                                       gint                     io_priority,
163                                       GCancellable            *cancellable,
164                                       GAsyncReadyCallback      callback,
165                                       gpointer                 user_data)
166 {
167   GTask *task;
168   ReadWithControlData *data;
169   GSource *source;
170
171   data = g_slice_new0 (ReadWithControlData);
172   data->buffer = buffer;
173   data->count = count;
174   data->messages = messages;
175   data->num_messages = num_messages;
176
177   task = g_task_new (socket, cancellable, callback, user_data);
178   g_task_set_source_tag (task, _g_socket_read_with_control_messages);
179   g_task_set_task_data (task, data, (GDestroyNotify) read_with_control_data_free);
180
181   if (g_socket_condition_check (socket, G_IO_IN))
182     {
183       if (!_g_socket_read_with_control_messages_ready (socket, G_IO_IN, task))
184         return;
185     }
186
187   source = g_socket_create_source (socket,
188                                    G_IO_IN | G_IO_HUP | G_IO_ERR,
189                                    cancellable);
190   g_task_attach_source (task, source, (GSourceFunc) _g_socket_read_with_control_messages_ready);
191   g_source_unref (source);
192 }
193
194 static gssize
195 _g_socket_read_with_control_messages_finish (GSocket       *socket,
196                                              GAsyncResult  *result,
197                                              GError       **error)
198 {
199   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
200   g_return_val_if_fail (g_task_is_valid (result, socket), -1);
201
202   return g_task_propagate_int (G_TASK (result), error);
203 }
204
205 /* ---------------------------------------------------------------------------------------------------- */
206
207 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=674885
208    and see also the original https://bugzilla.gnome.org/show_bug.cgi?id=627724  */
209
210 static GPtrArray *ensured_classes = NULL;
211
212 static void
213 ensure_type (GType gtype)
214 {
215   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
216 }
217
218 static void
219 release_required_types (void)
220 {
221   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
222   g_ptr_array_unref (ensured_classes);
223   ensured_classes = NULL;
224 }
225
226 static void
227 ensure_required_types (void)
228 {
229   g_assert (ensured_classes == NULL);
230   ensured_classes = g_ptr_array_new ();
231   /* Generally in this list, you should initialize types which are used as
232    * properties first, then the class which has them. For example, GDBusProxy
233    * has a type of GDBusConnection, so we initialize GDBusConnection first.
234    * And because GDBusConnection has a property of type GDBusConnectionFlags,
235    * we initialize that first.
236    *
237    * Similarly, GSocket has a type of GSocketAddress.
238    *
239    * We don't fill out the whole dependency tree right now because in practice
240    * it tends to be just types that GDBus use that cause pain, and there
241    * is work on a more general approach in https://bugzilla.gnome.org/show_bug.cgi?id=674885
242    */
243   ensure_type (G_TYPE_TASK);
244   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
245   ensure_type (G_TYPE_DBUS_CONNECTION_FLAGS);
246   ensure_type (G_TYPE_DBUS_CAPABILITY_FLAGS);
247   ensure_type (G_TYPE_DBUS_AUTH_OBSERVER);
248   ensure_type (G_TYPE_DBUS_CONNECTION);
249   ensure_type (G_TYPE_DBUS_PROXY);
250   ensure_type (G_TYPE_SOCKET_FAMILY);
251   ensure_type (G_TYPE_SOCKET_TYPE);
252   ensure_type (G_TYPE_SOCKET_PROTOCOL);
253   ensure_type (G_TYPE_SOCKET_ADDRESS);
254   ensure_type (G_TYPE_SOCKET);
255 }
256 /* ---------------------------------------------------------------------------------------------------- */
257
258 typedef struct
259 {
260   volatile gint refcount;
261   GThread *thread;
262   GMainContext *context;
263   GMainLoop *loop;
264 } SharedThreadData;
265
266 static gpointer
267 gdbus_shared_thread_func (gpointer user_data)
268 {
269   SharedThreadData *data = user_data;
270
271   g_main_context_push_thread_default (data->context);
272   g_main_loop_run (data->loop);
273   g_main_context_pop_thread_default (data->context);
274
275   release_required_types ();
276
277   return NULL;
278 }
279
280 /* ---------------------------------------------------------------------------------------------------- */
281
282 static SharedThreadData *
283 _g_dbus_shared_thread_ref (void)
284 {
285   static gsize shared_thread_data = 0;
286   SharedThreadData *ret;
287
288   if (g_once_init_enter (&shared_thread_data))
289     {
290       SharedThreadData *data;
291
292       data = g_new0 (SharedThreadData, 1);
293       data->refcount = 0;
294       
295       data->context = g_main_context_new ();
296       data->loop = g_main_loop_new (data->context, FALSE);
297       data->thread = g_thread_new ("gdbus",
298                                    gdbus_shared_thread_func,
299                                    data);
300       /* We can cast between gsize and gpointer safely */
301       g_once_init_leave (&shared_thread_data, (gsize) data);
302     }
303
304   ret = (SharedThreadData*) shared_thread_data;
305   g_atomic_int_inc (&ret->refcount);
306   return ret;
307 }
308
309 static void
310 _g_dbus_shared_thread_unref (SharedThreadData *data)
311 {
312   /* TODO: actually destroy the shared thread here */
313 #if 0
314   g_assert (data != NULL);
315   if (g_atomic_int_dec_and_test (&data->refcount))
316     {
317       g_main_loop_quit (data->loop);
318       //g_thread_join (data->thread);
319       g_main_loop_unref (data->loop);
320       g_main_context_unref (data->context);
321     }
322 #endif
323 }
324
325 /* ---------------------------------------------------------------------------------------------------- */
326
/* Which asynchronous output operation, if any, is pending on a worker
 * (see GDBusWorker.output_pending).
 */
typedef enum
{
  PENDING_NONE = 0,  /* nothing is pending */
  PENDING_WRITE,     /* a write is pending */
  PENDING_FLUSH,     /* a flush is pending */
  PENDING_CLOSE      /* a close is pending */
} OutputPending;
333
334 struct GDBusWorker
335 {
336   volatile gint                       ref_count;
337
338   SharedThreadData                   *shared_thread_data;
339
340   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
341   volatile gint                       stopped;
342
343   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
344    * only affects messages received from the other peer (since GDBusServer is the
345    * only user) - we might want it to affect messages sent to the other peer too?
346    */
347   gboolean                            frozen;
348   GDBusCapabilityFlags                capabilities;
349   GQueue                             *received_messages_while_frozen;
350
351   GIOStream                          *stream;
352   GCancellable                       *cancellable;
353   GDBusWorkerMessageReceivedCallback  message_received_callback;
354   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
355   GDBusWorkerDisconnectedCallback     disconnected_callback;
356   gpointer                            user_data;
357
358   /* if not NULL, stream is GSocketConnection */
359   GSocket *socket;
360
361   /* used for reading */
362   GMutex                              read_lock;
363   gchar                              *read_buffer;
364   gsize                               read_buffer_allocated_size;
365   gsize                               read_buffer_cur_size;
366   gsize                               read_buffer_bytes_wanted;
367   GUnixFDList                        *read_fd_list;
368   GSocketControlMessage             **read_ancillary_messages;
369   gint                                read_num_ancillary_messages;
370
371   /* Whether an async write, flush or close, or none of those, is pending.
372    * Only the worker thread may change its value, and only with the write_lock.
373    * Other threads may read its value when holding the write_lock.
374    * The worker thread may read its value at any time.
375    */
376   OutputPending                       output_pending;
377   /* used for writing */
378   GMutex                              write_lock;
379   /* queue of MessageToWriteData, protected by write_lock */
380   GQueue                             *write_queue;
381   /* protected by write_lock */
382   guint64                             write_num_messages_written;
383   /* number of messages we'd written out last time we flushed;
384    * protected by write_lock
385    */
386   guint64                             write_num_messages_flushed;
387   /* list of FlushData, protected by write_lock */
388   GList                              *write_pending_flushes;
389   /* list of CloseData, protected by write_lock */
390   GList                              *pending_close_attempts;
391   /* no lock - only used from the worker thread */
392   gboolean                            close_expected;
393 };
394
395 static void _g_dbus_worker_unref (GDBusWorker *worker);
396
397 /* ---------------------------------------------------------------------------------------------------- */
398
399 typedef struct
400 {
401   GMutex  mutex;
402   GCond   cond;
403   guint64 number_to_wait_for;
404   GError *error;
405 } FlushData;
406
407 struct _MessageToWriteData ;
408 typedef struct _MessageToWriteData MessageToWriteData;
409
410 static void message_to_write_data_free (MessageToWriteData *data);
411
412 static void read_message_print_transport_debug (gssize bytes_read,
413                                                 GDBusWorker *worker);
414
415 static void write_message_print_transport_debug (gssize bytes_written,
416                                                  MessageToWriteData *data);
417
418 typedef struct {
419     GDBusWorker *worker;
420     GTask *task;
421 } CloseData;
422
423 static void close_data_free (CloseData *close_data)
424 {
425   g_clear_object (&close_data->task);
426
427   _g_dbus_worker_unref (close_data->worker);
428   g_slice_free (CloseData, close_data);
429 }
430
431 /* ---------------------------------------------------------------------------------------------------- */
432
433 static GDBusWorker *
434 _g_dbus_worker_ref (GDBusWorker *worker)
435 {
436   g_atomic_int_inc (&worker->ref_count);
437   return worker;
438 }
439
440 static void
441 _g_dbus_worker_unref (GDBusWorker *worker)
442 {
443   if (g_atomic_int_dec_and_test (&worker->ref_count))
444     {
445       g_assert (worker->write_pending_flushes == NULL);
446
447       _g_dbus_shared_thread_unref (worker->shared_thread_data);
448
449       g_object_unref (worker->stream);
450
451       g_mutex_clear (&worker->read_lock);
452       g_object_unref (worker->cancellable);
453       if (worker->read_fd_list != NULL)
454         g_object_unref (worker->read_fd_list);
455
456       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
457       g_mutex_clear (&worker->write_lock);
458       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
459       g_free (worker->read_buffer);
460
461       g_free (worker);
462     }
463 }
464
465 static void
466 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
467                                   gboolean      remote_peer_vanished,
468                                   GError       *error)
469 {
470   if (!g_atomic_int_get (&worker->stopped))
471     worker->disconnected_callback (remote_peer_vanished, error, worker->user_data);
472 }
473
474 static void
475 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
476                                       GDBusMessage *message)
477 {
478   if (!g_atomic_int_get (&worker->stopped))
479     worker->message_received_callback (message, worker->user_data);
480 }
481
482 static GDBusMessage *
483 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
484                                               GDBusMessage *message)
485 {
486   GDBusMessage *ret;
487   if (!g_atomic_int_get (&worker->stopped))
488     ret = worker->message_about_to_be_sent_callback (message, worker->user_data);
489   else
490     ret = message;
491   return ret;
492 }
493
494 /* can only be called from private thread with read-lock held - takes ownership of @message */
495 static void
496 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
497                                                   GDBusMessage *message)
498 {
499   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
500     {
501       /* queue up */
502       g_queue_push_tail (worker->received_messages_while_frozen, message);
503     }
504   else
505     {
506       /* not frozen, nor anything in queue */
507       _g_dbus_worker_emit_message_received (worker, message);
508       g_object_unref (message);
509     }
510 }
511
512 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
513 static gboolean
514 unfreeze_in_idle_cb (gpointer user_data)
515 {
516   GDBusWorker *worker = user_data;
517   GDBusMessage *message;
518
519   g_mutex_lock (&worker->read_lock);
520   if (worker->frozen)
521     {
522       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
523         {
524           _g_dbus_worker_emit_message_received (worker, message);
525           g_object_unref (message);
526         }
527       worker->frozen = FALSE;
528     }
529   else
530     {
531       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
532     }
533   g_mutex_unlock (&worker->read_lock);
534   return FALSE;
535 }
536
537 /* can be called from any thread */
538 void
539 _g_dbus_worker_unfreeze (GDBusWorker *worker)
540 {
541   GSource *idle_source;
542   idle_source = g_idle_source_new ();
543   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
544   g_source_set_callback (idle_source,
545                          unfreeze_in_idle_cb,
546                          _g_dbus_worker_ref (worker),
547                          (GDestroyNotify) _g_dbus_worker_unref);
548   g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
549   g_source_attach (idle_source, worker->shared_thread_data->context);
550   g_source_unref (idle_source);
551 }
552
553 /* ---------------------------------------------------------------------------------------------------- */
554
555 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
556
557 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
558 static void
559 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
560                            GAsyncResult  *res,
561                            gpointer       user_data)
562 {
563   GDBusWorker *worker = user_data;
564   GError *error;
565   gssize bytes_read;
566
567   g_mutex_lock (&worker->read_lock);
568
569   /* If already stopped, don't even process the reply */
570   if (g_atomic_int_get (&worker->stopped))
571     goto out;
572
573   error = NULL;
574   if (worker->socket == NULL)
575     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
576                                              res,
577                                              &error);
578   else
579     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
580                                                               res,
581                                                               &error);
582   if (worker->read_num_ancillary_messages > 0)
583     {
584       gint n;
585       for (n = 0; n < worker->read_num_ancillary_messages; n++)
586         {
587           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
588
589           if (FALSE)
590             {
591             }
592 #ifdef G_OS_UNIX
593           else if (G_IS_UNIX_FD_MESSAGE (control_message))
594             {
595               GUnixFDMessage *fd_message;
596               gint *fds;
597               gint num_fds;
598
599               fd_message = G_UNIX_FD_MESSAGE (control_message);
600               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
601               if (worker->read_fd_list == NULL)
602                 {
603                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
604                 }
605               else
606                 {
607                   gint n;
608                   for (n = 0; n < num_fds; n++)
609                     {
610                       /* TODO: really want a append_steal() */
611                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
612                       (void) g_close (fds[n], NULL);
613                     }
614                 }
615               g_free (fds);
616             }
617           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
618             {
619               /* do nothing */
620             }
621 #endif
622           else
623             {
624               if (error == NULL)
625                 {
626                   g_set_error (&error,
627                                G_IO_ERROR,
628                                G_IO_ERROR_FAILED,
629                                "Unexpected ancillary message of type %s received from peer",
630                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
631                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
632                   g_error_free (error);
633                   g_object_unref (control_message);
634                   n++;
635                   while (n < worker->read_num_ancillary_messages)
636                     g_object_unref (worker->read_ancillary_messages[n++]);
637                   g_free (worker->read_ancillary_messages);
638                   goto out;
639                 }
640             }
641           g_object_unref (control_message);
642         }
643       g_free (worker->read_ancillary_messages);
644     }
645
646   if (bytes_read == -1)
647     {
648       if (G_UNLIKELY (_g_dbus_debug_transport ()))
649         {
650           _g_dbus_debug_print_lock ();
651           g_print ("========================================================================\n"
652                    "GDBus-debug:Transport:\n"
653                    "  ---- READ ERROR on stream of type %s:\n"
654                    "  ---- %s %d: %s\n",
655                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
656                    g_quark_to_string (error->domain), error->code,
657                    error->message);
658           _g_dbus_debug_print_unlock ();
659         }
660
661       /* Every async read that uses this callback uses worker->cancellable
662        * as its GCancellable. worker->cancellable gets cancelled if and only
663        * if the GDBusConnection tells us to close (either via
664        * _g_dbus_worker_stop, which is called on last-unref, or directly),
665        * so a cancelled read must mean our connection was closed locally.
666        *
667        * If we're closing, other errors are possible - notably,
668        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
669        * read in-flight. It seems sensible to treat all read errors during
670        * closing as an expected thing that doesn't trip exit-on-close.
671        *
672        * Because close_expected can't be set until we get into the worker
673        * thread, but the cancellable is signalled sooner (from another
674        * thread), we do still need to check the error.
675        */
676       if (worker->close_expected ||
677           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
678         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
679       else
680         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
681
682       g_error_free (error);
683       goto out;
684     }
685
686 #if 0
687   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
688            (gint) bytes_read,
689            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
690            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
691            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
692                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
693            worker->stream,
694            worker);
695 #endif
696
697   /* TODO: hmm, hmm... */
698   if (bytes_read == 0)
699     {
700       g_set_error (&error,
701                    G_IO_ERROR,
702                    G_IO_ERROR_FAILED,
703                    "Underlying GIOStream returned 0 bytes on an async read");
704       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
705       g_error_free (error);
706       goto out;
707     }
708
709   read_message_print_transport_debug (bytes_read, worker);
710
711   worker->read_buffer_cur_size += bytes_read;
712   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
713     {
714       /* OK, got what we asked for! */
715       if (worker->read_buffer_bytes_wanted == 16)
716         {
717           gssize message_len;
718           /* OK, got the header - determine how many more bytes are needed */
719           error = NULL;
720           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
721                                                      16,
722                                                      &error);
723           if (message_len == -1)
724             {
725               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
726               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
727               g_error_free (error);
728               goto out;
729             }
730
731           worker->read_buffer_bytes_wanted = message_len;
732           _g_dbus_worker_do_read_unlocked (worker);
733         }
734       else
735         {
736           GDBusMessage *message;
737           error = NULL;
738
739           /* TODO: use connection->priv->auth to decode the message */
740
741           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
742                                                   worker->read_buffer_cur_size,
743                                                   worker->capabilities,
744                                                   &error);
745           if (message == NULL)
746             {
747               gchar *s;
748               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
749               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
750                          "The error is: %s\n"
751                          "The payload is as follows:\n"
752                          "%s\n",
753                          worker->read_buffer_cur_size,
754                          error->message,
755                          s);
756               g_free (s);
757               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
758               g_error_free (error);
759               goto out;
760             }
761
762 #ifdef G_OS_UNIX
763           if (worker->read_fd_list != NULL)
764             {
765               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
766               g_object_unref (worker->read_fd_list);
767               worker->read_fd_list = NULL;
768             }
769 #endif
770
771           if (G_UNLIKELY (_g_dbus_debug_message ()))
772             {
773               gchar *s;
774               _g_dbus_debug_print_lock ();
775               g_print ("========================================================================\n"
776                        "GDBus-debug:Message:\n"
777                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
778                        worker->read_buffer_cur_size);
779               s = g_dbus_message_print (message, 2);
780               g_print ("%s", s);
781               g_free (s);
782               if (G_UNLIKELY (_g_dbus_debug_payload ()))
783                 {
784                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
785                   g_print ("%s\n", s);
786                   g_free (s);
787                 }
788               _g_dbus_debug_print_unlock ();
789             }
790
791           /* yay, got a message, go deliver it */
792           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
793
794           /* start reading another message! */
795           worker->read_buffer_bytes_wanted = 0;
796           worker->read_buffer_cur_size = 0;
797           _g_dbus_worker_do_read_unlocked (worker);
798         }
799     }
800   else
801     {
802       /* didn't get all the bytes we requested - so repeat the request... */
803       _g_dbus_worker_do_read_unlocked (worker);
804     }
805
806  out:
807   g_mutex_unlock (&worker->read_lock);
808
809   /* gives up the reference acquired when calling g_input_stream_read_async() */
810   _g_dbus_worker_unref (worker);
811
812   /* check if there is any pending close */
813   schedule_pending_close (worker);
814 }
815
816 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
817 static void
818 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
819 {
820   /* Note that we do need to keep trying to read even if close_expected is
821    * true, because only failing a read causes us to signal 'closed'.
822    */
823
824   /* if bytes_wanted is zero, it means start reading a message */
825   if (worker->read_buffer_bytes_wanted == 0)
826     {
827       worker->read_buffer_cur_size = 0;
828       worker->read_buffer_bytes_wanted = 16;
829     }
830
831   /* ensure we have a (big enough) buffer */
832   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
833     {
834       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
835       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
836       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
837     }
838
839   if (worker->socket == NULL)
840     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
841                                worker->read_buffer + worker->read_buffer_cur_size,
842                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
843                                G_PRIORITY_DEFAULT,
844                                worker->cancellable,
845                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
846                                _g_dbus_worker_ref (worker));
847   else
848     {
849       worker->read_ancillary_messages = NULL;
850       worker->read_num_ancillary_messages = 0;
851       _g_socket_read_with_control_messages (worker->socket,
852                                             worker->read_buffer + worker->read_buffer_cur_size,
853                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
854                                             &worker->read_ancillary_messages,
855                                             &worker->read_num_ancillary_messages,
856                                             G_PRIORITY_DEFAULT,
857                                             worker->cancellable,
858                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
859                                             _g_dbus_worker_ref (worker));
860     }
861 }
862
863 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
864 static gboolean
865 _g_dbus_worker_do_initial_read (gpointer data)
866 {
867   GDBusWorker *worker = data;
868   g_mutex_lock (&worker->read_lock);
869   _g_dbus_worker_do_read_unlocked (worker);
870   g_mutex_unlock (&worker->read_lock);
871   return FALSE;
872 }
873
874 /* ---------------------------------------------------------------------------------------------------- */
875
876 struct _MessageToWriteData
877 {
878   GDBusWorker  *worker;
879   GDBusMessage *message;
880   gchar        *blob;
881   gsize         blob_size;
882
883   gsize         total_written;
884   GTask        *task;
885 };
886
887 static void
888 message_to_write_data_free (MessageToWriteData *data)
889 {
890   _g_dbus_worker_unref (data->worker);
891   if (data->message)
892     g_object_unref (data->message);
893   g_free (data->blob);
894   g_slice_free (MessageToWriteData, data);
895 }
896
897 /* ---------------------------------------------------------------------------------------------------- */
898
899 static void write_message_continue_writing (MessageToWriteData *data);
900
901 /* called in private thread shared by all GDBusConnection instances
902  *
903  * write-lock is not held on entry
904  * output_pending is PENDING_WRITE on entry
905  */
906 static void
907 write_message_async_cb (GObject      *source_object,
908                         GAsyncResult *res,
909                         gpointer      user_data)
910 {
911   MessageToWriteData *data = user_data;
912   GTask *task;
913   gssize bytes_written;
914   GError *error;
915
916   /* Note: we can't access data->task after calling g_task_return_* () because the
917    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
918    */
919   task = data->task;
920
921   error = NULL;
922   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
923                                                 res,
924                                                 &error);
925   if (bytes_written == -1)
926     {
927       g_task_return_error (task, error);
928       g_object_unref (task);
929       goto out;
930     }
931   g_assert (bytes_written > 0); /* zero is never returned */
932
933   write_message_print_transport_debug (bytes_written, data);
934
935   data->total_written += bytes_written;
936   g_assert (data->total_written <= data->blob_size);
937   if (data->total_written == data->blob_size)
938     {
939       g_task_return_boolean (task, TRUE);
940       g_object_unref (task);
941       goto out;
942     }
943
944   write_message_continue_writing (data);
945
946  out:
947   ;
948 }
949
/* Socket source callback installed by write_message_continue_writing()
 * after a send reported G_IO_ERROR_WOULD_BLOCK; it simply retries the write.
 *
 * called in private thread shared by all GDBusConnection instances
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 *
 * @user_data is the MessageToWriteData being written. @socket and
 * @condition are not looked at.
 */
#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  write_message_continue_writing (user_data);

  /* one-shot: returning FALSE removes the source */
  return FALSE;
}
#endif
966
967 /* called in private thread shared by all GDBusConnection instances
968  *
969  * write-lock is not held on entry
970  * output_pending is PENDING_WRITE on entry
971  */
972 static void
973 write_message_continue_writing (MessageToWriteData *data)
974 {
975   GOutputStream *ostream;
976 #ifdef G_OS_UNIX
977   GTask *task;
978   GUnixFDList *fd_list;
979 #endif
980
981 #ifdef G_OS_UNIX
982   /* Note: we can't access data->task after calling g_task_return_* () because the
983    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
984    */
985   task = data->task;
986 #endif
987
988   ostream = g_io_stream_get_output_stream (data->worker->stream);
989 #ifdef G_OS_UNIX
990   fd_list = g_dbus_message_get_unix_fd_list (data->message);
991 #endif
992
993   g_assert (!g_output_stream_has_pending (ostream));
994   g_assert_cmpint (data->total_written, <, data->blob_size);
995
996   if (FALSE)
997     {
998     }
999 #ifdef G_OS_UNIX
1000   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1001     {
1002       GOutputVector vector;
1003       GSocketControlMessage *control_message;
1004       gssize bytes_written;
1005       GError *error;
1006
1007       vector.buffer = data->blob;
1008       vector.size = data->blob_size;
1009
1010       control_message = NULL;
1011       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1012         {
1013           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1014             {
1015               g_task_return_new_error (task,
1016                                        G_IO_ERROR,
1017                                        G_IO_ERROR_FAILED,
1018                                        "Tried sending a file descriptor but remote peer does not support this capability");
1019               g_object_unref (task);
1020               goto out;
1021             }
1022           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1023         }
1024
1025       error = NULL;
1026       bytes_written = g_socket_send_message (data->worker->socket,
1027                                              NULL, /* address */
1028                                              &vector,
1029                                              1,
1030                                              control_message != NULL ? &control_message : NULL,
1031                                              control_message != NULL ? 1 : 0,
1032                                              G_SOCKET_MSG_NONE,
1033                                              data->worker->cancellable,
1034                                              &error);
1035       if (control_message != NULL)
1036         g_object_unref (control_message);
1037
1038       if (bytes_written == -1)
1039         {
1040           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1041           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1042             {
1043               GSource *source;
1044               source = g_socket_create_source (data->worker->socket,
1045                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1046                                                data->worker->cancellable);
1047               g_source_set_callback (source,
1048                                      (GSourceFunc) on_socket_ready,
1049                                      data,
1050                                      NULL); /* GDestroyNotify */
1051               g_source_attach (source, g_main_context_get_thread_default ());
1052               g_source_unref (source);
1053               g_error_free (error);
1054               goto out;
1055             }
1056           g_task_return_error (task, error);
1057           g_object_unref (task);
1058           goto out;
1059         }
1060       g_assert (bytes_written > 0); /* zero is never returned */
1061
1062       write_message_print_transport_debug (bytes_written, data);
1063
1064       data->total_written += bytes_written;
1065       g_assert (data->total_written <= data->blob_size);
1066       if (data->total_written == data->blob_size)
1067         {
1068           g_task_return_boolean (task, TRUE);
1069           g_object_unref (task);
1070           goto out;
1071         }
1072
1073       write_message_continue_writing (data);
1074     }
1075 #endif
1076   else
1077     {
1078 #ifdef G_OS_UNIX
1079       if (fd_list != NULL)
1080         {
1081           g_task_return_new_error (task,
1082                                    G_IO_ERROR,
1083                                    G_IO_ERROR_FAILED,
1084                                    "Tried sending a file descriptor on unsupported stream of type %s",
1085                                    g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1086           g_object_unref (task);
1087           goto out;
1088         }
1089 #endif
1090
1091       g_output_stream_write_async (ostream,
1092                                    (const gchar *) data->blob + data->total_written,
1093                                    data->blob_size - data->total_written,
1094                                    G_PRIORITY_DEFAULT,
1095                                    data->worker->cancellable,
1096                                    write_message_async_cb,
1097                                    data);
1098     }
1099 #ifdef G_OS_UNIX
1100  out:
1101 #endif
1102   ;
1103 }
1104
1105 /* called in private thread shared by all GDBusConnection instances
1106  *
1107  * write-lock is not held on entry
1108  * output_pending is PENDING_WRITE on entry
1109  */
1110 static void
1111 write_message_async (GDBusWorker         *worker,
1112                      MessageToWriteData  *data,
1113                      GAsyncReadyCallback  callback,
1114                      gpointer             user_data)
1115 {
1116   data->task = g_task_new (NULL, NULL, callback, user_data);
1117   g_task_set_source_tag (data->task, write_message_async);
1118   data->total_written = 0;
1119   write_message_continue_writing (data);
1120 }
1121
1122 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1123 static gboolean
1124 write_message_finish (GAsyncResult   *res,
1125                       GError        **error)
1126 {
1127   g_return_val_if_fail (g_task_is_valid (res, NULL), FALSE);
1128
1129   return g_task_propagate_boolean (G_TASK (res), error);
1130 }
1131 /* ---------------------------------------------------------------------------------------------------- */
1132
1133 static void continue_writing (GDBusWorker *worker);
1134
1135 typedef struct
1136 {
1137   GDBusWorker *worker;
1138   GList *flushers;
1139 } FlushAsyncData;
1140
1141 static void
1142 flush_data_list_complete (const GList  *flushers,
1143                           const GError *error)
1144 {
1145   const GList *l;
1146
1147   for (l = flushers; l != NULL; l = l->next)
1148     {
1149       FlushData *f = l->data;
1150
1151       f->error = error != NULL ? g_error_copy (error) : NULL;
1152
1153       g_mutex_lock (&f->mutex);
1154       g_cond_signal (&f->cond);
1155       g_mutex_unlock (&f->mutex);
1156     }
1157 }
1158
1159 /* called in private thread shared by all GDBusConnection instances
1160  *
1161  * write-lock is not held on entry
1162  * output_pending is PENDING_FLUSH on entry
1163  */
1164 static void
1165 ostream_flush_cb (GObject      *source_object,
1166                   GAsyncResult *res,
1167                   gpointer      user_data)
1168 {
1169   FlushAsyncData *data = user_data;
1170   GError *error;
1171
1172   error = NULL;
1173   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1174                                 res,
1175                                 &error);
1176
1177   if (error == NULL)
1178     {
1179       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1180         {
1181           _g_dbus_debug_print_lock ();
1182           g_print ("========================================================================\n"
1183                    "GDBus-debug:Transport:\n"
1184                    "  ---- FLUSHED stream of type %s\n",
1185                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1186           _g_dbus_debug_print_unlock ();
1187         }
1188     }
1189
1190   g_assert (data->flushers != NULL);
1191   flush_data_list_complete (data->flushers, error);
1192   g_list_free (data->flushers);
1193
1194   if (error != NULL)
1195     g_error_free (error);
1196
1197   /* Make sure we tell folks that we don't have additional
1198      flushes pending */
1199   g_mutex_lock (&data->worker->write_lock);
1200   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1201   g_assert (data->worker->output_pending == PENDING_FLUSH);
1202   data->worker->output_pending = PENDING_NONE;
1203   g_mutex_unlock (&data->worker->write_lock);
1204
1205   /* OK, cool, finally kick off the next write */
1206   continue_writing (data->worker);
1207
1208   _g_dbus_worker_unref (data->worker);
1209   g_free (data);
1210 }
1211
1212 /* called in private thread shared by all GDBusConnection instances
1213  *
1214  * write-lock is not held on entry
1215  * output_pending is PENDING_FLUSH on entry
1216  */
1217 static void
1218 start_flush (FlushAsyncData *data)
1219 {
1220   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1221                                G_PRIORITY_DEFAULT,
1222                                data->worker->cancellable,
1223                                ostream_flush_cb,
1224                                data);
1225 }
1226
1227 /* called in private thread shared by all GDBusConnection instances
1228  *
1229  * write-lock is held on entry
1230  * output_pending is PENDING_NONE on entry
1231  */
1232 static void
1233 message_written_unlocked (GDBusWorker *worker,
1234                           MessageToWriteData *message_data)
1235 {
1236   if (G_UNLIKELY (_g_dbus_debug_message ()))
1237     {
1238       gchar *s;
1239       _g_dbus_debug_print_lock ();
1240       g_print ("========================================================================\n"
1241                "GDBus-debug:Message:\n"
1242                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1243                message_data->blob_size);
1244       s = g_dbus_message_print (message_data->message, 2);
1245       g_print ("%s", s);
1246       g_free (s);
1247       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1248         {
1249           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1250           g_print ("%s\n", s);
1251           g_free (s);
1252         }
1253       _g_dbus_debug_print_unlock ();
1254     }
1255
1256   worker->write_num_messages_written += 1;
1257 }
1258
1259 /* called in private thread shared by all GDBusConnection instances
1260  *
1261  * write-lock is held on entry
1262  * output_pending is PENDING_NONE on entry
1263  *
1264  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1265  */
1266 static FlushAsyncData *
1267 prepare_flush_unlocked (GDBusWorker *worker)
1268 {
1269   GList *l;
1270   GList *ll;
1271   GList *flushers;
1272
1273   flushers = NULL;
1274   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1275     {
1276       FlushData *f = l->data;
1277       ll = l->next;
1278
1279       if (f->number_to_wait_for == worker->write_num_messages_written)
1280         {
1281           flushers = g_list_append (flushers, f);
1282           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1283         }
1284     }
1285   if (flushers != NULL)
1286     {
1287       g_assert (worker->output_pending == PENDING_NONE);
1288       worker->output_pending = PENDING_FLUSH;
1289     }
1290
1291   if (flushers != NULL)
1292     {
1293       FlushAsyncData *data;
1294
1295       data = g_new0 (FlushAsyncData, 1);
1296       data->worker = _g_dbus_worker_ref (worker);
1297       data->flushers = flushers;
1298       return data;
1299     }
1300
1301   return NULL;
1302 }
1303
1304 /* called in private thread shared by all GDBusConnection instances
1305  *
1306  * write-lock is not held on entry
1307  * output_pending is PENDING_WRITE on entry
1308  */
1309 static void
1310 write_message_cb (GObject       *source_object,
1311                   GAsyncResult  *res,
1312                   gpointer       user_data)
1313 {
1314   MessageToWriteData *data = user_data;
1315   GError *error;
1316
1317   g_mutex_lock (&data->worker->write_lock);
1318   g_assert (data->worker->output_pending == PENDING_WRITE);
1319   data->worker->output_pending = PENDING_NONE;
1320
1321   error = NULL;
1322   if (!write_message_finish (res, &error))
1323     {
1324       g_mutex_unlock (&data->worker->write_lock);
1325
1326       /* TODO: handle */
1327       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1328       g_error_free (error);
1329
1330       g_mutex_lock (&data->worker->write_lock);
1331     }
1332
1333   message_written_unlocked (data->worker, data);
1334
1335   g_mutex_unlock (&data->worker->write_lock);
1336
1337   continue_writing (data->worker);
1338
1339   message_to_write_data_free (data);
1340 }
1341
1342 /* called in private thread shared by all GDBusConnection instances
1343  *
1344  * write-lock is not held on entry
1345  * output_pending is PENDING_CLOSE on entry
1346  */
1347 static void
1348 iostream_close_cb (GObject      *source_object,
1349                    GAsyncResult *res,
1350                    gpointer      user_data)
1351 {
1352   GDBusWorker *worker = user_data;
1353   GError *error = NULL;
1354   GList *pending_close_attempts, *pending_flush_attempts;
1355   GQueue *send_queue;
1356
1357   g_io_stream_close_finish (worker->stream, res, &error);
1358
1359   g_mutex_lock (&worker->write_lock);
1360
1361   pending_close_attempts = worker->pending_close_attempts;
1362   worker->pending_close_attempts = NULL;
1363
1364   pending_flush_attempts = worker->write_pending_flushes;
1365   worker->write_pending_flushes = NULL;
1366
1367   send_queue = worker->write_queue;
1368   worker->write_queue = g_queue_new ();
1369
1370   g_assert (worker->output_pending == PENDING_CLOSE);
1371   worker->output_pending = PENDING_NONE;
1372
1373   g_mutex_unlock (&worker->write_lock);
1374
1375   while (pending_close_attempts != NULL)
1376     {
1377       CloseData *close_data = pending_close_attempts->data;
1378
1379       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1380                                                    pending_close_attempts);
1381
1382       if (close_data->task != NULL)
1383         {
1384           if (error != NULL)
1385             g_task_return_error (close_data->task, g_error_copy (error));
1386           else
1387             g_task_return_boolean (close_data->task, TRUE);
1388         }
1389
1390       close_data_free (close_data);
1391     }
1392
1393   g_clear_error (&error);
1394
1395   /* all messages queued for sending are discarded */
1396   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1397   /* all queued flushes fail */
1398   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1399                        _("Operation was cancelled"));
1400   flush_data_list_complete (pending_flush_attempts, error);
1401   g_list_free (pending_flush_attempts);
1402   g_clear_error (&error);
1403
1404   _g_dbus_worker_unref (worker);
1405 }
1406
1407 /* called in private thread shared by all GDBusConnection instances
1408  *
1409  * write-lock is not held on entry
1410  * output_pending must be PENDING_NONE on entry
1411  */
1412 static void
1413 continue_writing (GDBusWorker *worker)
1414 {
1415   MessageToWriteData *data;
1416   FlushAsyncData *flush_async_data;
1417
1418  write_next:
1419   /* we mustn't try to write two things at once */
1420   g_assert (worker->output_pending == PENDING_NONE);
1421
1422   g_mutex_lock (&worker->write_lock);
1423
1424   data = NULL;
1425   flush_async_data = NULL;
1426
1427   /* if we want to close the connection, that takes precedence */
1428   if (worker->pending_close_attempts != NULL)
1429     {
1430       GInputStream *input = g_io_stream_get_input_stream (worker->stream);
1431
1432       if (!g_input_stream_has_pending (input))
1433         {
1434           worker->close_expected = TRUE;
1435           worker->output_pending = PENDING_CLOSE;
1436
1437           g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1438                                    NULL, iostream_close_cb,
1439                                    _g_dbus_worker_ref (worker));
1440         }
1441     }
1442   else
1443     {
1444       flush_async_data = prepare_flush_unlocked (worker);
1445
1446       if (flush_async_data == NULL)
1447         {
1448           data = g_queue_pop_head (worker->write_queue);
1449
1450           if (data != NULL)
1451             worker->output_pending = PENDING_WRITE;
1452         }
1453     }
1454
1455   g_mutex_unlock (&worker->write_lock);
1456
1457   /* Note that write_lock is only used for protecting the @write_queue
1458    * and @output_pending fields of the GDBusWorker struct ... which we
1459    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1460    *
1461    * Therefore, it's fine to drop it here when calling back into user
1462    * code and then writing the message out onto the GIOStream since this
1463    * function only runs on the worker thread.
1464    */
1465
1466   if (flush_async_data != NULL)
1467     {
1468       start_flush (flush_async_data);
1469       g_assert (data == NULL);
1470     }
1471   else if (data != NULL)
1472     {
1473       GDBusMessage *old_message;
1474       guchar *new_blob;
1475       gsize new_blob_size;
1476       GError *error;
1477
1478       old_message = data->message;
1479       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1480       if (data->message == old_message)
1481         {
1482           /* filters had no effect - do nothing */
1483         }
1484       else if (data->message == NULL)
1485         {
1486           /* filters dropped message */
1487           g_mutex_lock (&worker->write_lock);
1488           worker->output_pending = PENDING_NONE;
1489           g_mutex_unlock (&worker->write_lock);
1490           message_to_write_data_free (data);
1491           goto write_next;
1492         }
1493       else
1494         {
1495           /* filters altered the message -> reencode */
1496           error = NULL;
1497           new_blob = g_dbus_message_to_blob (data->message,
1498                                              &new_blob_size,
1499                                              worker->capabilities,
1500                                              &error);
1501           if (new_blob == NULL)
1502             {
1503               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1504                * the old message instead
1505                */
1506               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1507                          g_dbus_message_get_serial (data->message),
1508                          error->message);
1509               g_error_free (error);
1510             }
1511           else
1512             {
1513               g_free (data->blob);
1514               data->blob = (gchar *) new_blob;
1515               data->blob_size = new_blob_size;
1516             }
1517         }
1518
1519       write_message_async (worker,
1520                            data,
1521                            write_message_cb,
1522                            data);
1523     }
1524 }
1525
1526 /* called in private thread shared by all GDBusConnection instances
1527  *
1528  * write-lock is not held on entry
1529  * output_pending may be anything
1530  */
1531 static gboolean
1532 continue_writing_in_idle_cb (gpointer user_data)
1533 {
1534   GDBusWorker *worker = user_data;
1535
1536   /* Because this is the worker thread, we can read this struct member
1537    * without holding the lock: no other thread ever modifies it.
1538    */
1539   if (worker->output_pending == PENDING_NONE)
1540     continue_writing (worker);
1541
1542   return FALSE;
1543 }
1544
1545 /*
1546  * @write_data: (transfer full) (nullable):
1547  * @flush_data: (transfer full) (nullable):
1548  * @close_data: (transfer full) (nullable):
1549  *
1550  * Can be called from any thread
1551  *
1552  * write_lock is held on entry
1553  * output_pending may be anything
1554  */
1555 static void
1556 schedule_writing_unlocked (GDBusWorker        *worker,
1557                            MessageToWriteData *write_data,
1558                            FlushData          *flush_data,
1559                            CloseData          *close_data)
1560 {
1561   if (write_data != NULL)
1562     g_queue_push_tail (worker->write_queue, write_data);
1563
1564   if (flush_data != NULL)
1565     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1566
1567   if (close_data != NULL)
1568     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1569                                                      close_data);
1570
1571   /* If we had output pending, the next bit of output will happen
1572    * automatically when it finishes, so we only need to do this
1573    * if nothing was pending.
1574    *
1575    * The idle callback will re-check that output_pending is still
1576    * PENDING_NONE, to guard against output starting before the idle.
1577    */
1578   if (worker->output_pending == PENDING_NONE)
1579     {
1580       GSource *idle_source;
1581       idle_source = g_idle_source_new ();
1582       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1583       g_source_set_callback (idle_source,
1584                              continue_writing_in_idle_cb,
1585                              _g_dbus_worker_ref (worker),
1586                              (GDestroyNotify) _g_dbus_worker_unref);
1587       g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1588       g_source_attach (idle_source, worker->shared_thread_data->context);
1589       g_source_unref (idle_source);
1590     }
1591 }
1592
1593 static void
1594 schedule_pending_close (GDBusWorker *worker)
1595 {
1596   g_mutex_lock (&worker->write_lock);
1597   if (worker->pending_close_attempts)
1598     schedule_writing_unlocked (worker, NULL, NULL, NULL);
1599   g_mutex_unlock (&worker->write_lock);
1600 }
1601
1602 /* ---------------------------------------------------------------------------------------------------- */
1603
1604 /* can be called from any thread - steals blob
1605  *
1606  * write_lock is not held on entry
1607  * output_pending may be anything
1608  */
1609 void
1610 _g_dbus_worker_send_message (GDBusWorker    *worker,
1611                              GDBusMessage   *message,
1612                              gchar          *blob,
1613                              gsize           blob_len)
1614 {
1615   MessageToWriteData *data;
1616
1617   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1618   g_return_if_fail (blob != NULL);
1619   g_return_if_fail (blob_len > 16);
1620
1621   data = g_slice_new0 (MessageToWriteData);
1622   data->worker = _g_dbus_worker_ref (worker);
1623   data->message = g_object_ref (message);
1624   data->blob = blob; /* steal! */
1625   data->blob_size = blob_len;
1626
1627   g_mutex_lock (&worker->write_lock);
1628   schedule_writing_unlocked (worker, data, NULL, NULL);
1629   g_mutex_unlock (&worker->write_lock);
1630 }
1631
1632 /* ---------------------------------------------------------------------------------------------------- */
1633
1634 GDBusWorker *
1635 _g_dbus_worker_new (GIOStream                              *stream,
1636                     GDBusCapabilityFlags                    capabilities,
1637                     gboolean                                initially_frozen,
1638                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1639                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1640                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1641                     gpointer                                user_data)
1642 {
1643   GDBusWorker *worker;
1644   GSource *idle_source;
1645
1646   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1647   g_return_val_if_fail (message_received_callback != NULL, NULL);
1648   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1649   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1650
1651   worker = g_new0 (GDBusWorker, 1);
1652   worker->ref_count = 1;
1653
1654   g_mutex_init (&worker->read_lock);
1655   worker->message_received_callback = message_received_callback;
1656   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1657   worker->disconnected_callback = disconnected_callback;
1658   worker->user_data = user_data;
1659   worker->stream = g_object_ref (stream);
1660   worker->capabilities = capabilities;
1661   worker->cancellable = g_cancellable_new ();
1662   worker->output_pending = PENDING_NONE;
1663
1664   worker->frozen = initially_frozen;
1665   worker->received_messages_while_frozen = g_queue_new ();
1666
1667   g_mutex_init (&worker->write_lock);
1668   worker->write_queue = g_queue_new ();
1669
1670   if (G_IS_SOCKET_CONNECTION (worker->stream))
1671     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1672
1673   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1674
1675   /* begin reading */
1676   idle_source = g_idle_source_new ();
1677   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1678   g_source_set_callback (idle_source,
1679                          _g_dbus_worker_do_initial_read,
1680                          _g_dbus_worker_ref (worker),
1681                          (GDestroyNotify) _g_dbus_worker_unref);
1682   g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1683   g_source_attach (idle_source, worker->shared_thread_data->context);
1684   g_source_unref (idle_source);
1685
1686   return worker;
1687 }
1688
1689 /* ---------------------------------------------------------------------------------------------------- */
1690
1691 /* can be called from any thread
1692  *
1693  * write_lock is not held on entry
1694  * output_pending may be anything
1695  */
1696 void
1697 _g_dbus_worker_close (GDBusWorker         *worker,
1698                       GTask               *task)
1699 {
1700   CloseData *close_data;
1701
1702   close_data = g_slice_new0 (CloseData);
1703   close_data->worker = _g_dbus_worker_ref (worker);
1704   close_data->task = (task == NULL ? NULL : g_object_ref (task));
1705
1706   /* Don't set worker->close_expected here - we're in the wrong thread.
1707    * It'll be set before the actual close happens.
1708    */
1709   g_cancellable_cancel (worker->cancellable);
1710   g_mutex_lock (&worker->write_lock);
1711   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1712   g_mutex_unlock (&worker->write_lock);
1713 }
1714
1715 /* This can be called from any thread - frees worker. Note that
1716  * callbacks might still happen if called from another thread than the
1717  * worker - use your own synchronization primitive in the callbacks.
1718  *
1719  * write_lock is not held on entry
1720  * output_pending may be anything
1721  */
1722 void
1723 _g_dbus_worker_stop (GDBusWorker *worker)
1724 {
1725   g_atomic_int_set (&worker->stopped, TRUE);
1726
1727   /* Cancel any pending operations and schedule a close of the underlying I/O
1728    * stream in the worker thread
1729    */
1730   _g_dbus_worker_close (worker, NULL);
1731
1732   /* _g_dbus_worker_close holds a ref until after an idle in the worker
1733    * thread has run, so we no longer need to unref in an idle like in
1734    * commit 322e25b535
1735    */
1736   _g_dbus_worker_unref (worker);
1737 }
1738
1739 /* ---------------------------------------------------------------------------------------------------- */
1740
1741 /* can be called from any thread (except the worker thread) - blocks
1742  * calling thread until all queued outgoing messages are written and
1743  * the transport has been flushed
1744  *
1745  * write_lock is not held on entry
1746  * output_pending may be anything
1747  */
1748 gboolean
1749 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1750                            GCancellable   *cancellable,
1751                            GError        **error)
1752 {
1753   gboolean ret;
1754   FlushData *data;
1755   guint64 pending_writes;
1756
1757   data = NULL;
1758   ret = TRUE;
1759
1760   g_mutex_lock (&worker->write_lock);
1761
1762   /* if the queue is empty, no write is in-flight and we haven't written
1763    * anything since the last flush, then there's nothing to wait for
1764    */
1765   pending_writes = g_queue_get_length (worker->write_queue);
1766
1767   /* if a write is in-flight, we shouldn't be satisfied until the first
1768    * flush operation that follows it
1769    */
1770   if (worker->output_pending == PENDING_WRITE)
1771     pending_writes += 1;
1772
1773   if (pending_writes > 0 ||
1774       worker->write_num_messages_written != worker->write_num_messages_flushed)
1775     {
1776       data = g_new0 (FlushData, 1);
1777       g_mutex_init (&data->mutex);
1778       g_cond_init (&data->cond);
1779       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1780       g_mutex_lock (&data->mutex);
1781
1782       schedule_writing_unlocked (worker, NULL, data, NULL);
1783     }
1784   g_mutex_unlock (&worker->write_lock);
1785
1786   if (data != NULL)
1787     {
1788       g_cond_wait (&data->cond, &data->mutex);
1789       g_mutex_unlock (&data->mutex);
1790
1791       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1792       g_cond_clear (&data->cond);
1793       g_mutex_clear (&data->mutex);
1794       if (data->error != NULL)
1795         {
1796           ret = FALSE;
1797           g_propagate_error (error, data->error);
1798         }
1799       g_free (data);
1800     }
1801
1802   return ret;
1803 }
1804
1805 /* ---------------------------------------------------------------------------------------------------- */
1806
1807 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1808 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1809 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1810 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1811 #define G_DBUS_DEBUG_CALL           (1<<4)
1812 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1813 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1814 #define G_DBUS_DEBUG_RETURN         (1<<7)
1815 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1816 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1817
1818 static gint _gdbus_debug_flags = 0;
1819
1820 gboolean
1821 _g_dbus_debug_authentication (void)
1822 {
1823   _g_dbus_initialize ();
1824   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1825 }
1826
1827 gboolean
1828 _g_dbus_debug_transport (void)
1829 {
1830   _g_dbus_initialize ();
1831   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1832 }
1833
1834 gboolean
1835 _g_dbus_debug_message (void)
1836 {
1837   _g_dbus_initialize ();
1838   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1839 }
1840
1841 gboolean
1842 _g_dbus_debug_payload (void)
1843 {
1844   _g_dbus_initialize ();
1845   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1846 }
1847
1848 gboolean
1849 _g_dbus_debug_call (void)
1850 {
1851   _g_dbus_initialize ();
1852   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1853 }
1854
1855 gboolean
1856 _g_dbus_debug_signal (void)
1857 {
1858   _g_dbus_initialize ();
1859   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1860 }
1861
1862 gboolean
1863 _g_dbus_debug_incoming (void)
1864 {
1865   _g_dbus_initialize ();
1866   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1867 }
1868
1869 gboolean
1870 _g_dbus_debug_return (void)
1871 {
1872   _g_dbus_initialize ();
1873   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1874 }
1875
1876 gboolean
1877 _g_dbus_debug_emission (void)
1878 {
1879   _g_dbus_initialize ();
1880   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1881 }
1882
1883 gboolean
1884 _g_dbus_debug_address (void)
1885 {
1886   _g_dbus_initialize ();
1887   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1888 }
1889
1890 G_LOCK_DEFINE_STATIC (print_lock);
1891
1892 void
1893 _g_dbus_debug_print_lock (void)
1894 {
1895   G_LOCK (print_lock);
1896 }
1897
1898 void
1899 _g_dbus_debug_print_unlock (void)
1900 {
1901   G_UNLOCK (print_lock);
1902 }
1903
1904 /**
1905  * _g_dbus_initialize:
1906  *
1907  * Does various one-time init things such as
1908  *
1909  *  - registering the G_DBUS_ERROR error domain
1910  *  - parses the G_DBUS_DEBUG environment variable
1911  */
1912 void
1913 _g_dbus_initialize (void)
1914 {
1915   static volatile gsize initialized = 0;
1916
1917   if (g_once_init_enter (&initialized))
1918     {
1919       volatile GQuark g_dbus_error_domain;
1920       const gchar *debug;
1921
1922       g_dbus_error_domain = G_DBUS_ERROR;
1923       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1924
1925       debug = g_getenv ("G_DBUS_DEBUG");
1926       if (debug != NULL)
1927         {
1928           const GDebugKey keys[] = {
1929             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1930             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1931             { "message",        G_DBUS_DEBUG_MESSAGE        },
1932             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1933             { "call",           G_DBUS_DEBUG_CALL           },
1934             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1935             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1936             { "return",         G_DBUS_DEBUG_RETURN         },
1937             { "emission",       G_DBUS_DEBUG_EMISSION       },
1938             { "address",        G_DBUS_DEBUG_ADDRESS        }
1939           };
1940
1941           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1942           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1943             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1944         }
1945
1946       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
1947       ensure_required_types ();
1948
1949       g_once_init_leave (&initialized, 1);
1950     }
1951 }
1952
1953 /* ---------------------------------------------------------------------------------------------------- */
1954
1955 GVariantType *
1956 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1957 {
1958   const GVariantType *arg_types[256];
1959   guint n;
1960
1961   if (args)
1962     for (n = 0; args[n] != NULL; n++)
1963       {
1964         /* DBus places a hard limit of 255 on signature length.
1965          * therefore number of args must be less than 256.
1966          */
1967         g_assert (n < 256);
1968
1969         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1970
1971         if G_UNLIKELY (arg_types[n] == NULL)
1972           return NULL;
1973       }
1974   else
1975     n = 0;
1976
1977   return g_variant_type_new_tuple (arg_types, n);
1978 }
1979
1980 /* ---------------------------------------------------------------------------------------------------- */
1981
#ifdef G_OS_WIN32

extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);

/* Looks up the SID of the user owning the current process token and
 * returns it in string form.
 *
 * Returns a newly allocated string (release with g_free()), or NULL
 * after emitting a g_warning() if any step fails.
 */
gchar *
_g_dbus_win32_get_user_sid (void)
{
  HANDLE token = INVALID_HANDLE_VALUE;
  TOKEN_USER *token_user = NULL;
  DWORD needed = 0;
  gchar *sid_string;
  gchar *result = NULL;

  if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &token))
    {
      g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* First call only asks for the required buffer size; failing with
   * ERROR_INSUFFICIENT_BUFFER is the expected outcome.
   */
  if (!GetTokenInformation (token, TokenUser, NULL, 0, &needed) &&
      GetLastError () != ERROR_INSUFFICIENT_BUFFER)
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* Second call fetches the actual TOKEN_USER data. */
  token_user = g_malloc (needed);
  if (!GetTokenInformation (token, TokenUser, token_user, needed, &needed))
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  if (!IsValidSid (token_user->User.Sid))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  if (!ConvertSidToStringSidA (token_user->User.Sid, &sid_string))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  /* Copy into GLib-owned memory, then release the system's buffer with
   * LocalFree().
   */
  result = g_strdup (sid_string);
  LocalFree (sid_string);

out:
  g_free (token_user);
  if (token != INVALID_HANDLE_VALUE)
    CloseHandle (token);
  return result;
}
#endif
2046
2047 /* ---------------------------------------------------------------------------------------------------- */
2048
2049 gchar *
2050 _g_dbus_get_machine_id (GError **error)
2051 {
2052 #ifdef G_OS_WIN32
2053   HW_PROFILE_INFOA info;
2054   char *src, *dest, *res;
2055   int i;
2056
2057   if (!GetCurrentHwProfileA (&info))
2058     {
2059       char *message = g_win32_error_message (GetLastError ());
2060       g_set_error (error,
2061                    G_IO_ERROR,
2062                    G_IO_ERROR_FAILED,
2063                    _("Unable to get Hardware profile: %s"), message);
2064       g_free (message);
2065       return NULL;
2066     }
2067
2068   /* Form: {12340001-4980-1920-6788-123456789012} */
2069   src = &info.szHwProfileGuid[0];
2070
2071   res = g_malloc (32+1);
2072   dest = res;
2073
2074   src++; /* Skip { */
2075   for (i = 0; i < 8; i++)
2076     *dest++ = *src++;
2077   src++; /* Skip - */
2078   for (i = 0; i < 4; i++)
2079     *dest++ = *src++;
2080   src++; /* Skip - */
2081   for (i = 0; i < 4; i++)
2082     *dest++ = *src++;
2083   src++; /* Skip - */
2084   for (i = 0; i < 4; i++)
2085     *dest++ = *src++;
2086   src++; /* Skip - */
2087   for (i = 0; i < 12; i++)
2088     *dest++ = *src++;
2089   *dest = 0;
2090
2091   return res;
2092 #else
2093   gchar *ret;
2094   GError *first_error;
2095   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2096   ret = NULL;
2097   first_error = NULL;
2098   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2099                             &ret,
2100                             NULL,
2101                             &first_error) &&
2102       !g_file_get_contents ("/etc/machine-id",
2103                             &ret,
2104                             NULL,
2105                             NULL))
2106     {
2107       g_propagate_prefixed_error (error, first_error,
2108                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2109     }
2110   else
2111     {
2112       /* ignore the error from the first try, if any */
2113       g_clear_error (&first_error);
2114       /* TODO: validate value */
2115       g_strstrip (ret);
2116     }
2117   return ret;
2118 #endif
2119 }
2120
2121 /* ---------------------------------------------------------------------------------------------------- */
2122
2123 gchar *
2124 _g_dbus_enum_to_string (GType enum_type, gint value)
2125 {
2126   gchar *ret;
2127   GEnumClass *klass;
2128   GEnumValue *enum_value;
2129
2130   klass = g_type_class_ref (enum_type);
2131   enum_value = g_enum_get_value (klass, value);
2132   if (enum_value != NULL)
2133     ret = g_strdup (enum_value->value_nick);
2134   else
2135     ret = g_strdup_printf ("unknown (value %d)", value);
2136   g_type_class_unref (klass);
2137   return ret;
2138 }
2139
2140 /* ---------------------------------------------------------------------------------------------------- */
2141
2142 static void
2143 write_message_print_transport_debug (gssize bytes_written,
2144                                      MessageToWriteData *data)
2145 {
2146   if (G_LIKELY (!_g_dbus_debug_transport ()))
2147     goto out;
2148
2149   _g_dbus_debug_print_lock ();
2150   g_print ("========================================================================\n"
2151            "GDBus-debug:Transport:\n"
2152            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2153            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2154            bytes_written,
2155            g_dbus_message_get_serial (data->message),
2156            data->blob_size,
2157            data->total_written,
2158            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2159   _g_dbus_debug_print_unlock ();
2160  out:
2161   ;
2162 }
2163
2164 /* ---------------------------------------------------------------------------------------------------- */
2165
2166 static void
2167 read_message_print_transport_debug (gssize bytes_read,
2168                                     GDBusWorker *worker)
2169 {
2170   gsize size;
2171   gint32 serial;
2172   gint32 message_length;
2173
2174   if (G_LIKELY (!_g_dbus_debug_transport ()))
2175     goto out;
2176
2177   size = bytes_read + worker->read_buffer_cur_size;
2178   serial = 0;
2179   message_length = 0;
2180   if (size >= 16)
2181     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2182   if (size >= 1)
2183     {
2184       switch (worker->read_buffer[0])
2185         {
2186         case 'l':
2187           if (size >= 12)
2188             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2189           break;
2190         case 'B':
2191           if (size >= 12)
2192             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2193           break;
2194         default:
2195           /* an error will be set elsewhere if this happens */
2196           goto out;
2197         }
2198     }
2199
2200     _g_dbus_debug_print_lock ();
2201   g_print ("========================================================================\n"
2202            "GDBus-debug:Transport:\n"
2203            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2204            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2205            bytes_read,
2206            serial,
2207            message_length,
2208            worker->read_buffer_cur_size,
2209            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2210   _g_dbus_debug_print_unlock ();
2211  out:
2212   ;
2213 }
2214
2215 /* ---------------------------------------------------------------------------------------------------- */
2216
2217 gboolean
2218 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2219                                      GValue                *return_accu,
2220                                      const GValue          *handler_return,
2221                                      gpointer               dummy)
2222 {
2223   gboolean continue_emission;
2224   gboolean signal_return;
2225
2226   signal_return = g_value_get_boolean (handler_return);
2227   g_value_set_boolean (return_accu, signal_return);
2228   continue_emission = signal_return;
2229
2230   return continue_emission;
2231 }