GDBusWorker: tolerate read errors while closing
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "gsocketcontrolmessage.h"
43 #include "gsocketconnection.h"
44 #include "gsocketoutputstream.h"
45
46 #ifdef G_OS_UNIX
47 #include "gunixfdmessage.h"
48 #include "gunixconnection.h"
49 #include "gunixcredentialsmessage.h"
50 #endif
51
52 #ifdef G_OS_WIN32
53 #include <windows.h>
54 #endif
55
56 #include "glibintl.h"
57
58 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
59
60 /* ---------------------------------------------------------------------------------------------------- */
61
62 gchar *
63 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
64 {
65  guint n, m;
66  GString *ret;
67
68  ret = g_string_new (NULL);
69
70  for (n = 0; n < len; n += 16)
71    {
72      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
73
74      for (m = n; m < n + 16; m++)
75        {
76          if (m > n && (m%4) == 0)
77            g_string_append_c (ret, ' ');
78          if (m < len)
79            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
80          else
81            g_string_append (ret, "   ");
82        }
83
84      g_string_append (ret, "   ");
85
86      for (m = n; m < len && m < n + 16; m++)
87        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
88
89      g_string_append_c (ret, '\n');
90    }
91
92  return g_string_free (ret, FALSE);
93 }
94
95 /* ---------------------------------------------------------------------------------------------------- */
96
97 /* Unfortunately ancillary messages are discarded when reading from a
98  * socket using the GSocketInputStream abstraction. So we provide a
99  * very GInputStream-ish API that uses GSocket in this case (very
100  * similar to GSocketInputStream).
101  */
102
103 typedef struct
104 {
105   GSocket *socket;
106   GCancellable *cancellable;
107
108   void *buffer;
109   gsize count;
110
111   GSocketControlMessage ***messages;
112   gint *num_messages;
113
114   GSimpleAsyncResult *simple;
115
116   gboolean from_mainloop;
117 } ReadWithControlData;
118
119 static void
120 read_with_control_data_free (ReadWithControlData *data)
121 {
122   g_object_unref (data->socket);
123   if (data->cancellable != NULL)
124     g_object_unref (data->cancellable);
125   g_object_unref (data->simple);
126   g_free (data);
127 }
128
129 static gboolean
130 _g_socket_read_with_control_messages_ready (GSocket      *socket,
131                                             GIOCondition  condition,
132                                             gpointer      user_data)
133 {
134   ReadWithControlData *data = user_data;
135   GError *error;
136   gssize result;
137   GInputVector vector;
138
139   error = NULL;
140   vector.buffer = data->buffer;
141   vector.size = data->count;
142   result = g_socket_receive_message (data->socket,
143                                      NULL, /* address */
144                                      &vector,
145                                      1,
146                                      data->messages,
147                                      data->num_messages,
148                                      NULL,
149                                      data->cancellable,
150                                      &error);
151   if (result >= 0)
152     {
153       g_simple_async_result_set_op_res_gssize (data->simple, result);
154     }
155   else
156     {
157       g_assert (error != NULL);
158       g_simple_async_result_take_error (data->simple, error);
159     }
160
161   if (data->from_mainloop)
162     g_simple_async_result_complete (data->simple);
163   else
164     g_simple_async_result_complete_in_idle (data->simple);
165
166   return FALSE;
167 }
168
169 static void
170 _g_socket_read_with_control_messages (GSocket                 *socket,
171                                       void                    *buffer,
172                                       gsize                    count,
173                                       GSocketControlMessage ***messages,
174                                       gint                    *num_messages,
175                                       gint                     io_priority,
176                                       GCancellable            *cancellable,
177                                       GAsyncReadyCallback      callback,
178                                       gpointer                 user_data)
179 {
180   ReadWithControlData *data;
181
182   data = g_new0 (ReadWithControlData, 1);
183   data->socket = g_object_ref (socket);
184   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
185   data->buffer = buffer;
186   data->count = count;
187   data->messages = messages;
188   data->num_messages = num_messages;
189
190   data->simple = g_simple_async_result_new (G_OBJECT (socket),
191                                             callback,
192                                             user_data,
193                                             _g_socket_read_with_control_messages);
194
195   if (!g_socket_condition_check (socket, G_IO_IN))
196     {
197       GSource *source;
198       data->from_mainloop = TRUE;
199       source = g_socket_create_source (data->socket,
200                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
201                                        cancellable);
202       g_source_set_callback (source,
203                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
204                              data,
205                              (GDestroyNotify) read_with_control_data_free);
206       g_source_attach (source, g_main_context_get_thread_default ());
207       g_source_unref (source);
208     }
209   else
210     {
211       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
212       read_with_control_data_free (data);
213     }
214 }
215
216 static gssize
217 _g_socket_read_with_control_messages_finish (GSocket       *socket,
218                                              GAsyncResult  *result,
219                                              GError       **error)
220 {
221   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
222
223   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
224   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
225
226   if (g_simple_async_result_propagate_error (simple, error))
227       return -1;
228   else
229     return g_simple_async_result_get_op_res_gssize (simple);
230 }
231
232 /* ---------------------------------------------------------------------------------------------------- */
233
234 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
235
236 static GPtrArray *ensured_classes = NULL;
237
238 static void
239 ensure_type (GType gtype)
240 {
241   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
242 }
243
244 static void
245 release_required_types (void)
246 {
247   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
248   g_ptr_array_unref (ensured_classes);
249   ensured_classes = NULL;
250 }
251
252 static void
253 ensure_required_types (void)
254 {
255   g_assert (ensured_classes == NULL);
256   ensured_classes = g_ptr_array_new ();
257   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
258   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
259 }
260 /* ---------------------------------------------------------------------------------------------------- */
261
262 typedef struct
263 {
264   volatile gint refcount;
265   GThread *thread;
266   GMainContext *context;
267   GMainLoop *loop;
268 } SharedThreadData;
269
270 static gpointer
271 gdbus_shared_thread_func (gpointer user_data)
272 {
273   SharedThreadData *data = user_data;
274
275   g_main_context_push_thread_default (data->context);
276   g_main_loop_run (data->loop);
277   g_main_context_pop_thread_default (data->context);
278
279   release_required_types ();
280
281   return NULL;
282 }
283
284 /* ---------------------------------------------------------------------------------------------------- */
285
286 static SharedThreadData *
287 _g_dbus_shared_thread_ref (void)
288 {
289   static gsize shared_thread_data = 0;
290   SharedThreadData *ret;
291
292   if (g_once_init_enter (&shared_thread_data))
293     {
294       SharedThreadData *data;
295
296       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
297       ensure_required_types ();
298
299       data = g_new0 (SharedThreadData, 1);
300       data->refcount = 0;
301       
302       data->context = g_main_context_new ();
303       data->loop = g_main_loop_new (data->context, FALSE);
304       data->thread = g_thread_new ("gdbus",
305                                    gdbus_shared_thread_func,
306                                    data);
307       /* We can cast between gsize and gpointer safely */
308       g_once_init_leave (&shared_thread_data, (gsize) data);
309     }
310
311   ret = (SharedThreadData*) shared_thread_data;
312   g_atomic_int_inc (&ret->refcount);
313   return ret;
314 }
315
316 static void
317 _g_dbus_shared_thread_unref (SharedThreadData *data)
318 {
319   /* TODO: actually destroy the shared thread here */
320 #if 0
321   g_assert (data != NULL);
322   if (g_atomic_int_dec_and_test (&data->refcount))
323     {
324       g_main_loop_quit (data->loop);
325       //g_thread_join (data->thread);
326       g_main_loop_unref (data->loop);
327       g_main_context_unref (data->context);
328     }
329 #endif
330 }
331
332 /* ---------------------------------------------------------------------------------------------------- */
333
334 struct GDBusWorker
335 {
336   volatile gint                       ref_count;
337
338   SharedThreadData                   *shared_thread_data;
339
340   /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
341   volatile gint                       stopped;
342
343   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
344    * only affects messages received from the other peer (since GDBusServer is the
345    * only user) - we might want it to affect messages sent to the other peer too?
346    */
347   gboolean                            frozen;
348   GDBusCapabilityFlags                capabilities;
349   GQueue                             *received_messages_while_frozen;
350
351   GIOStream                          *stream;
352   GCancellable                       *cancellable;
353   GDBusWorkerMessageReceivedCallback  message_received_callback;
354   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
355   GDBusWorkerDisconnectedCallback     disconnected_callback;
356   gpointer                            user_data;
357
358   /* if not NULL, stream is GSocketConnection */
359   GSocket *socket;
360
361   /* used for reading */
362   GMutex                              read_lock;
363   gchar                              *read_buffer;
364   gsize                               read_buffer_allocated_size;
365   gsize                               read_buffer_cur_size;
366   gsize                               read_buffer_bytes_wanted;
367   GUnixFDList                        *read_fd_list;
368   GSocketControlMessage             **read_ancillary_messages;
369   gint                                read_num_ancillary_messages;
370
371   /* TRUE if an async write, flush or close is pending.
372    * Only the worker thread may change its value, and only with the write_lock.
373    * Other threads may read its value when holding the write_lock.
374    * The worker thread may read its value at any time.
375    */
376   gboolean                            output_pending;
377   /* used for writing */
378   GMutex                              write_lock;
379   /* queue of MessageToWriteData, protected by write_lock */
380   GQueue                             *write_queue;
381   /* protected by write_lock */
382   guint64                             write_num_messages_written;
383   /* list of FlushData, protected by write_lock */
384   GList                              *write_pending_flushes;
385   /* list of CloseData, protected by write_lock */
386   GList                              *pending_close_attempts;
387   /* no lock - only used from the worker thread */
388   gboolean                            close_expected;
389 };
390
391 static void _g_dbus_worker_unref (GDBusWorker *worker);
392
393 /* ---------------------------------------------------------------------------------------------------- */
394
395 typedef struct
396 {
397   GMutex  mutex;
398   GCond   cond;
399   guint64 number_to_wait_for;
400   GError *error;
401 } FlushData;
402
403 struct _MessageToWriteData ;
404 typedef struct _MessageToWriteData MessageToWriteData;
405
406 static void message_to_write_data_free (MessageToWriteData *data);
407
408 static void read_message_print_transport_debug (gssize bytes_read,
409                                                 GDBusWorker *worker);
410
411 static void write_message_print_transport_debug (gssize bytes_written,
412                                                  MessageToWriteData *data);
413
414 typedef struct {
415     GDBusWorker *worker;
416     GCancellable *cancellable;
417     GSimpleAsyncResult *result;
418 } CloseData;
419
420 static void close_data_free (CloseData *close_data)
421 {
422   if (close_data->cancellable != NULL)
423     g_object_unref (close_data->cancellable);
424
425   if (close_data->result != NULL)
426     g_object_unref (close_data->result);
427
428   _g_dbus_worker_unref (close_data->worker);
429   g_slice_free (CloseData, close_data);
430 }
431
432 /* ---------------------------------------------------------------------------------------------------- */
433
434 static GDBusWorker *
435 _g_dbus_worker_ref (GDBusWorker *worker)
436 {
437   g_atomic_int_inc (&worker->ref_count);
438   return worker;
439 }
440
441 static void
442 _g_dbus_worker_unref (GDBusWorker *worker)
443 {
444   if (g_atomic_int_dec_and_test (&worker->ref_count))
445     {
446       g_assert (worker->write_pending_flushes == NULL);
447
448       _g_dbus_shared_thread_unref (worker->shared_thread_data);
449
450       g_object_unref (worker->stream);
451
452       g_mutex_clear (&worker->read_lock);
453       g_object_unref (worker->cancellable);
454       if (worker->read_fd_list != NULL)
455         g_object_unref (worker->read_fd_list);
456
457       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
458       g_queue_free (worker->received_messages_while_frozen);
459
460       g_mutex_clear (&worker->write_lock);
461       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
462       g_queue_free (worker->write_queue);
463
464       g_free (worker->read_buffer);
465
466       g_free (worker);
467     }
468 }
469
470 static void
471 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
472                                   gboolean      remote_peer_vanished,
473                                   GError       *error)
474 {
475   if (!g_atomic_int_get (&worker->stopped))
476     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
477 }
478
479 static void
480 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
481                                       GDBusMessage *message)
482 {
483   if (!g_atomic_int_get (&worker->stopped))
484     worker->message_received_callback (worker, message, worker->user_data);
485 }
486
487 static GDBusMessage *
488 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
489                                               GDBusMessage *message)
490 {
491   GDBusMessage *ret;
492   if (!g_atomic_int_get (&worker->stopped))
493     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
494   else
495     ret = message;
496   return ret;
497 }
498
499 /* can only be called from private thread with read-lock held - takes ownership of @message */
500 static void
501 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
502                                                   GDBusMessage *message)
503 {
504   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
505     {
506       /* queue up */
507       g_queue_push_tail (worker->received_messages_while_frozen, message);
508     }
509   else
510     {
511       /* not frozen, nor anything in queue */
512       _g_dbus_worker_emit_message_received (worker, message);
513       g_object_unref (message);
514     }
515 }
516
517 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
518 static gboolean
519 unfreeze_in_idle_cb (gpointer user_data)
520 {
521   GDBusWorker *worker = user_data;
522   GDBusMessage *message;
523
524   g_mutex_lock (&worker->read_lock);
525   if (worker->frozen)
526     {
527       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
528         {
529           _g_dbus_worker_emit_message_received (worker, message);
530           g_object_unref (message);
531         }
532       worker->frozen = FALSE;
533     }
534   else
535     {
536       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
537     }
538   g_mutex_unlock (&worker->read_lock);
539   return FALSE;
540 }
541
542 /* can be called from any thread */
543 void
544 _g_dbus_worker_unfreeze (GDBusWorker *worker)
545 {
546   GSource *idle_source;
547   idle_source = g_idle_source_new ();
548   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
549   g_source_set_callback (idle_source,
550                          unfreeze_in_idle_cb,
551                          _g_dbus_worker_ref (worker),
552                          (GDestroyNotify) _g_dbus_worker_unref);
553   g_source_attach (idle_source, worker->shared_thread_data->context);
554   g_source_unref (idle_source);
555 }
556
557 /* ---------------------------------------------------------------------------------------------------- */
558
559 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
560
561 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
562 static void
563 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
564                            GAsyncResult  *res,
565                            gpointer       user_data)
566 {
567   GDBusWorker *worker = user_data;
568   GError *error;
569   gssize bytes_read;
570
571   g_mutex_lock (&worker->read_lock);
572
573   /* If already stopped, don't even process the reply */
574   if (g_atomic_int_get (&worker->stopped))
575     goto out;
576
577   error = NULL;
578   if (worker->socket == NULL)
579     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
580                                              res,
581                                              &error);
582   else
583     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
584                                                               res,
585                                                               &error);
586   if (worker->read_num_ancillary_messages > 0)
587     {
588       gint n;
589       for (n = 0; n < worker->read_num_ancillary_messages; n++)
590         {
591           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
592
593           if (FALSE)
594             {
595             }
596 #ifdef G_OS_UNIX
597           else if (G_IS_UNIX_FD_MESSAGE (control_message))
598             {
599               GUnixFDMessage *fd_message;
600               gint *fds;
601               gint num_fds;
602
603               fd_message = G_UNIX_FD_MESSAGE (control_message);
604               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
605               if (worker->read_fd_list == NULL)
606                 {
607                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
608                 }
609               else
610                 {
611                   gint n;
612                   for (n = 0; n < num_fds; n++)
613                     {
614                       /* TODO: really want a append_steal() */
615                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
616                       close (fds[n]);
617                     }
618                 }
619               g_free (fds);
620             }
621           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
622             {
623               /* do nothing */
624             }
625 #endif
626           else
627             {
628               if (error == NULL)
629                 {
630                   g_set_error (&error,
631                                G_IO_ERROR,
632                                G_IO_ERROR_FAILED,
633                                "Unexpected ancillary message of type %s received from peer",
634                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
635                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
636                   g_error_free (error);
637                   g_object_unref (control_message);
638                   n++;
639                   while (n < worker->read_num_ancillary_messages)
640                     g_object_unref (worker->read_ancillary_messages[n++]);
641                   g_free (worker->read_ancillary_messages);
642                   goto out;
643                 }
644             }
645           g_object_unref (control_message);
646         }
647       g_free (worker->read_ancillary_messages);
648     }
649
650   if (bytes_read == -1)
651     {
652       if (G_UNLIKELY (_g_dbus_debug_transport ()))
653         {
654           _g_dbus_debug_print_lock ();
655           g_print ("========================================================================\n"
656                    "GDBus-debug:Transport:\n"
657                    "  ---- READ ERROR on stream of type %s:\n"
658                    "  ---- %s %d: %s\n",
659                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
660                    g_quark_to_string (error->domain), error->code,
661                    error->message);
662           _g_dbus_debug_print_unlock ();
663         }
664
665       /* Every async read that uses this callback uses worker->cancellable
666        * as its GCancellable. worker->cancellable gets cancelled if and only
667        * if the GDBusConnection tells us to close (either via
668        * _g_dbus_worker_stop, which is called on last-unref, or directly),
669        * so a cancelled read must mean our connection was closed locally.
670        *
671        * If we're closing, other errors are possible - notably,
672        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
673        * read in-flight. It seems sensible to treat all read errors during
674        * closing as an expected thing that doesn't trip exit-on-close.
675        *
676        * Because close_expected can't be set until we get into the worker
677        * thread, but the cancellable is signalled sooner (from another
678        * thread), we do still need to check the error.
679        */
680       if (worker->close_expected ||
681           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
682         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
683       else
684         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
685
686       g_error_free (error);
687       goto out;
688     }
689
690 #if 0
691   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
692            (gint) bytes_read,
693            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
694            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
695            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
696                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
697            worker->stream,
698            worker);
699 #endif
700
701   /* TODO: hmm, hmm... */
702   if (bytes_read == 0)
703     {
704       g_set_error (&error,
705                    G_IO_ERROR,
706                    G_IO_ERROR_FAILED,
707                    "Underlying GIOStream returned 0 bytes on an async read");
708       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
709       g_error_free (error);
710       goto out;
711     }
712
713   read_message_print_transport_debug (bytes_read, worker);
714
715   worker->read_buffer_cur_size += bytes_read;
716   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
717     {
718       /* OK, got what we asked for! */
719       if (worker->read_buffer_bytes_wanted == 16)
720         {
721           gssize message_len;
722           /* OK, got the header - determine how many more bytes are needed */
723           error = NULL;
724           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
725                                                      16,
726                                                      &error);
727           if (message_len == -1)
728             {
729               g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
730               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
731               g_error_free (error);
732               goto out;
733             }
734
735           worker->read_buffer_bytes_wanted = message_len;
736           _g_dbus_worker_do_read_unlocked (worker);
737         }
738       else
739         {
740           GDBusMessage *message;
741           error = NULL;
742
743           /* TODO: use connection->priv->auth to decode the message */
744
745           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
746                                                   worker->read_buffer_cur_size,
747                                                   worker->capabilities,
748                                                   &error);
749           if (message == NULL)
750             {
751               gchar *s;
752               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
753               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
754                          "The error is: %s\n"
755                          "The payload is as follows:\n"
756                          "%s\n",
757                          worker->read_buffer_cur_size,
758                          error->message,
759                          s);
760               g_free (s);
761               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
762               g_error_free (error);
763               goto out;
764             }
765
766 #ifdef G_OS_UNIX
767           if (worker->read_fd_list != NULL)
768             {
769               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
770               g_object_unref (worker->read_fd_list);
771               worker->read_fd_list = NULL;
772             }
773 #endif
774
775           if (G_UNLIKELY (_g_dbus_debug_message ()))
776             {
777               gchar *s;
778               _g_dbus_debug_print_lock ();
779               g_print ("========================================================================\n"
780                        "GDBus-debug:Message:\n"
781                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
782                        worker->read_buffer_cur_size);
783               s = g_dbus_message_print (message, 2);
784               g_print ("%s", s);
785               g_free (s);
786               if (G_UNLIKELY (_g_dbus_debug_payload ()))
787                 {
788                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
789                   g_print ("%s\n", s);
790                   g_free (s);
791                 }
792               _g_dbus_debug_print_unlock ();
793             }
794
795           /* yay, got a message, go deliver it */
796           _g_dbus_worker_queue_or_deliver_received_message (worker, message);
797
798           /* start reading another message! */
799           worker->read_buffer_bytes_wanted = 0;
800           worker->read_buffer_cur_size = 0;
801           _g_dbus_worker_do_read_unlocked (worker);
802         }
803     }
804   else
805     {
806       /* didn't get all the bytes we requested - so repeat the request... */
807       _g_dbus_worker_do_read_unlocked (worker);
808     }
809
810  out:
811   g_mutex_unlock (&worker->read_lock);
812
813   /* gives up the reference acquired when calling g_input_stream_read_async() */
814   _g_dbus_worker_unref (worker);
815 }
816
817 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
818 static void
819 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
820 {
821   /* Note that we do need to keep trying to read even if close_expected is
822    * true, because only failing a read causes us to signal 'closed'.
823    */
824
825   /* if bytes_wanted is zero, it means start reading a message */
826   if (worker->read_buffer_bytes_wanted == 0)
827     {
828       worker->read_buffer_cur_size = 0;
829       worker->read_buffer_bytes_wanted = 16;
830     }
831
832   /* ensure we have a (big enough) buffer */
833   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
834     {
835       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
836       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
837       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
838     }
839
840   if (worker->socket == NULL)
841     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
842                                worker->read_buffer + worker->read_buffer_cur_size,
843                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
844                                G_PRIORITY_DEFAULT,
845                                worker->cancellable,
846                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
847                                _g_dbus_worker_ref (worker));
848   else
849     {
850       worker->read_ancillary_messages = NULL;
851       worker->read_num_ancillary_messages = 0;
852       _g_socket_read_with_control_messages (worker->socket,
853                                             worker->read_buffer + worker->read_buffer_cur_size,
854                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
855                                             &worker->read_ancillary_messages,
856                                             &worker->read_num_ancillary_messages,
857                                             G_PRIORITY_DEFAULT,
858                                             worker->cancellable,
859                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
860                                             _g_dbus_worker_ref (worker));
861     }
862 }
863
864 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
865 static gboolean
866 _g_dbus_worker_do_initial_read (gpointer data)
867 {
868   GDBusWorker *worker = data;
869   g_mutex_lock (&worker->read_lock);
870   _g_dbus_worker_do_read_unlocked (worker);
871   g_mutex_unlock (&worker->read_lock);
872   return FALSE;
873 }
874
875 /* ---------------------------------------------------------------------------------------------------- */
876
877 struct _MessageToWriteData
878 {
879   GDBusWorker  *worker;
880   GDBusMessage *message;
881   gchar        *blob;
882   gsize         blob_size;
883
884   gsize               total_written;
885   GSimpleAsyncResult *simple;
886
887 };
888
889 static void
890 message_to_write_data_free (MessageToWriteData *data)
891 {
892   _g_dbus_worker_unref (data->worker);
893   if (data->message)
894     g_object_unref (data->message);
895   g_free (data->blob);
896   g_free (data);
897 }
898
899 /* ---------------------------------------------------------------------------------------------------- */
900
901 static void write_message_continue_writing (MessageToWriteData *data);
902
903 /* called in private thread shared by all GDBusConnection instances
904  *
905  * write-lock is not held on entry
906  * output_pending is true on entry
907  */
908 static void
909 write_message_async_cb (GObject      *source_object,
910                         GAsyncResult *res,
911                         gpointer      user_data)
912 {
913   MessageToWriteData *data = user_data;
914   GSimpleAsyncResult *simple;
915   gssize bytes_written;
916   GError *error;
917
918   /* Note: we can't access data->simple after calling g_async_result_complete () because the
919    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
920    */
921   simple = data->simple;
922
923   error = NULL;
924   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
925                                                 res,
926                                                 &error);
927   if (bytes_written == -1)
928     {
929       g_simple_async_result_take_error (simple, error);
930       g_simple_async_result_complete (simple);
931       g_object_unref (simple);
932       goto out;
933     }
934   g_assert (bytes_written > 0); /* zero is never returned */
935
936   write_message_print_transport_debug (bytes_written, data);
937
938   data->total_written += bytes_written;
939   g_assert (data->total_written <= data->blob_size);
940   if (data->total_written == data->blob_size)
941     {
942       g_simple_async_result_complete (simple);
943       g_object_unref (simple);
944       goto out;
945     }
946
947   write_message_continue_writing (data);
948
949  out:
950   ;
951 }
952
953 /* called in private thread shared by all GDBusConnection instances
954  *
955  * write-lock is not held on entry
956  * output_pending is true on entry
957  */
958 static gboolean
959 on_socket_ready (GSocket      *socket,
960                  GIOCondition  condition,
961                  gpointer      user_data)
962 {
963   MessageToWriteData *data = user_data;
964   write_message_continue_writing (data);
965   return FALSE; /* remove source */
966 }
967
968 /* called in private thread shared by all GDBusConnection instances
969  *
970  * write-lock is not held on entry
971  * output_pending is true on entry
972  */
973 static void
974 write_message_continue_writing (MessageToWriteData *data)
975 {
976   GOutputStream *ostream;
977   GSimpleAsyncResult *simple;
978 #ifdef G_OS_UNIX
979   GUnixFDList *fd_list;
980 #endif
981
982   /* Note: we can't access data->simple after calling g_async_result_complete () because the
983    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
984    */
985   simple = data->simple;
986
987   ostream = g_io_stream_get_output_stream (data->worker->stream);
988 #ifdef G_OS_UNIX
989   fd_list = g_dbus_message_get_unix_fd_list (data->message);
990 #endif
991
992   g_assert (!g_output_stream_has_pending (ostream));
993   g_assert_cmpint (data->total_written, <, data->blob_size);
994
995   if (FALSE)
996     {
997     }
998 #ifdef G_OS_UNIX
999   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1000     {
1001       GOutputVector vector;
1002       GSocketControlMessage *control_message;
1003       gssize bytes_written;
1004       GError *error;
1005
1006       vector.buffer = data->blob;
1007       vector.size = data->blob_size;
1008
1009       control_message = NULL;
1010       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1011         {
1012           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1013             {
1014               g_simple_async_result_set_error (simple,
1015                                                G_IO_ERROR,
1016                                                G_IO_ERROR_FAILED,
1017                                                "Tried sending a file descriptor but remote peer does not support this capability");
1018               g_simple_async_result_complete (simple);
1019               g_object_unref (simple);
1020               goto out;
1021             }
1022           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1023         }
1024
1025       error = NULL;
1026       bytes_written = g_socket_send_message (data->worker->socket,
1027                                              NULL, /* address */
1028                                              &vector,
1029                                              1,
1030                                              control_message != NULL ? &control_message : NULL,
1031                                              control_message != NULL ? 1 : 0,
1032                                              G_SOCKET_MSG_NONE,
1033                                              data->worker->cancellable,
1034                                              &error);
1035       if (control_message != NULL)
1036         g_object_unref (control_message);
1037
1038       if (bytes_written == -1)
1039         {
1040           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1041           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1042             {
1043               GSource *source;
1044               source = g_socket_create_source (data->worker->socket,
1045                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1046                                                data->worker->cancellable);
1047               g_source_set_callback (source,
1048                                      (GSourceFunc) on_socket_ready,
1049                                      data,
1050                                      NULL); /* GDestroyNotify */
1051               g_source_attach (source, g_main_context_get_thread_default ());
1052               g_source_unref (source);
1053               g_error_free (error);
1054               goto out;
1055             }
1056           g_simple_async_result_take_error (simple, error);
1057           g_simple_async_result_complete (simple);
1058           g_object_unref (simple);
1059           goto out;
1060         }
1061       g_assert (bytes_written > 0); /* zero is never returned */
1062
1063       write_message_print_transport_debug (bytes_written, data);
1064
1065       data->total_written += bytes_written;
1066       g_assert (data->total_written <= data->blob_size);
1067       if (data->total_written == data->blob_size)
1068         {
1069           g_simple_async_result_complete (simple);
1070           g_object_unref (simple);
1071           goto out;
1072         }
1073
1074       write_message_continue_writing (data);
1075     }
1076 #endif
1077   else
1078     {
1079 #ifdef G_OS_UNIX
1080       if (fd_list != NULL)
1081         {
1082           g_simple_async_result_set_error (simple,
1083                                            G_IO_ERROR,
1084                                            G_IO_ERROR_FAILED,
1085                                            "Tried sending a file descriptor on unsupported stream of type %s",
1086                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1087           g_simple_async_result_complete (simple);
1088           g_object_unref (simple);
1089           goto out;
1090         }
1091 #endif
1092
1093       g_output_stream_write_async (ostream,
1094                                    (const gchar *) data->blob + data->total_written,
1095                                    data->blob_size - data->total_written,
1096                                    G_PRIORITY_DEFAULT,
1097                                    data->worker->cancellable,
1098                                    write_message_async_cb,
1099                                    data);
1100     }
1101  out:
1102   ;
1103 }
1104
1105 /* called in private thread shared by all GDBusConnection instances
1106  *
1107  * write-lock is not held on entry
1108  * output_pending is true on entry
1109  */
1110 static void
1111 write_message_async (GDBusWorker         *worker,
1112                      MessageToWriteData  *data,
1113                      GAsyncReadyCallback  callback,
1114                      gpointer             user_data)
1115 {
1116   data->simple = g_simple_async_result_new (NULL,
1117                                             callback,
1118                                             user_data,
1119                                             write_message_async);
1120   data->total_written = 0;
1121   write_message_continue_writing (data);
1122 }
1123
1124 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1125 static gboolean
1126 write_message_finish (GAsyncResult   *res,
1127                       GError        **error)
1128 {
1129   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1130   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1131     return FALSE;
1132   else
1133     return TRUE;
1134 }
1135 /* ---------------------------------------------------------------------------------------------------- */
1136
1137 static void maybe_write_next_message (GDBusWorker *worker);
1138
1139 typedef struct
1140 {
1141   GDBusWorker *worker;
1142   GList *flushers;
1143 } FlushAsyncData;
1144
1145 static void
1146 flush_data_list_complete (const GList  *flushers,
1147                           const GError *error)
1148 {
1149   const GList *l;
1150
1151   for (l = flushers; l != NULL; l = l->next)
1152     {
1153       FlushData *f = l->data;
1154
1155       f->error = error != NULL ? g_error_copy (error) : NULL;
1156
1157       g_mutex_lock (&f->mutex);
1158       g_cond_signal (&f->cond);
1159       g_mutex_unlock (&f->mutex);
1160     }
1161 }
1162
1163 /* called in private thread shared by all GDBusConnection instances
1164  *
1165  * write-lock is not held on entry
1166  * output_pending is true on entry
1167  */
1168 static void
1169 ostream_flush_cb (GObject      *source_object,
1170                   GAsyncResult *res,
1171                   gpointer      user_data)
1172 {
1173   FlushAsyncData *data = user_data;
1174   GError *error;
1175
1176   error = NULL;
1177   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1178                                 res,
1179                                 &error);
1180
1181   if (error == NULL)
1182     {
1183       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1184         {
1185           _g_dbus_debug_print_lock ();
1186           g_print ("========================================================================\n"
1187                    "GDBus-debug:Transport:\n"
1188                    "  ---- FLUSHED stream of type %s\n",
1189                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1190           _g_dbus_debug_print_unlock ();
1191         }
1192     }
1193
1194   g_assert (data->flushers != NULL);
1195   flush_data_list_complete (data->flushers, error);
1196   g_list_free (data->flushers);
1197
1198   if (error != NULL)
1199     g_error_free (error);
1200
1201   /* Make sure we tell folks that we don't have additional
1202      flushes pending */
1203   g_mutex_lock (&data->worker->write_lock);
1204   g_assert (data->worker->output_pending);
1205   data->worker->output_pending = FALSE;
1206   g_mutex_unlock (&data->worker->write_lock);
1207
1208   /* OK, cool, finally kick off the next write */
1209   maybe_write_next_message (data->worker);
1210
1211   _g_dbus_worker_unref (data->worker);
1212   g_free (data);
1213 }
1214
1215 /* called in private thread shared by all GDBusConnection instances
1216  *
1217  * write-lock is not held on entry
1218  * output_pending is false on entry
1219  */
1220 static void
1221 message_written (GDBusWorker *worker,
1222                  MessageToWriteData *message_data)
1223 {
1224   GList *l;
1225   GList *ll;
1226   GList *flushers;
1227
1228   /* first log the fact that we wrote a message */
1229   if (G_UNLIKELY (_g_dbus_debug_message ()))
1230     {
1231       gchar *s;
1232       _g_dbus_debug_print_lock ();
1233       g_print ("========================================================================\n"
1234                "GDBus-debug:Message:\n"
1235                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1236                message_data->blob_size);
1237       s = g_dbus_message_print (message_data->message, 2);
1238       g_print ("%s", s);
1239       g_free (s);
1240       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1241         {
1242           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1243           g_print ("%s\n", s);
1244           g_free (s);
1245         }
1246       _g_dbus_debug_print_unlock ();
1247     }
1248
1249   /* then first wake up pending flushes and, if needed, flush the stream */
1250   flushers = NULL;
1251   g_mutex_lock (&worker->write_lock);
1252   worker->write_num_messages_written += 1;
1253   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1254     {
1255       FlushData *f = l->data;
1256       ll = l->next;
1257
1258       if (f->number_to_wait_for == worker->write_num_messages_written)
1259         {
1260           flushers = g_list_append (flushers, f);
1261           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1262         }
1263     }
1264   if (flushers != NULL)
1265     {
1266       g_assert (!worker->output_pending);
1267       worker->output_pending = TRUE;
1268     }
1269   g_mutex_unlock (&worker->write_lock);
1270
1271   if (flushers != NULL)
1272     {
1273       FlushAsyncData *data;
1274       data = g_new0 (FlushAsyncData, 1);
1275       data->worker = _g_dbus_worker_ref (worker);
1276       data->flushers = flushers;
1277       /* flush the stream before writing the next message */
1278       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1279                                    G_PRIORITY_DEFAULT,
1280                                    worker->cancellable,
1281                                    ostream_flush_cb,
1282                                    data);
1283     }
1284   else
1285     {
1286       /* kick off the next write! */
1287       maybe_write_next_message (worker);
1288     }
1289 }
1290
1291 /* called in private thread shared by all GDBusConnection instances
1292  *
1293  * write-lock is not held on entry
1294  * output_pending is true on entry
1295  */
1296 static void
1297 write_message_cb (GObject       *source_object,
1298                   GAsyncResult  *res,
1299                   gpointer       user_data)
1300 {
1301   MessageToWriteData *data = user_data;
1302   GError *error;
1303
1304   g_mutex_lock (&data->worker->write_lock);
1305   g_assert (data->worker->output_pending);
1306   data->worker->output_pending = FALSE;
1307   g_mutex_unlock (&data->worker->write_lock);
1308
1309   error = NULL;
1310   if (!write_message_finish (res, &error))
1311     {
1312       /* TODO: handle */
1313       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1314       g_error_free (error);
1315     }
1316
1317   /* this function will also kick of the next write (it might need to
1318    * flush so writing the next message might happen much later
1319    * e.g. async)
1320    */
1321   message_written (data->worker, data);
1322
1323   message_to_write_data_free (data);
1324 }
1325
1326 /* called in private thread shared by all GDBusConnection instances
1327  *
1328  * write-lock is not held on entry
1329  * output_pending is true on entry
1330  */
1331 static void
1332 iostream_close_cb (GObject      *source_object,
1333                    GAsyncResult *res,
1334                    gpointer      user_data)
1335 {
1336   GDBusWorker *worker = user_data;
1337   GError *error = NULL;
1338   GList *pending_close_attempts, *pending_flush_attempts;
1339   GQueue *send_queue;
1340
1341   g_io_stream_close_finish (worker->stream, res, &error);
1342
1343   g_mutex_lock (&worker->write_lock);
1344
1345   pending_close_attempts = worker->pending_close_attempts;
1346   worker->pending_close_attempts = NULL;
1347
1348   pending_flush_attempts = worker->write_pending_flushes;
1349   worker->write_pending_flushes = NULL;
1350
1351   send_queue = worker->write_queue;
1352   worker->write_queue = g_queue_new ();
1353
1354   g_assert (worker->output_pending);
1355   worker->output_pending = FALSE;
1356
1357   g_mutex_unlock (&worker->write_lock);
1358
1359   while (pending_close_attempts != NULL)
1360     {
1361       CloseData *close_data = pending_close_attempts->data;
1362
1363       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1364                                                    pending_close_attempts);
1365
1366       if (close_data->result != NULL)
1367         {
1368           if (error != NULL)
1369             g_simple_async_result_set_from_error (close_data->result, error);
1370
1371           /* this must be in an idle because the result is likely to be
1372            * intended for another thread
1373            */
1374           g_simple_async_result_complete_in_idle (close_data->result);
1375         }
1376
1377       close_data_free (close_data);
1378     }
1379
1380   g_clear_error (&error);
1381
1382   /* all messages queued for sending are discarded */
1383   g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
1384   g_queue_free (send_queue);
1385
1386   /* all queued flushes fail */
1387   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1388                        _("Operation was cancelled"));
1389   flush_data_list_complete (pending_flush_attempts, error);
1390   g_list_free (pending_flush_attempts);
1391   g_clear_error (&error);
1392
1393   _g_dbus_worker_unref (worker);
1394 }
1395
1396 /* called in private thread shared by all GDBusConnection instances
1397  *
1398  * write-lock is not held on entry
1399  * output_pending must be false on entry
1400  */
1401 static void
1402 maybe_write_next_message (GDBusWorker *worker)
1403 {
1404   MessageToWriteData *data;
1405
1406  write_next:
1407   /* we mustn't try to write two things at once */
1408   g_assert (!worker->output_pending);
1409
1410   g_mutex_lock (&worker->write_lock);
1411
1412   /* if we want to close the connection, that takes precedence */
1413   if (worker->pending_close_attempts != NULL)
1414     {
1415       worker->close_expected = TRUE;
1416       worker->output_pending = TRUE;
1417
1418       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1419                                NULL, iostream_close_cb,
1420                                _g_dbus_worker_ref (worker));
1421       data = NULL;
1422     }
1423   else
1424     {
1425       data = g_queue_pop_head (worker->write_queue);
1426
1427       if (data != NULL)
1428         worker->output_pending = TRUE;
1429     }
1430
1431   g_mutex_unlock (&worker->write_lock);
1432
1433   /* Note that write_lock is only used for protecting the @write_queue
1434    * and @output_pending fields of the GDBusWorker struct ... which we
1435    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1436    *
1437    * Therefore, it's fine to drop it here when calling back into user
1438    * code and then writing the message out onto the GIOStream since this
1439    * function only runs on the worker thread.
1440    */
1441   if (data != NULL)
1442     {
1443       GDBusMessage *old_message;
1444       guchar *new_blob;
1445       gsize new_blob_size;
1446       GError *error;
1447
1448       old_message = data->message;
1449       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1450       if (data->message == old_message)
1451         {
1452           /* filters had no effect - do nothing */
1453         }
1454       else if (data->message == NULL)
1455         {
1456           /* filters dropped message */
1457           g_mutex_lock (&worker->write_lock);
1458           worker->output_pending = FALSE;
1459           g_mutex_unlock (&worker->write_lock);
1460           message_to_write_data_free (data);
1461           goto write_next;
1462         }
1463       else
1464         {
1465           /* filters altered the message -> reencode */
1466           error = NULL;
1467           new_blob = g_dbus_message_to_blob (data->message,
1468                                              &new_blob_size,
1469                                              worker->capabilities,
1470                                              &error);
1471           if (new_blob == NULL)
1472             {
1473               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1474                * the old message instead
1475                */
1476               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1477                          g_dbus_message_get_serial (data->message),
1478                          error->message);
1479               g_error_free (error);
1480             }
1481           else
1482             {
1483               g_free (data->blob);
1484               data->blob = (gchar *) new_blob;
1485               data->blob_size = new_blob_size;
1486             }
1487         }
1488
1489       write_message_async (worker,
1490                            data,
1491                            write_message_cb,
1492                            data);
1493     }
1494 }
1495
1496 /* called in private thread shared by all GDBusConnection instances
1497  *
1498  * write-lock is not held on entry
1499  * output_pending may be true or false
1500  */
1501 static gboolean
1502 write_message_in_idle_cb (gpointer user_data)
1503 {
1504   GDBusWorker *worker = user_data;
1505
1506   /* Because this is the worker thread, we can read this struct member
1507    * without holding the lock: no other thread ever modifies it.
1508    */
1509   if (!worker->output_pending)
1510     maybe_write_next_message (worker);
1511
1512   return FALSE;
1513 }
1514
1515 /*
1516  * @write_data: (transfer full) (allow-none):
1517  * @close_data: (transfer full) (allow-none):
1518  *
1519  * Can be called from any thread
1520  *
1521  * write_lock is not held on entry
1522  * output_pending may be true or false
1523  */
1524 static void
1525 schedule_write_in_worker_thread (GDBusWorker        *worker,
1526                                  MessageToWriteData *write_data,
1527                                  CloseData          *close_data)
1528 {
1529   g_mutex_lock (&worker->write_lock);
1530
1531   if (write_data != NULL)
1532     g_queue_push_tail (worker->write_queue, write_data);
1533
1534   if (close_data != NULL)
1535     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1536                                                      close_data);
1537
1538   if (!worker->output_pending)
1539     {
1540       GSource *idle_source;
1541       idle_source = g_idle_source_new ();
1542       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1543       g_source_set_callback (idle_source,
1544                              write_message_in_idle_cb,
1545                              _g_dbus_worker_ref (worker),
1546                              (GDestroyNotify) _g_dbus_worker_unref);
1547       g_source_attach (idle_source, worker->shared_thread_data->context);
1548       g_source_unref (idle_source);
1549     }
1550
1551   g_mutex_unlock (&worker->write_lock);
1552 }
1553
1554 /* ---------------------------------------------------------------------------------------------------- */
1555
1556 /* can be called from any thread - steals blob
1557  *
1558  * write_lock is not held on entry
1559  * output_pending may be true or false
1560  */
1561 void
1562 _g_dbus_worker_send_message (GDBusWorker    *worker,
1563                              GDBusMessage   *message,
1564                              gchar          *blob,
1565                              gsize           blob_len)
1566 {
1567   MessageToWriteData *data;
1568
1569   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1570   g_return_if_fail (blob != NULL);
1571   g_return_if_fail (blob_len > 16);
1572
1573   data = g_new0 (MessageToWriteData, 1);
1574   data->worker = _g_dbus_worker_ref (worker);
1575   data->message = g_object_ref (message);
1576   data->blob = blob; /* steal! */
1577   data->blob_size = blob_len;
1578
1579   schedule_write_in_worker_thread (worker, data, NULL);
1580 }
1581
1582 /* ---------------------------------------------------------------------------------------------------- */
1583
1584 GDBusWorker *
1585 _g_dbus_worker_new (GIOStream                              *stream,
1586                     GDBusCapabilityFlags                    capabilities,
1587                     gboolean                                initially_frozen,
1588                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1589                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1590                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1591                     gpointer                                user_data)
1592 {
1593   GDBusWorker *worker;
1594   GSource *idle_source;
1595
1596   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1597   g_return_val_if_fail (message_received_callback != NULL, NULL);
1598   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1599   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1600
1601   worker = g_new0 (GDBusWorker, 1);
1602   worker->ref_count = 1;
1603
1604   g_mutex_init (&worker->read_lock);
1605   worker->message_received_callback = message_received_callback;
1606   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1607   worker->disconnected_callback = disconnected_callback;
1608   worker->user_data = user_data;
1609   worker->stream = g_object_ref (stream);
1610   worker->capabilities = capabilities;
1611   worker->cancellable = g_cancellable_new ();
1612   worker->output_pending = FALSE;
1613
1614   worker->frozen = initially_frozen;
1615   worker->received_messages_while_frozen = g_queue_new ();
1616
1617   g_mutex_init (&worker->write_lock);
1618   worker->write_queue = g_queue_new ();
1619
1620   if (G_IS_SOCKET_CONNECTION (worker->stream))
1621     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1622
1623   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1624
1625   /* begin reading */
1626   idle_source = g_idle_source_new ();
1627   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1628   g_source_set_callback (idle_source,
1629                          _g_dbus_worker_do_initial_read,
1630                          _g_dbus_worker_ref (worker),
1631                          (GDestroyNotify) _g_dbus_worker_unref);
1632   g_source_attach (idle_source, worker->shared_thread_data->context);
1633   g_source_unref (idle_source);
1634
1635   return worker;
1636 }
1637
1638 /* ---------------------------------------------------------------------------------------------------- */
1639
1640 /* can be called from any thread
1641  *
1642  * write_lock is not held on entry
1643  * output_pending may be true or false
1644  */
1645 void
1646 _g_dbus_worker_close (GDBusWorker         *worker,
1647                       GCancellable        *cancellable,
1648                       GSimpleAsyncResult  *result)
1649 {
1650   CloseData *close_data;
1651
1652   close_data = g_slice_new0 (CloseData);
1653   close_data->worker = _g_dbus_worker_ref (worker);
1654   close_data->cancellable =
1655       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1656   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1657
1658   /* Don't set worker->close_expected here - we're in the wrong thread.
1659    * It'll be set before the actual close happens.
1660    */
1661   g_cancellable_cancel (worker->cancellable);
1662   schedule_write_in_worker_thread (worker, NULL, close_data);
1663 }
1664
1665 /* This can be called from any thread - frees worker. Note that
1666  * callbacks might still happen if called from another thread than the
1667  * worker - use your own synchronization primitive in the callbacks.
1668  *
1669  * write_lock is not held on entry
1670  * output_pending may be true or false
1671  */
1672 void
1673 _g_dbus_worker_stop (GDBusWorker *worker)
1674 {
1675   g_atomic_int_set (&worker->stopped, TRUE);
1676
1677   /* Cancel any pending operations and schedule a close of the underlying I/O
1678    * stream in the worker thread
1679    */
1680   _g_dbus_worker_close (worker, NULL, NULL);
1681
1682   /* _g_dbus_worker_close holds a ref until after an idle in the the worker
1683    * thread has run, so we no longer need to unref in an idle like in
1684    * commit 322e25b535
1685    */
1686   _g_dbus_worker_unref (worker);
1687 }
1688
1689 /* ---------------------------------------------------------------------------------------------------- */
1690
1691 /* can be called from any thread (except the worker thread) - blocks
1692  * calling thread until all queued outgoing messages are written and
1693  * the transport has been flushed
1694  *
1695  * write_lock is not held on entry
1696  * output_pending may be true or false
1697  */
1698 gboolean
1699 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1700                            GCancellable   *cancellable,
1701                            GError        **error)
1702 {
1703   gboolean ret;
1704   FlushData *data;
1705
1706   data = NULL;
1707   ret = TRUE;
1708
1709   /* if the queue is empty, there's nothing to wait for */
1710   g_mutex_lock (&worker->write_lock);
1711   if (g_queue_get_length (worker->write_queue) > 0)
1712     {
1713       data = g_new0 (FlushData, 1);
1714       g_mutex_init (&data->mutex);
1715       g_cond_init (&data->cond);
1716       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1717       g_mutex_lock (&data->mutex);
1718       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1719     }
1720   g_mutex_unlock (&worker->write_lock);
1721
1722   if (data != NULL)
1723     {
1724       g_cond_wait (&data->cond, &data->mutex);
1725       g_mutex_unlock (&data->mutex);
1726
1727       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1728       g_cond_clear (&data->cond);
1729       g_mutex_clear (&data->mutex);
1730       if (data->error != NULL)
1731         {
1732           ret = FALSE;
1733           g_propagate_error (error, data->error);
1734         }
1735       g_free (data);
1736     }
1737
1738   return ret;
1739 }
1740
1741 /* ---------------------------------------------------------------------------------------------------- */
1742
1743 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1744 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1745 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1746 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1747 #define G_DBUS_DEBUG_CALL           (1<<4)
1748 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1749 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1750 #define G_DBUS_DEBUG_RETURN         (1<<7)
1751 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1752 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1753
1754 static gint _gdbus_debug_flags = 0;
1755
1756 gboolean
1757 _g_dbus_debug_authentication (void)
1758 {
1759   _g_dbus_initialize ();
1760   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1761 }
1762
1763 gboolean
1764 _g_dbus_debug_transport (void)
1765 {
1766   _g_dbus_initialize ();
1767   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1768 }
1769
1770 gboolean
1771 _g_dbus_debug_message (void)
1772 {
1773   _g_dbus_initialize ();
1774   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1775 }
1776
1777 gboolean
1778 _g_dbus_debug_payload (void)
1779 {
1780   _g_dbus_initialize ();
1781   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1782 }
1783
1784 gboolean
1785 _g_dbus_debug_call (void)
1786 {
1787   _g_dbus_initialize ();
1788   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1789 }
1790
1791 gboolean
1792 _g_dbus_debug_signal (void)
1793 {
1794   _g_dbus_initialize ();
1795   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1796 }
1797
1798 gboolean
1799 _g_dbus_debug_incoming (void)
1800 {
1801   _g_dbus_initialize ();
1802   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1803 }
1804
1805 gboolean
1806 _g_dbus_debug_return (void)
1807 {
1808   _g_dbus_initialize ();
1809   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1810 }
1811
1812 gboolean
1813 _g_dbus_debug_emission (void)
1814 {
1815   _g_dbus_initialize ();
1816   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1817 }
1818
1819 gboolean
1820 _g_dbus_debug_address (void)
1821 {
1822   _g_dbus_initialize ();
1823   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1824 }
1825
1826 G_LOCK_DEFINE_STATIC (print_lock);
1827
1828 void
1829 _g_dbus_debug_print_lock (void)
1830 {
1831   G_LOCK (print_lock);
1832 }
1833
1834 void
1835 _g_dbus_debug_print_unlock (void)
1836 {
1837   G_UNLOCK (print_lock);
1838 }
1839
1840 /*
1841  * _g_dbus_initialize:
1842  *
1843  * Does various one-time init things such as
1844  *
1845  *  - registering the G_DBUS_ERROR error domain
1846  *  - parses the G_DBUS_DEBUG environment variable
1847  */
1848 void
1849 _g_dbus_initialize (void)
1850 {
1851   static volatile gsize initialized = 0;
1852
1853   if (g_once_init_enter (&initialized))
1854     {
1855       volatile GQuark g_dbus_error_domain;
1856       const gchar *debug;
1857
1858       g_dbus_error_domain = G_DBUS_ERROR;
1859       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1860
1861       debug = g_getenv ("G_DBUS_DEBUG");
1862       if (debug != NULL)
1863         {
1864           const GDebugKey keys[] = {
1865             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1866             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1867             { "message",        G_DBUS_DEBUG_MESSAGE        },
1868             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1869             { "call",           G_DBUS_DEBUG_CALL           },
1870             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1871             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1872             { "return",         G_DBUS_DEBUG_RETURN         },
1873             { "emission",       G_DBUS_DEBUG_EMISSION       },
1874             { "address",        G_DBUS_DEBUG_ADDRESS        }
1875           };
1876
1877           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1878           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1879             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1880         }
1881
1882       g_once_init_leave (&initialized, 1);
1883     }
1884 }
1885
1886 /* ---------------------------------------------------------------------------------------------------- */
1887
1888 GVariantType *
1889 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1890 {
1891   const GVariantType *arg_types[256];
1892   guint n;
1893
1894   if (args)
1895     for (n = 0; args[n] != NULL; n++)
1896       {
1897         /* DBus places a hard limit of 255 on signature length.
1898          * therefore number of args must be less than 256.
1899          */
1900         g_assert (n < 256);
1901
1902         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1903
1904         if G_UNLIKELY (arg_types[n] == NULL)
1905           return NULL;
1906       }
1907   else
1908     n = 0;
1909
1910   return g_variant_type_new_tuple (arg_types, n);
1911 }
1912
1913 /* ---------------------------------------------------------------------------------------------------- */
1914
1915 #ifdef G_OS_WIN32
1916
1917 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1918
1919 gchar *
1920 _g_dbus_win32_get_user_sid (void)
1921 {
1922   HANDLE h;
1923   TOKEN_USER *user;
1924   DWORD token_information_len;
1925   PSID psid;
1926   gchar *sid;
1927   gchar *ret;
1928
1929   ret = NULL;
1930   user = NULL;
1931   h = INVALID_HANDLE_VALUE;
1932
1933   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1934     {
1935       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1936       goto out;
1937     }
1938
1939   /* Get length of buffer */
1940   token_information_len = 0;
1941   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1942     {
1943       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1944         {
1945           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1946           goto out;
1947         }
1948     }
1949   user = g_malloc (token_information_len);
1950   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1951     {
1952       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1953       goto out;
1954     }
1955
1956   psid = user->User.Sid;
1957   if (!IsValidSid (psid))
1958     {
1959       g_warning ("Invalid SID");
1960       goto out;
1961     }
1962
1963   if (!ConvertSidToStringSidA (psid, &sid))
1964     {
1965       g_warning ("Invalid SID");
1966       goto out;
1967     }
1968
1969   ret = g_strdup (sid);
1970   LocalFree (sid);
1971
1972 out:
1973   g_free (user);
1974   if (h != INVALID_HANDLE_VALUE)
1975     CloseHandle (h);
1976   return ret;
1977 }
1978 #endif
1979
1980 /* ---------------------------------------------------------------------------------------------------- */
1981
1982 gchar *
1983 _g_dbus_get_machine_id (GError **error)
1984 {
1985   gchar *ret;
1986   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1987   ret = NULL;
1988   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1989                             &ret,
1990                             NULL,
1991                             error))
1992     {
1993       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1994     }
1995   else
1996     {
1997       /* TODO: validate value */
1998       g_strstrip (ret);
1999     }
2000   return ret;
2001 }
2002
2003 /* ---------------------------------------------------------------------------------------------------- */
2004
2005 gchar *
2006 _g_dbus_enum_to_string (GType enum_type, gint value)
2007 {
2008   gchar *ret;
2009   GEnumClass *klass;
2010   GEnumValue *enum_value;
2011
2012   klass = g_type_class_ref (enum_type);
2013   enum_value = g_enum_get_value (klass, value);
2014   if (enum_value != NULL)
2015     ret = g_strdup (enum_value->value_nick);
2016   else
2017     ret = g_strdup_printf ("unknown (value %d)", value);
2018   g_type_class_unref (klass);
2019   return ret;
2020 }
2021
2022 /* ---------------------------------------------------------------------------------------------------- */
2023
2024 static void
2025 write_message_print_transport_debug (gssize bytes_written,
2026                                      MessageToWriteData *data)
2027 {
2028   if (G_LIKELY (!_g_dbus_debug_transport ()))
2029     goto out;
2030
2031   _g_dbus_debug_print_lock ();
2032   g_print ("========================================================================\n"
2033            "GDBus-debug:Transport:\n"
2034            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2035            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2036            bytes_written,
2037            g_dbus_message_get_serial (data->message),
2038            data->blob_size,
2039            data->total_written,
2040            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2041   _g_dbus_debug_print_unlock ();
2042  out:
2043   ;
2044 }
2045
2046 /* ---------------------------------------------------------------------------------------------------- */
2047
2048 static void
2049 read_message_print_transport_debug (gssize bytes_read,
2050                                     GDBusWorker *worker)
2051 {
2052   gsize size;
2053   gint32 serial;
2054   gint32 message_length;
2055
2056   if (G_LIKELY (!_g_dbus_debug_transport ()))
2057     goto out;
2058
2059   size = bytes_read + worker->read_buffer_cur_size;
2060   serial = 0;
2061   message_length = 0;
2062   if (size >= 16)
2063     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2064   if (size >= 1)
2065     {
2066       switch (worker->read_buffer[0])
2067         {
2068         case 'l':
2069           if (size >= 12)
2070             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2071           break;
2072         case 'B':
2073           if (size >= 12)
2074             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2075           break;
2076         default:
2077           /* an error will be set elsewhere if this happens */
2078           goto out;
2079         }
2080     }
2081
2082     _g_dbus_debug_print_lock ();
2083   g_print ("========================================================================\n"
2084            "GDBus-debug:Transport:\n"
2085            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2086            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2087            bytes_read,
2088            serial,
2089            message_length,
2090            worker->read_buffer_cur_size,
2091            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2092   _g_dbus_debug_print_unlock ();
2093  out:
2094   ;
2095 }
2096
2097 /* ---------------------------------------------------------------------------------------------------- */
2098
2099 gboolean
2100 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2101                                      GValue                *return_accu,
2102                                      const GValue          *handler_return,
2103                                      gpointer               dummy)
2104 {
2105   gboolean continue_emission;
2106   gboolean signal_return;
2107
2108   signal_return = g_value_get_boolean (handler_return);
2109   g_value_set_boolean (return_accu, signal_return);
2110   continue_emission = signal_return;
2111
2112   return continue_emission;
2113 }