GDBusWorker: debug on read errors if transport debugging is enabled
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "gsocketcontrolmessage.h"
43 #include "gsocketconnection.h"
44 #include "gsocketoutputstream.h"
45
46 #ifdef G_OS_UNIX
47 #include "gunixfdmessage.h"
48 #include "gunixconnection.h"
49 #include "gunixcredentialsmessage.h"
50 #endif
51
52 #ifdef G_OS_WIN32
53 #include <windows.h>
54 #endif
55
56 #include "glibintl.h"
57
58 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
59
60 /* ---------------------------------------------------------------------------------------------------- */
61
62 gchar *
63 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
64 {
65  guint n, m;
66  GString *ret;
67
68  ret = g_string_new (NULL);
69
70  for (n = 0; n < len; n += 16)
71    {
72      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
73
74      for (m = n; m < n + 16; m++)
75        {
76          if (m > n && (m%4) == 0)
77            g_string_append_c (ret, ' ');
78          if (m < len)
79            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
80          else
81            g_string_append (ret, "   ");
82        }
83
84      g_string_append (ret, "   ");
85
86      for (m = n; m < len && m < n + 16; m++)
87        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
88
89      g_string_append_c (ret, '\n');
90    }
91
92  return g_string_free (ret, FALSE);
93 }
94
95 /* ---------------------------------------------------------------------------------------------------- */
96
97 /* Unfortunately ancillary messages are discarded when reading from a
98  * socket using the GSocketInputStream abstraction. So we provide a
99  * very GInputStream-ish API that uses GSocket in this case (very
100  * similar to GSocketInputStream).
101  */
102
/* State carried by one _g_socket_read_with_control_messages() operation;
 * released with read_with_control_data_free().
 */
typedef struct
{
  /* socket to receive from; a reference is held */
  GSocket *socket;
  /* optional (may be NULL); a reference is held when set */
  GCancellable *cancellable;

  /* destination buffer and its size, as passed in by the caller */
  void *buffer;
  gsize count;

  /* caller-provided out locations handed to g_socket_receive_message() */
  GSocketControlMessage ***messages;
  gint *num_messages;

  /* result object that is completed once the receive has been attempted */
  GSimpleAsyncResult *simple;

  /* TRUE when the receive runs from a GSource callback, in which case the
   * result is completed directly instead of in idle
   */
  gboolean from_mainloop;
} ReadWithControlData;
118
119 static void
120 read_with_control_data_free (ReadWithControlData *data)
121 {
122   g_object_unref (data->socket);
123   if (data->cancellable != NULL)
124     g_object_unref (data->cancellable);
125   g_object_unref (data->simple);
126   g_free (data);
127 }
128
129 static gboolean
130 _g_socket_read_with_control_messages_ready (GSocket      *socket,
131                                             GIOCondition  condition,
132                                             gpointer      user_data)
133 {
134   ReadWithControlData *data = user_data;
135   GError *error;
136   gssize result;
137   GInputVector vector;
138
139   error = NULL;
140   vector.buffer = data->buffer;
141   vector.size = data->count;
142   result = g_socket_receive_message (data->socket,
143                                      NULL, /* address */
144                                      &vector,
145                                      1,
146                                      data->messages,
147                                      data->num_messages,
148                                      NULL,
149                                      data->cancellable,
150                                      &error);
151   if (result >= 0)
152     {
153       g_simple_async_result_set_op_res_gssize (data->simple, result);
154     }
155   else
156     {
157       g_assert (error != NULL);
158       g_simple_async_result_take_error (data->simple, error);
159     }
160
161   if (data->from_mainloop)
162     g_simple_async_result_complete (data->simple);
163   else
164     g_simple_async_result_complete_in_idle (data->simple);
165
166   return FALSE;
167 }
168
169 static void
170 _g_socket_read_with_control_messages (GSocket                 *socket,
171                                       void                    *buffer,
172                                       gsize                    count,
173                                       GSocketControlMessage ***messages,
174                                       gint                    *num_messages,
175                                       gint                     io_priority,
176                                       GCancellable            *cancellable,
177                                       GAsyncReadyCallback      callback,
178                                       gpointer                 user_data)
179 {
180   ReadWithControlData *data;
181
182   data = g_new0 (ReadWithControlData, 1);
183   data->socket = g_object_ref (socket);
184   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
185   data->buffer = buffer;
186   data->count = count;
187   data->messages = messages;
188   data->num_messages = num_messages;
189
190   data->simple = g_simple_async_result_new (G_OBJECT (socket),
191                                             callback,
192                                             user_data,
193                                             _g_socket_read_with_control_messages);
194
195   if (!g_socket_condition_check (socket, G_IO_IN))
196     {
197       GSource *source;
198       data->from_mainloop = TRUE;
199       source = g_socket_create_source (data->socket,
200                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
201                                        cancellable);
202       g_source_set_callback (source,
203                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
204                              data,
205                              (GDestroyNotify) read_with_control_data_free);
206       g_source_attach (source, g_main_context_get_thread_default ());
207       g_source_unref (source);
208     }
209   else
210     {
211       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
212       read_with_control_data_free (data);
213     }
214 }
215
216 static gssize
217 _g_socket_read_with_control_messages_finish (GSocket       *socket,
218                                              GAsyncResult  *result,
219                                              GError       **error)
220 {
221   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
222
223   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
224   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
225
226   if (g_simple_async_result_propagate_error (simple, error))
227       return -1;
228   else
229     return g_simple_async_result_get_op_res_gssize (simple);
230 }
231
232 /* ---------------------------------------------------------------------------------------------------- */
233
234 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
235
236 static GPtrArray *ensured_classes = NULL;
237
238 static void
239 ensure_type (GType gtype)
240 {
241   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
242 }
243
244 static void
245 release_required_types (void)
246 {
247   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
248   g_ptr_array_unref (ensured_classes);
249   ensured_classes = NULL;
250 }
251
252 static void
253 ensure_required_types (void)
254 {
255   g_assert (ensured_classes == NULL);
256   ensured_classes = g_ptr_array_new ();
257   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
258   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
259 }
260 /* ---------------------------------------------------------------------------------------------------- */
261
/* Bookkeeping for the single "gdbus" worker thread created by
 * _g_dbus_shared_thread_ref().
 */
typedef struct
{
  /* incremented atomically by every _g_dbus_shared_thread_ref() call */
  volatile gint refcount;
  /* the thread running gdbus_shared_thread_func() */
  GThread *thread;
  /* pushed as thread-default context inside the thread */
  GMainContext *context;
  /* main loop on @context that the thread runs */
  GMainLoop *loop;
} SharedThreadData;
269
270 static gpointer
271 gdbus_shared_thread_func (gpointer user_data)
272 {
273   SharedThreadData *data = user_data;
274
275   g_main_context_push_thread_default (data->context);
276   g_main_loop_run (data->loop);
277   g_main_context_pop_thread_default (data->context);
278
279   release_required_types ();
280
281   return NULL;
282 }
283
284 /* ---------------------------------------------------------------------------------------------------- */
285
286 static SharedThreadData *
287 _g_dbus_shared_thread_ref (void)
288 {
289   static gsize shared_thread_data = 0;
290   SharedThreadData *ret;
291
292   if (g_once_init_enter (&shared_thread_data))
293     {
294       SharedThreadData *data;
295
296       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
297       ensure_required_types ();
298
299       data = g_new0 (SharedThreadData, 1);
300       data->refcount = 0;
301       
302       data->context = g_main_context_new ();
303       data->loop = g_main_loop_new (data->context, FALSE);
304       data->thread = g_thread_new ("gdbus",
305                                    gdbus_shared_thread_func,
306                                    data);
307       /* We can cast between gsize and gpointer safely */
308       g_once_init_leave (&shared_thread_data, (gsize) data);
309     }
310
311   ret = (SharedThreadData*) shared_thread_data;
312   g_atomic_int_inc (&ret->refcount);
313   return ret;
314 }
315
/* Counterpart of _g_dbus_shared_thread_ref().
 *
 * Currently does nothing: the teardown code below is compiled out with
 * #if 0, so the refcount is never decremented and the shared thread, its
 * main loop and its context are never torn down from here.
 */
static void
_g_dbus_shared_thread_unref (SharedThreadData *data)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  g_assert (data != NULL);
  if (g_atomic_int_dec_and_test (&data->refcount))
    {
      g_main_loop_quit (data->loop);
      //g_thread_join (data->thread);
      g_main_loop_unref (data->loop);
      g_main_context_unref (data->context);
    }
#endif
}
331
332 /* ---------------------------------------------------------------------------------------------------- */
333
/* Per-connection worker state: the stream being served, the callbacks
 * used to report events, and the bookkeeping for reading and writing
 * messages on the shared worker thread.
 */
struct GDBusWorker
{
  /* manipulated atomically by _g_dbus_worker_ref() / _g_dbus_worker_unref() */
  volatile gint                       ref_count;

  /* obtained via the shared-thread API; its context is where idle sources
   * such as the unfreeze callback get attached
   */
  SharedThreadData                   *shared_thread_data;

  /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
  /* once non-zero, none of the callbacks below are invoked any more */
  volatile gint                       stopped;

  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
   * only affects messages received from the other peer (since GDBusServer is the
   * only user) - we might want it to affect messages sent to the other peer too?
   */
  gboolean                            frozen;
  /* passed to g_dbus_message_new_from_blob() when decoding received data */
  GDBusCapabilityFlags                capabilities;
  /* GDBusMessage objects (owned) held back until the worker is unfrozen */
  GQueue                             *received_messages_while_frozen;

  GIOStream                          *stream;
  /* passed to every async read issued by the worker */
  GCancellable                       *cancellable;
  GDBusWorkerMessageReceivedCallback  message_received_callback;
  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
  GDBusWorkerDisconnectedCallback     disconnected_callback;
  /* handed back as the last argument of the three callbacks above */
  gpointer                            user_data;

  /* if not NULL, stream is GSocketConnection */
  GSocket *socket;

  /* used for reading */
  GMutex                              read_lock;
  /* receive buffer, grown with g_realloc() as needed */
  gchar                              *read_buffer;
  gsize                               read_buffer_allocated_size;
  /* number of bytes of the current message already in read_buffer */
  gsize                               read_buffer_cur_size;
  /* number of bytes the current read step is waiting for; 0 means
   * "start reading a new message"
   */
  gsize                               read_buffer_bytes_wanted;
  /* file descriptors received so far for the message being read, or NULL */
  GUnixFDList                        *read_fd_list;
  /* out parameters of the most recent socket read */
  GSocketControlMessage             **read_ancillary_messages;
  gint                                read_num_ancillary_messages;

  /* TRUE if an async write, flush or close is pending.
   * Only the worker thread may change its value, and only with the write_lock.
   * Other threads may read its value when holding the write_lock.
   * The worker thread may read its value at any time.
   */
  gboolean                            output_pending;
  /* used for writing */
  GMutex                              write_lock;
  /* queue of MessageToWriteData, protected by write_lock */
  GQueue                             *write_queue;
  /* protected by write_lock */
  guint64                             write_num_messages_written;
  /* list of FlushData, protected by write_lock */
  GList                              *write_pending_flushes;
  /* list of CloseData, protected by write_lock */
  GList                              *pending_close_attempts;
};
388
389 static void _g_dbus_worker_unref (GDBusWorker *worker);
390
391 /* ---------------------------------------------------------------------------------------------------- */
392
/* One pending flush request; instances are kept in
 * GDBusWorker.write_pending_flushes.
 *
 * NOTE(review): the code using these fields is not part of this excerpt.
 * Presumably a caller blocks on @cond / @mutex until
 * write_num_messages_written reaches @number_to_wait_for and then picks
 * up @error - confirm against the flush implementation.
 */
typedef struct
{
  GMutex  mutex;
  GCond   cond;
  guint64 number_to_wait_for;
  GError *error;
} FlushData;
400
401 struct _MessageToWriteData ;
402 typedef struct _MessageToWriteData MessageToWriteData;
403
404 static void message_to_write_data_free (MessageToWriteData *data);
405
406 static void read_message_print_transport_debug (gssize bytes_read,
407                                                 GDBusWorker *worker);
408
409 static void write_message_print_transport_debug (gssize bytes_written,
410                                                  MessageToWriteData *data);
411
/* One pending close request; released with close_data_free(), which
 * unrefs @cancellable and @result when they are set and drops the
 * reference on @worker.
 */
typedef struct {
    /* a reference is held on the worker */
    GDBusWorker *worker;
    /* may be NULL */
    GCancellable *cancellable;
    /* may be NULL */
    GSimpleAsyncResult *result;
} CloseData;
417
418 static void close_data_free (CloseData *close_data)
419 {
420   if (close_data->cancellable != NULL)
421     g_object_unref (close_data->cancellable);
422
423   if (close_data->result != NULL)
424     g_object_unref (close_data->result);
425
426   _g_dbus_worker_unref (close_data->worker);
427   g_slice_free (CloseData, close_data);
428 }
429
430 /* ---------------------------------------------------------------------------------------------------- */
431
432 static GDBusWorker *
433 _g_dbus_worker_ref (GDBusWorker *worker)
434 {
435   g_atomic_int_inc (&worker->ref_count);
436   return worker;
437 }
438
439 static void
440 _g_dbus_worker_unref (GDBusWorker *worker)
441 {
442   if (g_atomic_int_dec_and_test (&worker->ref_count))
443     {
444       g_assert (worker->write_pending_flushes == NULL);
445
446       _g_dbus_shared_thread_unref (worker->shared_thread_data);
447
448       g_object_unref (worker->stream);
449
450       g_mutex_clear (&worker->read_lock);
451       g_object_unref (worker->cancellable);
452       if (worker->read_fd_list != NULL)
453         g_object_unref (worker->read_fd_list);
454
455       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
456       g_queue_free (worker->received_messages_while_frozen);
457
458       g_mutex_clear (&worker->write_lock);
459       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
460       g_queue_free (worker->write_queue);
461
462       g_free (worker->read_buffer);
463
464       g_free (worker);
465     }
466 }
467
468 static void
469 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
470                                   gboolean      remote_peer_vanished,
471                                   GError       *error)
472 {
473   if (!g_atomic_int_get (&worker->stopped))
474     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
475 }
476
477 static void
478 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
479                                       GDBusMessage *message)
480 {
481   if (!g_atomic_int_get (&worker->stopped))
482     worker->message_received_callback (worker, message, worker->user_data);
483 }
484
485 static GDBusMessage *
486 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
487                                               GDBusMessage *message)
488 {
489   GDBusMessage *ret;
490   if (!g_atomic_int_get (&worker->stopped))
491     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
492   else
493     ret = message;
494   return ret;
495 }
496
497 /* can only be called from private thread with read-lock held - takes ownership of @message */
498 static void
499 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
500                                                   GDBusMessage *message)
501 {
502   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
503     {
504       /* queue up */
505       g_queue_push_tail (worker->received_messages_while_frozen, message);
506     }
507   else
508     {
509       /* not frozen, nor anything in queue */
510       _g_dbus_worker_emit_message_received (worker, message);
511       g_object_unref (message);
512     }
513 }
514
515 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
516 static gboolean
517 unfreeze_in_idle_cb (gpointer user_data)
518 {
519   GDBusWorker *worker = user_data;
520   GDBusMessage *message;
521
522   g_mutex_lock (&worker->read_lock);
523   if (worker->frozen)
524     {
525       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
526         {
527           _g_dbus_worker_emit_message_received (worker, message);
528           g_object_unref (message);
529         }
530       worker->frozen = FALSE;
531     }
532   else
533     {
534       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
535     }
536   g_mutex_unlock (&worker->read_lock);
537   return FALSE;
538 }
539
540 /* can be called from any thread */
541 void
542 _g_dbus_worker_unfreeze (GDBusWorker *worker)
543 {
544   GSource *idle_source;
545   idle_source = g_idle_source_new ();
546   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
547   g_source_set_callback (idle_source,
548                          unfreeze_in_idle_cb,
549                          _g_dbus_worker_ref (worker),
550                          (GDestroyNotify) _g_dbus_worker_unref);
551   g_source_attach (idle_source, worker->shared_thread_data->context);
552   g_source_unref (idle_source);
553 }
554
555 /* ---------------------------------------------------------------------------------------------------- */
556
557 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
558
559 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
560 static void
561 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
562                            GAsyncResult  *res,
563                            gpointer       user_data)
564 {
565   GDBusWorker *worker = user_data;
566   GError *error;
567   gssize bytes_read;
568
569   g_mutex_lock (&worker->read_lock);
570
571   /* If already stopped, don't even process the reply */
572   if (g_atomic_int_get (&worker->stopped))
573     goto out;
574
575   error = NULL;
576   if (worker->socket == NULL)
577     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
578                                              res,
579                                              &error);
580   else
581     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
582                                                               res,
583                                                               &error);
584   if (worker->read_num_ancillary_messages > 0)
585     {
586       gint n;
587       for (n = 0; n < worker->read_num_ancillary_messages; n++)
588         {
589           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
590
591           if (FALSE)
592             {
593             }
594 #ifdef G_OS_UNIX
595           else if (G_IS_UNIX_FD_MESSAGE (control_message))
596             {
597               GUnixFDMessage *fd_message;
598               gint *fds;
599               gint num_fds;
600
601               fd_message = G_UNIX_FD_MESSAGE (control_message);
602               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
603               if (worker->read_fd_list == NULL)
604                 {
605                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
606                 }
607               else
608                 {
609                   gint n;
610                   for (n = 0; n < num_fds; n++)
611                     {
612                       /* TODO: really want a append_steal() */
613                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
614                       close (fds[n]);
615                     }
616                 }
617               g_free (fds);
618             }
619           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
620             {
621               /* do nothing */
622             }
623 #endif
624           else
625             {
626               if (error == NULL)
627                 {
628                   g_set_error (&error,
629                                G_IO_ERROR,
630                                G_IO_ERROR_FAILED,
631                                "Unexpected ancillary message of type %s received from peer",
632                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
633                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
634                   g_error_free (error);
635                   g_object_unref (control_message);
636                   n++;
637                   while (n < worker->read_num_ancillary_messages)
638                     g_object_unref (worker->read_ancillary_messages[n++]);
639                   g_free (worker->read_ancillary_messages);
640                   goto out;
641                 }
642             }
643           g_object_unref (control_message);
644         }
645       g_free (worker->read_ancillary_messages);
646     }
647
648   if (bytes_read == -1)
649     {
650       if (G_UNLIKELY (_g_dbus_debug_transport ()))
651         {
652           _g_dbus_debug_print_lock ();
653           g_print ("========================================================================\n"
654                    "GDBus-debug:Transport:\n"
655                    "  ---- READ ERROR on stream of type %s:\n"
656                    "  ---- %s %d: %s\n",
657                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
658                    g_quark_to_string (error->domain), error->code,
659                    error->message);
660           _g_dbus_debug_print_unlock ();
661         }
662
663       /* Every async read that uses this callback uses worker->cancellable
664        * as its GCancellable. worker->cancellable gets cancelled if and only
665        * if the GDBusConnection tells us to close (either via
666        * _g_dbus_worker_stop, which is called on last-unref, or directly),
667        * so a cancelled read must mean our connection was closed locally.
668        */
669       if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
670         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
671       else
672         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
673
674       g_error_free (error);
675       goto out;
676     }
677
678 #if 0
679   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
680            (gint) bytes_read,
681            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
682            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
683            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
684                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
685            worker->stream,
686            worker);
687 #endif
688
689   /* TODO: hmm, hmm... */
690   if (bytes_read == 0)
691     {
692       g_set_error (&error,
693                    G_IO_ERROR,
694                    G_IO_ERROR_FAILED,
695                    "Underlying GIOStream returned 0 bytes on an async read");
696       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
697       g_error_free (error);
698       goto out;
699     }
700
701   read_message_print_transport_debug (bytes_read, worker);
702
703   worker->read_buffer_cur_size += bytes_read;
704   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
705     {
706       /* OK, got what we asked for! */
707       if (worker->read_buffer_bytes_wanted == 16)
708         {
709           gssize message_len;
710           /* OK, got the header - determine how many more bytes are needed */
711           error = NULL;
712           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
713                                                      16,
714                                                      &error);
715           if (message_len == -1)
716             {
717               g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
718               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
719               g_error_free (error);
720               goto out;
721             }
722
723           worker->read_buffer_bytes_wanted = message_len;
724           _g_dbus_worker_do_read_unlocked (worker);
725         }
726       else
727         {
728           GDBusMessage *message;
729           error = NULL;
730
731           /* TODO: use connection->priv->auth to decode the message */
732
733           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
734                                                   worker->read_buffer_cur_size,
735                                                   worker->capabilities,
736                                                   &error);
737           if (message == NULL)
738             {
739               gchar *s;
740               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
741               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
742                          "The error is: %s\n"
743                          "The payload is as follows:\n"
744                          "%s\n",
745                          worker->read_buffer_cur_size,
746                          error->message,
747                          s);
748               g_free (s);
749               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
750               g_error_free (error);
751               goto out;
752             }
753
754 #ifdef G_OS_UNIX
755           if (worker->read_fd_list != NULL)
756             {
757               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
758               g_object_unref (worker->read_fd_list);
759               worker->read_fd_list = NULL;
760             }
761 #endif
762
763           if (G_UNLIKELY (_g_dbus_debug_message ()))
764             {
765               gchar *s;
766               _g_dbus_debug_print_lock ();
767               g_print ("========================================================================\n"
768                        "GDBus-debug:Message:\n"
769                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
770                        worker->read_buffer_cur_size);
771               s = g_dbus_message_print (message, 2);
772               g_print ("%s", s);
773               g_free (s);
774               if (G_UNLIKELY (_g_dbus_debug_payload ()))
775                 {
776                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
777                   g_print ("%s\n", s);
778                   g_free (s);
779                 }
780               _g_dbus_debug_print_unlock ();
781             }
782
783           /* yay, got a message, go deliver it */
784           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
785
786           /* start reading another message! */
787           worker->read_buffer_bytes_wanted = 0;
788           worker->read_buffer_cur_size = 0;
789           _g_dbus_worker_do_read_unlocked (worker);
790         }
791     }
792   else
793     {
794       /* didn't get all the bytes we requested - so repeat the request... */
795       _g_dbus_worker_do_read_unlocked (worker);
796     }
797
798  out:
799   g_mutex_unlock (&worker->read_lock);
800
801   /* gives up the reference acquired when calling g_input_stream_read_async() */
802   _g_dbus_worker_unref (worker);
803 }
804
805 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
806 static void
807 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
808 {
809   /* if bytes_wanted is zero, it means start reading a message */
810   if (worker->read_buffer_bytes_wanted == 0)
811     {
812       worker->read_buffer_cur_size = 0;
813       worker->read_buffer_bytes_wanted = 16;
814     }
815
816   /* ensure we have a (big enough) buffer */
817   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
818     {
819       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
820       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
821       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
822     }
823
824   if (worker->socket == NULL)
825     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
826                                worker->read_buffer + worker->read_buffer_cur_size,
827                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
828                                G_PRIORITY_DEFAULT,
829                                worker->cancellable,
830                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
831                                _g_dbus_worker_ref (worker));
832   else
833     {
834       worker->read_ancillary_messages = NULL;
835       worker->read_num_ancillary_messages = 0;
836       _g_socket_read_with_control_messages (worker->socket,
837                                             worker->read_buffer + worker->read_buffer_cur_size,
838                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
839                                             &worker->read_ancillary_messages,
840                                             &worker->read_num_ancillary_messages,
841                                             G_PRIORITY_DEFAULT,
842                                             worker->cancellable,
843                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
844                                             _g_dbus_worker_ref (worker));
845     }
846 }
847
848 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
849 static gboolean
850 _g_dbus_worker_do_initial_read (gpointer data)
851 {
852   GDBusWorker *worker = data;
853   g_mutex_lock (&worker->read_lock);
854   _g_dbus_worker_do_read_unlocked (worker);
855   g_mutex_unlock (&worker->read_lock);
856   return FALSE;
857 }
858
859 /* ---------------------------------------------------------------------------------------------------- */
860
861 struct _MessageToWriteData
862 {
863   GDBusWorker  *worker;
864   GDBusMessage *message;
865   gchar        *blob;
866   gsize         blob_size;
867
868   gsize               total_written;
869   GSimpleAsyncResult *simple;
870
871 };
872
873 static void
874 message_to_write_data_free (MessageToWriteData *data)
875 {
876   _g_dbus_worker_unref (data->worker);
877   if (data->message)
878     g_object_unref (data->message);
879   g_free (data->blob);
880   g_free (data);
881 }
882
883 /* ---------------------------------------------------------------------------------------------------- */
884
885 static void write_message_continue_writing (MessageToWriteData *data);
886
887 /* called in private thread shared by all GDBusConnection instances
888  *
889  * write-lock is not held on entry
890  * output_pending is true on entry
891  */
892 static void
893 write_message_async_cb (GObject      *source_object,
894                         GAsyncResult *res,
895                         gpointer      user_data)
896 {
897   MessageToWriteData *data = user_data;
898   GSimpleAsyncResult *simple;
899   gssize bytes_written;
900   GError *error;
901
902   /* Note: we can't access data->simple after calling g_async_result_complete () because the
903    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
904    */
905   simple = data->simple;
906
907   error = NULL;
908   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
909                                                 res,
910                                                 &error);
911   if (bytes_written == -1)
912     {
913       g_simple_async_result_take_error (simple, error);
914       g_simple_async_result_complete (simple);
915       g_object_unref (simple);
916       goto out;
917     }
918   g_assert (bytes_written > 0); /* zero is never returned */
919
920   write_message_print_transport_debug (bytes_written, data);
921
922   data->total_written += bytes_written;
923   g_assert (data->total_written <= data->blob_size);
924   if (data->total_written == data->blob_size)
925     {
926       g_simple_async_result_complete (simple);
927       g_object_unref (simple);
928       goto out;
929     }
930
931   write_message_continue_writing (data);
932
933  out:
934   ;
935 }
936
937 /* called in private thread shared by all GDBusConnection instances
938  *
939  * write-lock is not held on entry
940  * output_pending is true on entry
941  */
942 static gboolean
943 on_socket_ready (GSocket      *socket,
944                  GIOCondition  condition,
945                  gpointer      user_data)
946 {
947   MessageToWriteData *data = user_data;
948   write_message_continue_writing (data);
949   return FALSE; /* remove source */
950 }
951
952 /* called in private thread shared by all GDBusConnection instances
953  *
954  * write-lock is not held on entry
955  * output_pending is true on entry
956  */
957 static void
958 write_message_continue_writing (MessageToWriteData *data)
959 {
960   GOutputStream *ostream;
961   GSimpleAsyncResult *simple;
962 #ifdef G_OS_UNIX
963   GUnixFDList *fd_list;
964 #endif
965
966   /* Note: we can't access data->simple after calling g_async_result_complete () because the
967    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
968    */
969   simple = data->simple;
970
971   ostream = g_io_stream_get_output_stream (data->worker->stream);
972 #ifdef G_OS_UNIX
973   fd_list = g_dbus_message_get_unix_fd_list (data->message);
974 #endif
975
976   g_assert (!g_output_stream_has_pending (ostream));
977   g_assert_cmpint (data->total_written, <, data->blob_size);
978
979   if (FALSE)
980     {
981     }
982 #ifdef G_OS_UNIX
983   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
984     {
985       GOutputVector vector;
986       GSocketControlMessage *control_message;
987       gssize bytes_written;
988       GError *error;
989
990       vector.buffer = data->blob;
991       vector.size = data->blob_size;
992
993       control_message = NULL;
994       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
995         {
996           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
997             {
998               g_simple_async_result_set_error (simple,
999                                                G_IO_ERROR,
1000                                                G_IO_ERROR_FAILED,
1001                                                "Tried sending a file descriptor but remote peer does not support this capability");
1002               g_simple_async_result_complete (simple);
1003               g_object_unref (simple);
1004               goto out;
1005             }
1006           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1007         }
1008
1009       error = NULL;
1010       bytes_written = g_socket_send_message (data->worker->socket,
1011                                              NULL, /* address */
1012                                              &vector,
1013                                              1,
1014                                              control_message != NULL ? &control_message : NULL,
1015                                              control_message != NULL ? 1 : 0,
1016                                              G_SOCKET_MSG_NONE,
1017                                              data->worker->cancellable,
1018                                              &error);
1019       if (control_message != NULL)
1020         g_object_unref (control_message);
1021
1022       if (bytes_written == -1)
1023         {
1024           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1025           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1026             {
1027               GSource *source;
1028               source = g_socket_create_source (data->worker->socket,
1029                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1030                                                data->worker->cancellable);
1031               g_source_set_callback (source,
1032                                      (GSourceFunc) on_socket_ready,
1033                                      data,
1034                                      NULL); /* GDestroyNotify */
1035               g_source_attach (source, g_main_context_get_thread_default ());
1036               g_source_unref (source);
1037               g_error_free (error);
1038               goto out;
1039             }
1040           g_simple_async_result_take_error (simple, error);
1041           g_simple_async_result_complete (simple);
1042           g_object_unref (simple);
1043           goto out;
1044         }
1045       g_assert (bytes_written > 0); /* zero is never returned */
1046
1047       write_message_print_transport_debug (bytes_written, data);
1048
1049       data->total_written += bytes_written;
1050       g_assert (data->total_written <= data->blob_size);
1051       if (data->total_written == data->blob_size)
1052         {
1053           g_simple_async_result_complete (simple);
1054           g_object_unref (simple);
1055           goto out;
1056         }
1057
1058       write_message_continue_writing (data);
1059     }
1060 #endif
1061   else
1062     {
1063 #ifdef G_OS_UNIX
1064       if (fd_list != NULL)
1065         {
1066           g_simple_async_result_set_error (simple,
1067                                            G_IO_ERROR,
1068                                            G_IO_ERROR_FAILED,
1069                                            "Tried sending a file descriptor on unsupported stream of type %s",
1070                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1071           g_simple_async_result_complete (simple);
1072           g_object_unref (simple);
1073           goto out;
1074         }
1075 #endif
1076
1077       g_output_stream_write_async (ostream,
1078                                    (const gchar *) data->blob + data->total_written,
1079                                    data->blob_size - data->total_written,
1080                                    G_PRIORITY_DEFAULT,
1081                                    data->worker->cancellable,
1082                                    write_message_async_cb,
1083                                    data);
1084     }
1085  out:
1086   ;
1087 }
1088
1089 /* called in private thread shared by all GDBusConnection instances
1090  *
1091  * write-lock is not held on entry
1092  * output_pending is true on entry
1093  */
1094 static void
1095 write_message_async (GDBusWorker         *worker,
1096                      MessageToWriteData  *data,
1097                      GAsyncReadyCallback  callback,
1098                      gpointer             user_data)
1099 {
1100   data->simple = g_simple_async_result_new (NULL,
1101                                             callback,
1102                                             user_data,
1103                                             write_message_async);
1104   data->total_written = 0;
1105   write_message_continue_writing (data);
1106 }
1107
1108 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1109 static gboolean
1110 write_message_finish (GAsyncResult   *res,
1111                       GError        **error)
1112 {
1113   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1114   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1115     return FALSE;
1116   else
1117     return TRUE;
1118 }
1119 /* ---------------------------------------------------------------------------------------------------- */
1120
1121 static void maybe_write_next_message (GDBusWorker *worker);
1122
1123 typedef struct
1124 {
1125   GDBusWorker *worker;
1126   GList *flushers;
1127 } FlushAsyncData;
1128
1129 static void
1130 flush_data_list_complete (const GList  *flushers,
1131                           const GError *error)
1132 {
1133   const GList *l;
1134
1135   for (l = flushers; l != NULL; l = l->next)
1136     {
1137       FlushData *f = l->data;
1138
1139       f->error = error != NULL ? g_error_copy (error) : NULL;
1140
1141       g_mutex_lock (&f->mutex);
1142       g_cond_signal (&f->cond);
1143       g_mutex_unlock (&f->mutex);
1144     }
1145 }
1146
1147 /* called in private thread shared by all GDBusConnection instances
1148  *
1149  * write-lock is not held on entry
1150  * output_pending is true on entry
1151  */
1152 static void
1153 ostream_flush_cb (GObject      *source_object,
1154                   GAsyncResult *res,
1155                   gpointer      user_data)
1156 {
1157   FlushAsyncData *data = user_data;
1158   GError *error;
1159
1160   error = NULL;
1161   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1162                                 res,
1163                                 &error);
1164
1165   if (error == NULL)
1166     {
1167       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1168         {
1169           _g_dbus_debug_print_lock ();
1170           g_print ("========================================================================\n"
1171                    "GDBus-debug:Transport:\n"
1172                    "  ---- FLUSHED stream of type %s\n",
1173                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1174           _g_dbus_debug_print_unlock ();
1175         }
1176     }
1177
1178   g_assert (data->flushers != NULL);
1179   flush_data_list_complete (data->flushers, error);
1180   g_list_free (data->flushers);
1181
1182   if (error != NULL)
1183     g_error_free (error);
1184
1185   /* Make sure we tell folks that we don't have additional
1186      flushes pending */
1187   g_mutex_lock (&data->worker->write_lock);
1188   g_assert (data->worker->output_pending);
1189   data->worker->output_pending = FALSE;
1190   g_mutex_unlock (&data->worker->write_lock);
1191
1192   /* OK, cool, finally kick off the next write */
1193   maybe_write_next_message (data->worker);
1194
1195   _g_dbus_worker_unref (data->worker);
1196   g_free (data);
1197 }
1198
1199 /* called in private thread shared by all GDBusConnection instances
1200  *
1201  * write-lock is not held on entry
1202  * output_pending is false on entry
1203  */
1204 static void
1205 message_written (GDBusWorker *worker,
1206                  MessageToWriteData *message_data)
1207 {
1208   GList *l;
1209   GList *ll;
1210   GList *flushers;
1211
1212   /* first log the fact that we wrote a message */
1213   if (G_UNLIKELY (_g_dbus_debug_message ()))
1214     {
1215       gchar *s;
1216       _g_dbus_debug_print_lock ();
1217       g_print ("========================================================================\n"
1218                "GDBus-debug:Message:\n"
1219                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1220                message_data->blob_size);
1221       s = g_dbus_message_print (message_data->message, 2);
1222       g_print ("%s", s);
1223       g_free (s);
1224       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1225         {
1226           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1227           g_print ("%s\n", s);
1228           g_free (s);
1229         }
1230       _g_dbus_debug_print_unlock ();
1231     }
1232
1233   /* then first wake up pending flushes and, if needed, flush the stream */
1234   flushers = NULL;
1235   g_mutex_lock (&worker->write_lock);
1236   worker->write_num_messages_written += 1;
1237   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1238     {
1239       FlushData *f = l->data;
1240       ll = l->next;
1241
1242       if (f->number_to_wait_for == worker->write_num_messages_written)
1243         {
1244           flushers = g_list_append (flushers, f);
1245           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1246         }
1247     }
1248   if (flushers != NULL)
1249     {
1250       g_assert (!worker->output_pending);
1251       worker->output_pending = TRUE;
1252     }
1253   g_mutex_unlock (&worker->write_lock);
1254
1255   if (flushers != NULL)
1256     {
1257       FlushAsyncData *data;
1258       data = g_new0 (FlushAsyncData, 1);
1259       data->worker = _g_dbus_worker_ref (worker);
1260       data->flushers = flushers;
1261       /* flush the stream before writing the next message */
1262       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1263                                    G_PRIORITY_DEFAULT,
1264                                    worker->cancellable,
1265                                    ostream_flush_cb,
1266                                    data);
1267     }
1268   else
1269     {
1270       /* kick off the next write! */
1271       maybe_write_next_message (worker);
1272     }
1273 }
1274
1275 /* called in private thread shared by all GDBusConnection instances
1276  *
1277  * write-lock is not held on entry
1278  * output_pending is true on entry
1279  */
1280 static void
1281 write_message_cb (GObject       *source_object,
1282                   GAsyncResult  *res,
1283                   gpointer       user_data)
1284 {
1285   MessageToWriteData *data = user_data;
1286   GError *error;
1287
1288   g_mutex_lock (&data->worker->write_lock);
1289   g_assert (data->worker->output_pending);
1290   data->worker->output_pending = FALSE;
1291   g_mutex_unlock (&data->worker->write_lock);
1292
1293   error = NULL;
1294   if (!write_message_finish (res, &error))
1295     {
1296       /* TODO: handle */
1297       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1298       g_error_free (error);
1299     }
1300
1301   /* this function will also kick of the next write (it might need to
1302    * flush so writing the next message might happen much later
1303    * e.g. async)
1304    */
1305   message_written (data->worker, data);
1306
1307   message_to_write_data_free (data);
1308 }
1309
1310 /* called in private thread shared by all GDBusConnection instances
1311  *
1312  * write-lock is not held on entry
1313  * output_pending is true on entry
1314  */
1315 static void
1316 iostream_close_cb (GObject      *source_object,
1317                    GAsyncResult *res,
1318                    gpointer      user_data)
1319 {
1320   GDBusWorker *worker = user_data;
1321   GError *error = NULL;
1322   GList *pending_close_attempts, *pending_flush_attempts;
1323   GQueue *send_queue;
1324
1325   g_io_stream_close_finish (worker->stream, res, &error);
1326
1327   g_mutex_lock (&worker->write_lock);
1328
1329   pending_close_attempts = worker->pending_close_attempts;
1330   worker->pending_close_attempts = NULL;
1331
1332   pending_flush_attempts = worker->write_pending_flushes;
1333   worker->write_pending_flushes = NULL;
1334
1335   send_queue = worker->write_queue;
1336   worker->write_queue = g_queue_new ();
1337
1338   g_assert (worker->output_pending);
1339   worker->output_pending = FALSE;
1340
1341   g_mutex_unlock (&worker->write_lock);
1342
1343   while (pending_close_attempts != NULL)
1344     {
1345       CloseData *close_data = pending_close_attempts->data;
1346
1347       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1348                                                    pending_close_attempts);
1349
1350       if (close_data->result != NULL)
1351         {
1352           if (error != NULL)
1353             g_simple_async_result_set_from_error (close_data->result, error);
1354
1355           /* this must be in an idle because the result is likely to be
1356            * intended for another thread
1357            */
1358           g_simple_async_result_complete_in_idle (close_data->result);
1359         }
1360
1361       close_data_free (close_data);
1362     }
1363
1364   g_clear_error (&error);
1365
1366   /* all messages queued for sending are discarded */
1367   g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
1368   g_queue_free (send_queue);
1369
1370   /* all queued flushes fail */
1371   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1372                        _("Operation was cancelled"));
1373   flush_data_list_complete (pending_flush_attempts, error);
1374   g_list_free (pending_flush_attempts);
1375   g_clear_error (&error);
1376
1377   _g_dbus_worker_unref (worker);
1378 }
1379
1380 /* called in private thread shared by all GDBusConnection instances
1381  *
1382  * write-lock is not held on entry
1383  * output_pending must be false on entry
1384  */
1385 static void
1386 maybe_write_next_message (GDBusWorker *worker)
1387 {
1388   MessageToWriteData *data;
1389
1390  write_next:
1391   /* we mustn't try to write two things at once */
1392   g_assert (!worker->output_pending);
1393
1394   g_mutex_lock (&worker->write_lock);
1395
1396   /* if we want to close the connection, that takes precedence */
1397   if (worker->pending_close_attempts != NULL)
1398     {
1399       worker->output_pending = TRUE;
1400
1401       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1402                                NULL, iostream_close_cb,
1403                                _g_dbus_worker_ref (worker));
1404       data = NULL;
1405     }
1406   else
1407     {
1408       data = g_queue_pop_head (worker->write_queue);
1409
1410       if (data != NULL)
1411         worker->output_pending = TRUE;
1412     }
1413
1414   g_mutex_unlock (&worker->write_lock);
1415
1416   /* Note that write_lock is only used for protecting the @write_queue
1417    * and @output_pending fields of the GDBusWorker struct ... which we
1418    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1419    *
1420    * Therefore, it's fine to drop it here when calling back into user
1421    * code and then writing the message out onto the GIOStream since this
1422    * function only runs on the worker thread.
1423    */
1424   if (data != NULL)
1425     {
1426       GDBusMessage *old_message;
1427       guchar *new_blob;
1428       gsize new_blob_size;
1429       GError *error;
1430
1431       old_message = data->message;
1432       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1433       if (data->message == old_message)
1434         {
1435           /* filters had no effect - do nothing */
1436         }
1437       else if (data->message == NULL)
1438         {
1439           /* filters dropped message */
1440           g_mutex_lock (&worker->write_lock);
1441           worker->output_pending = FALSE;
1442           g_mutex_unlock (&worker->write_lock);
1443           message_to_write_data_free (data);
1444           goto write_next;
1445         }
1446       else
1447         {
1448           /* filters altered the message -> reencode */
1449           error = NULL;
1450           new_blob = g_dbus_message_to_blob (data->message,
1451                                              &new_blob_size,
1452                                              worker->capabilities,
1453                                              &error);
1454           if (new_blob == NULL)
1455             {
1456               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1457                * the old message instead
1458                */
1459               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1460                          g_dbus_message_get_serial (data->message),
1461                          error->message);
1462               g_error_free (error);
1463             }
1464           else
1465             {
1466               g_free (data->blob);
1467               data->blob = (gchar *) new_blob;
1468               data->blob_size = new_blob_size;
1469             }
1470         }
1471
1472       write_message_async (worker,
1473                            data,
1474                            write_message_cb,
1475                            data);
1476     }
1477 }
1478
1479 /* called in private thread shared by all GDBusConnection instances
1480  *
1481  * write-lock is not held on entry
1482  * output_pending may be true or false
1483  */
1484 static gboolean
1485 write_message_in_idle_cb (gpointer user_data)
1486 {
1487   GDBusWorker *worker = user_data;
1488
1489   /* Because this is the worker thread, we can read this struct member
1490    * without holding the lock: no other thread ever modifies it.
1491    */
1492   if (!worker->output_pending)
1493     maybe_write_next_message (worker);
1494
1495   return FALSE;
1496 }
1497
1498 /*
1499  * @write_data: (transfer full) (allow-none):
1500  * @close_data: (transfer full) (allow-none):
1501  *
1502  * Can be called from any thread
1503  *
1504  * write_lock is not held on entry
1505  * output_pending may be true or false
1506  */
1507 static void
1508 schedule_write_in_worker_thread (GDBusWorker        *worker,
1509                                  MessageToWriteData *write_data,
1510                                  CloseData          *close_data)
1511 {
1512   g_mutex_lock (&worker->write_lock);
1513
1514   if (write_data != NULL)
1515     g_queue_push_tail (worker->write_queue, write_data);
1516
1517   if (close_data != NULL)
1518     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1519                                                      close_data);
1520
1521   if (!worker->output_pending)
1522     {
1523       GSource *idle_source;
1524       idle_source = g_idle_source_new ();
1525       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1526       g_source_set_callback (idle_source,
1527                              write_message_in_idle_cb,
1528                              _g_dbus_worker_ref (worker),
1529                              (GDestroyNotify) _g_dbus_worker_unref);
1530       g_source_attach (idle_source, worker->shared_thread_data->context);
1531       g_source_unref (idle_source);
1532     }
1533
1534   g_mutex_unlock (&worker->write_lock);
1535 }
1536
1537 /* ---------------------------------------------------------------------------------------------------- */
1538
1539 /* can be called from any thread - steals blob
1540  *
1541  * write_lock is not held on entry
1542  * output_pending may be true or false
1543  */
1544 void
1545 _g_dbus_worker_send_message (GDBusWorker    *worker,
1546                              GDBusMessage   *message,
1547                              gchar          *blob,
1548                              gsize           blob_len)
1549 {
1550   MessageToWriteData *data;
1551
1552   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1553   g_return_if_fail (blob != NULL);
1554   g_return_if_fail (blob_len > 16);
1555
1556   data = g_new0 (MessageToWriteData, 1);
1557   data->worker = _g_dbus_worker_ref (worker);
1558   data->message = g_object_ref (message);
1559   data->blob = blob; /* steal! */
1560   data->blob_size = blob_len;
1561
1562   schedule_write_in_worker_thread (worker, data, NULL);
1563 }
1564
1565 /* ---------------------------------------------------------------------------------------------------- */
1566
1567 GDBusWorker *
1568 _g_dbus_worker_new (GIOStream                              *stream,
1569                     GDBusCapabilityFlags                    capabilities,
1570                     gboolean                                initially_frozen,
1571                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1572                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1573                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1574                     gpointer                                user_data)
1575 {
1576   GDBusWorker *worker;
1577   GSource *idle_source;
1578
1579   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1580   g_return_val_if_fail (message_received_callback != NULL, NULL);
1581   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1582   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1583
1584   worker = g_new0 (GDBusWorker, 1);
1585   worker->ref_count = 1;
1586
1587   g_mutex_init (&worker->read_lock);
1588   worker->message_received_callback = message_received_callback;
1589   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1590   worker->disconnected_callback = disconnected_callback;
1591   worker->user_data = user_data;
1592   worker->stream = g_object_ref (stream);
1593   worker->capabilities = capabilities;
1594   worker->cancellable = g_cancellable_new ();
1595   worker->output_pending = FALSE;
1596
1597   worker->frozen = initially_frozen;
1598   worker->received_messages_while_frozen = g_queue_new ();
1599
1600   g_mutex_init (&worker->write_lock);
1601   worker->write_queue = g_queue_new ();
1602
1603   if (G_IS_SOCKET_CONNECTION (worker->stream))
1604     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1605
1606   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1607
1608   /* begin reading */
1609   idle_source = g_idle_source_new ();
1610   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1611   g_source_set_callback (idle_source,
1612                          _g_dbus_worker_do_initial_read,
1613                          _g_dbus_worker_ref (worker),
1614                          (GDestroyNotify) _g_dbus_worker_unref);
1615   g_source_attach (idle_source, worker->shared_thread_data->context);
1616   g_source_unref (idle_source);
1617
1618   return worker;
1619 }
1620
1621 /* ---------------------------------------------------------------------------------------------------- */
1622
1623 /* can be called from any thread
1624  *
1625  * write_lock is not held on entry
1626  * output_pending may be true or false
1627  */
1628 void
1629 _g_dbus_worker_close (GDBusWorker         *worker,
1630                       GCancellable        *cancellable,
1631                       GSimpleAsyncResult  *result)
1632 {
1633   CloseData *close_data;
1634
1635   close_data = g_slice_new0 (CloseData);
1636   close_data->worker = _g_dbus_worker_ref (worker);
1637   close_data->cancellable =
1638       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1639   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1640
1641   g_cancellable_cancel (worker->cancellable);
1642   schedule_write_in_worker_thread (worker, NULL, close_data);
1643 }
1644
1645 /* This can be called from any thread - frees worker. Note that
1646  * callbacks might still happen if called from another thread than the
1647  * worker - use your own synchronization primitive in the callbacks.
1648  *
1649  * write_lock is not held on entry
1650  * output_pending may be true or false
1651  */
1652 void
1653 _g_dbus_worker_stop (GDBusWorker *worker)
1654 {
1655   g_atomic_int_set (&worker->stopped, TRUE);
1656
1657   /* Cancel any pending operations and schedule a close of the underlying I/O
1658    * stream in the worker thread
1659    */
1660   _g_dbus_worker_close (worker, NULL, NULL);
1661
1662   /* _g_dbus_worker_close holds a ref until after an idle in the the worker
1663    * thread has run, so we no longer need to unref in an idle like in
1664    * commit 322e25b535
1665    */
1666   _g_dbus_worker_unref (worker);
1667 }
1668
1669 /* ---------------------------------------------------------------------------------------------------- */
1670
1671 /* can be called from any thread (except the worker thread) - blocks
1672  * calling thread until all queued outgoing messages are written and
1673  * the transport has been flushed
1674  *
1675  * write_lock is not held on entry
1676  * output_pending may be true or false
1677  */
1678 gboolean
1679 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1680                            GCancellable   *cancellable,
1681                            GError        **error)
1682 {
1683   gboolean ret;
1684   FlushData *data;
1685
1686   data = NULL;
1687   ret = TRUE;
1688
1689   /* if the queue is empty, there's nothing to wait for */
1690   g_mutex_lock (&worker->write_lock);
1691   if (g_queue_get_length (worker->write_queue) > 0)
1692     {
1693       data = g_new0 (FlushData, 1);
1694       g_mutex_init (&data->mutex);
1695       g_cond_init (&data->cond);
1696       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1697       g_mutex_lock (&data->mutex);
1698       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1699     }
1700   g_mutex_unlock (&worker->write_lock);
1701
1702   if (data != NULL)
1703     {
1704       g_cond_wait (&data->cond, &data->mutex);
1705       g_mutex_unlock (&data->mutex);
1706
1707       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1708       g_cond_clear (&data->cond);
1709       g_mutex_clear (&data->mutex);
1710       if (data->error != NULL)
1711         {
1712           ret = FALSE;
1713           g_propagate_error (error, data->error);
1714         }
1715       g_free (data);
1716     }
1717
1718   return ret;
1719 }
1720
1721 /* ---------------------------------------------------------------------------------------------------- */
1722
/* Bit flags for the individual categories of GDBus debug output; any
 * combination of them may be OR-ed together into one flags word.
 */
enum
{
  G_DBUS_DEBUG_AUTHENTICATION = 1 << 0,
  G_DBUS_DEBUG_TRANSPORT      = 1 << 1,
  G_DBUS_DEBUG_MESSAGE        = 1 << 2,
  G_DBUS_DEBUG_PAYLOAD        = 1 << 3,
  G_DBUS_DEBUG_CALL           = 1 << 4,
  G_DBUS_DEBUG_SIGNAL         = 1 << 5,
  G_DBUS_DEBUG_INCOMING       = 1 << 6,
  G_DBUS_DEBUG_RETURN         = 1 << 7,
  G_DBUS_DEBUG_EMISSION       = 1 << 8,
  G_DBUS_DEBUG_ADDRESS        = 1 << 9
};
1733
/* Bitwise OR of the G_DBUS_DEBUG_* flags that are enabled; starts at 0
 * and is assigned in _g_dbus_initialize() from the G_DBUS_DEBUG
 * environment variable.
 */
static gint _gdbus_debug_flags = 0;
1735
1736 gboolean
1737 _g_dbus_debug_authentication (void)
1738 {
1739   _g_dbus_initialize ();
1740   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1741 }
1742
1743 gboolean
1744 _g_dbus_debug_transport (void)
1745 {
1746   _g_dbus_initialize ();
1747   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1748 }
1749
1750 gboolean
1751 _g_dbus_debug_message (void)
1752 {
1753   _g_dbus_initialize ();
1754   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1755 }
1756
1757 gboolean
1758 _g_dbus_debug_payload (void)
1759 {
1760   _g_dbus_initialize ();
1761   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1762 }
1763
1764 gboolean
1765 _g_dbus_debug_call (void)
1766 {
1767   _g_dbus_initialize ();
1768   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1769 }
1770
1771 gboolean
1772 _g_dbus_debug_signal (void)
1773 {
1774   _g_dbus_initialize ();
1775   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1776 }
1777
1778 gboolean
1779 _g_dbus_debug_incoming (void)
1780 {
1781   _g_dbus_initialize ();
1782   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1783 }
1784
1785 gboolean
1786 _g_dbus_debug_return (void)
1787 {
1788   _g_dbus_initialize ();
1789   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1790 }
1791
1792 gboolean
1793 _g_dbus_debug_emission (void)
1794 {
1795   _g_dbus_initialize ();
1796   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1797 }
1798
1799 gboolean
1800 _g_dbus_debug_address (void)
1801 {
1802   _g_dbus_initialize ();
1803   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1804 }
1805
/* File-local lock taken by _g_dbus_debug_print_lock() and released by
 * _g_dbus_debug_print_unlock().
 */
G_LOCK_DEFINE_STATIC (print_lock);
1807
/* Acquires print_lock. The transport debug printers in this file call
 * this before g_print() and call _g_dbus_debug_print_unlock() afterwards.
 */
void
_g_dbus_debug_print_lock (void)
{
  G_LOCK (print_lock);
}
1813
/* Releases print_lock, which was acquired with _g_dbus_debug_print_lock(). */
void
_g_dbus_debug_print_unlock (void)
{
  G_UNLOCK (print_lock);
}
1819
1820 /*
1821  * _g_dbus_initialize:
1822  *
1823  * Does various one-time init things such as
1824  *
1825  *  - registering the G_DBUS_ERROR error domain
1826  *  - parses the G_DBUS_DEBUG environment variable
1827  */
1828 void
1829 _g_dbus_initialize (void)
1830 {
1831   static volatile gsize initialized = 0;
1832
1833   if (g_once_init_enter (&initialized))
1834     {
1835       volatile GQuark g_dbus_error_domain;
1836       const gchar *debug;
1837
1838       g_dbus_error_domain = G_DBUS_ERROR;
1839       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1840
1841       debug = g_getenv ("G_DBUS_DEBUG");
1842       if (debug != NULL)
1843         {
1844           const GDebugKey keys[] = {
1845             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1846             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1847             { "message",        G_DBUS_DEBUG_MESSAGE        },
1848             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1849             { "call",           G_DBUS_DEBUG_CALL           },
1850             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1851             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1852             { "return",         G_DBUS_DEBUG_RETURN         },
1853             { "emission",       G_DBUS_DEBUG_EMISSION       },
1854             { "address",        G_DBUS_DEBUG_ADDRESS        }
1855           };
1856
1857           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1858           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1859             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1860         }
1861
1862       g_once_init_leave (&initialized, 1);
1863     }
1864 }
1865
1866 /* ---------------------------------------------------------------------------------------------------- */
1867
1868 GVariantType *
1869 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1870 {
1871   const GVariantType *arg_types[256];
1872   guint n;
1873
1874   if (args)
1875     for (n = 0; args[n] != NULL; n++)
1876       {
1877         /* DBus places a hard limit of 255 on signature length.
1878          * therefore number of args must be less than 256.
1879          */
1880         g_assert (n < 256);
1881
1882         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1883
1884         if G_UNLIKELY (arg_types[n] == NULL)
1885           return NULL;
1886       }
1887   else
1888     n = 0;
1889
1890   return g_variant_type_new_tuple (arg_types, n);
1891 }
1892
1893 /* ---------------------------------------------------------------------------------------------------- */
1894
#ifdef G_OS_WIN32

extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);

/* Looks up the TokenUser SID of the current process token and returns it
 * in string form.
 *
 * Returns a copy made with g_strdup(), or NULL if any step fails; each
 * failure is reported with g_warning().
 */
gchar *
_g_dbus_win32_get_user_sid (void)
{
  HANDLE token = INVALID_HANDLE_VALUE;
  TOKEN_USER *token_user = NULL;
  DWORD info_len = 0;
  PSID user_sid;
  gchar *sid_string;
  gchar *result = NULL;

  if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &token))
    {
      g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* Probe with an empty buffer to learn the required length; any
   * failure other than ERROR_INSUFFICIENT_BUFFER is treated as fatal.
   */
  if (!GetTokenInformation (token, TokenUser, NULL, 0, &info_len) &&
      GetLastError () != ERROR_INSUFFICIENT_BUFFER)
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  token_user = g_malloc (info_len);
  if (!GetTokenInformation (token, TokenUser, token_user, info_len, &info_len))
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  user_sid = token_user->User.Sid;
  if (!IsValidSid (user_sid))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  if (!ConvertSidToStringSidA (user_sid, &sid_string))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  /* Copy the string before releasing it with LocalFree(). */
  result = g_strdup (sid_string);
  LocalFree (sid_string);

out:
  g_free (token_user);
  if (token != INVALID_HANDLE_VALUE)
    CloseHandle (token);
  return result;
}
#endif
1959
1960 /* ---------------------------------------------------------------------------------------------------- */
1961
1962 gchar *
1963 _g_dbus_get_machine_id (GError **error)
1964 {
1965   gchar *ret;
1966   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1967   ret = NULL;
1968   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1969                             &ret,
1970                             NULL,
1971                             error))
1972     {
1973       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1974     }
1975   else
1976     {
1977       /* TODO: validate value */
1978       g_strstrip (ret);
1979     }
1980   return ret;
1981 }
1982
1983 /* ---------------------------------------------------------------------------------------------------- */
1984
1985 gchar *
1986 _g_dbus_enum_to_string (GType enum_type, gint value)
1987 {
1988   gchar *ret;
1989   GEnumClass *klass;
1990   GEnumValue *enum_value;
1991
1992   klass = g_type_class_ref (enum_type);
1993   enum_value = g_enum_get_value (klass, value);
1994   if (enum_value != NULL)
1995     ret = g_strdup (enum_value->value_nick);
1996   else
1997     ret = g_strdup_printf ("unknown (value %d)", value);
1998   g_type_class_unref (klass);
1999   return ret;
2000 }
2001
2002 /* ---------------------------------------------------------------------------------------------------- */
2003
2004 static void
2005 write_message_print_transport_debug (gssize bytes_written,
2006                                      MessageToWriteData *data)
2007 {
2008   if (G_LIKELY (!_g_dbus_debug_transport ()))
2009     goto out;
2010
2011   _g_dbus_debug_print_lock ();
2012   g_print ("========================================================================\n"
2013            "GDBus-debug:Transport:\n"
2014            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2015            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2016            bytes_written,
2017            g_dbus_message_get_serial (data->message),
2018            data->blob_size,
2019            data->total_written,
2020            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2021   _g_dbus_debug_print_unlock ();
2022  out:
2023   ;
2024 }
2025
2026 /* ---------------------------------------------------------------------------------------------------- */
2027
2028 static void
2029 read_message_print_transport_debug (gssize bytes_read,
2030                                     GDBusWorker *worker)
2031 {
2032   gsize size;
2033   gint32 serial;
2034   gint32 message_length;
2035
2036   if (G_LIKELY (!_g_dbus_debug_transport ()))
2037     goto out;
2038
2039   size = bytes_read + worker->read_buffer_cur_size;
2040   serial = 0;
2041   message_length = 0;
2042   if (size >= 16)
2043     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2044   if (size >= 1)
2045     {
2046       switch (worker->read_buffer[0])
2047         {
2048         case 'l':
2049           if (size >= 12)
2050             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2051           break;
2052         case 'B':
2053           if (size >= 12)
2054             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2055           break;
2056         default:
2057           /* an error will be set elsewhere if this happens */
2058           goto out;
2059         }
2060     }
2061
2062     _g_dbus_debug_print_lock ();
2063   g_print ("========================================================================\n"
2064            "GDBus-debug:Transport:\n"
2065            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2066            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2067            bytes_read,
2068            serial,
2069            message_length,
2070            worker->read_buffer_cur_size,
2071            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2072   _g_dbus_debug_print_unlock ();
2073  out:
2074   ;
2075 }
2076
2077 /* ---------------------------------------------------------------------------------------------------- */
2078
2079 gboolean
2080 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2081                                      GValue                *return_accu,
2082                                      const GValue          *handler_return,
2083                                      gpointer               dummy)
2084 {
2085   gboolean continue_emission;
2086   gboolean signal_return;
2087
2088   signal_return = g_value_get_boolean (handler_return);
2089   g_value_set_boolean (return_accu, signal_return);
2090   continue_emission = signal_return;
2091
2092   return continue_emission;
2093 }