8bc6e5bf8f5511b279b50ec03a75327e3895328c
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "gsocketcontrolmessage.h"
43 #include "gsocketconnection.h"
44 #include "gsocketoutputstream.h"
45
46 #ifdef G_OS_UNIX
47 #include "gunixfdmessage.h"
48 #include "gunixconnection.h"
49 #include "gunixcredentialsmessage.h"
50 #endif
51
52 #ifdef G_OS_WIN32
53 #include <windows.h>
54 #endif
55
56 #include "glibintl.h"
57
58 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
59
60 /* ---------------------------------------------------------------------------------------------------- */
61
62 gchar *
63 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
64 {
65  guint n, m;
66  GString *ret;
67
68  ret = g_string_new (NULL);
69
70  for (n = 0; n < len; n += 16)
71    {
72      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
73
74      for (m = n; m < n + 16; m++)
75        {
76          if (m > n && (m%4) == 0)
77            g_string_append_c (ret, ' ');
78          if (m < len)
79            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
80          else
81            g_string_append (ret, "   ");
82        }
83
84      g_string_append (ret, "   ");
85
86      for (m = n; m < len && m < n + 16; m++)
87        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
88
89      g_string_append_c (ret, '\n');
90    }
91
92  return g_string_free (ret, FALSE);
93 }
94
95 /* ---------------------------------------------------------------------------------------------------- */
96
97 /* Unfortunately ancillary messages are discarded when reading from a
98  * socket using the GSocketInputStream abstraction. So we provide a
99  * very GInputStream-ish API that uses GSocket in this case (very
100  * similar to GSocketInputStream).
101  */
102
/* Per-operation state for one _g_socket_read_with_control_messages()
 * call. It is allocated there and released with
 * read_with_control_data_free().
 */
typedef struct
{
  /* socket to read from; a reference is held for the lifetime of the struct */
  GSocket *socket;
  /* optional cancellable (may be NULL); a reference is held when set */
  GCancellable *cancellable;

  /* caller-provided destination and its size; not freed by
   * read_with_control_data_free()
   */
  void *buffer;
  gsize count;

  /* caller-provided locations handed to g_socket_receive_message() for the
   * received control messages and their number
   */
  GSocketControlMessage ***messages;
  gint *num_messages;

  /* result object that carries the byte count or the error to the callback */
  GSimpleAsyncResult *simple;

  /* TRUE when the read runs from a GSource callback; selects
   * g_simple_async_result_complete() over the _in_idle() variant
   */
  gboolean from_mainloop;
} ReadWithControlData;
118
119 static void
120 read_with_control_data_free (ReadWithControlData *data)
121 {
122   g_object_unref (data->socket);
123   if (data->cancellable != NULL)
124     g_object_unref (data->cancellable);
125   g_object_unref (data->simple);
126   g_free (data);
127 }
128
129 static gboolean
130 _g_socket_read_with_control_messages_ready (GSocket      *socket,
131                                             GIOCondition  condition,
132                                             gpointer      user_data)
133 {
134   ReadWithControlData *data = user_data;
135   GError *error;
136   gssize result;
137   GInputVector vector;
138
139   error = NULL;
140   vector.buffer = data->buffer;
141   vector.size = data->count;
142   result = g_socket_receive_message (data->socket,
143                                      NULL, /* address */
144                                      &vector,
145                                      1,
146                                      data->messages,
147                                      data->num_messages,
148                                      NULL,
149                                      data->cancellable,
150                                      &error);
151   if (result >= 0)
152     {
153       g_simple_async_result_set_op_res_gssize (data->simple, result);
154     }
155   else
156     {
157       g_assert (error != NULL);
158       g_simple_async_result_take_error (data->simple, error);
159     }
160
161   if (data->from_mainloop)
162     g_simple_async_result_complete (data->simple);
163   else
164     g_simple_async_result_complete_in_idle (data->simple);
165
166   return FALSE;
167 }
168
169 static void
170 _g_socket_read_with_control_messages (GSocket                 *socket,
171                                       void                    *buffer,
172                                       gsize                    count,
173                                       GSocketControlMessage ***messages,
174                                       gint                    *num_messages,
175                                       gint                     io_priority,
176                                       GCancellable            *cancellable,
177                                       GAsyncReadyCallback      callback,
178                                       gpointer                 user_data)
179 {
180   ReadWithControlData *data;
181
182   data = g_new0 (ReadWithControlData, 1);
183   data->socket = g_object_ref (socket);
184   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
185   data->buffer = buffer;
186   data->count = count;
187   data->messages = messages;
188   data->num_messages = num_messages;
189
190   data->simple = g_simple_async_result_new (G_OBJECT (socket),
191                                             callback,
192                                             user_data,
193                                             _g_socket_read_with_control_messages);
194
195   if (!g_socket_condition_check (socket, G_IO_IN))
196     {
197       GSource *source;
198       data->from_mainloop = TRUE;
199       source = g_socket_create_source (data->socket,
200                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
201                                        cancellable);
202       g_source_set_callback (source,
203                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
204                              data,
205                              (GDestroyNotify) read_with_control_data_free);
206       g_source_attach (source, g_main_context_get_thread_default ());
207       g_source_unref (source);
208     }
209   else
210     {
211       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
212       read_with_control_data_free (data);
213     }
214 }
215
216 static gssize
217 _g_socket_read_with_control_messages_finish (GSocket       *socket,
218                                              GAsyncResult  *result,
219                                              GError       **error)
220 {
221   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
222
223   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
224   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
225
226   if (g_simple_async_result_propagate_error (simple, error))
227       return -1;
228   else
229     return g_simple_async_result_get_op_res_gssize (simple);
230 }
231
232 /* ---------------------------------------------------------------------------------------------------- */
233
234 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
235
236 static GPtrArray *ensured_classes = NULL;
237
238 static void
239 ensure_type (GType gtype)
240 {
241   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
242 }
243
244 static void
245 release_required_types (void)
246 {
247   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
248   g_ptr_array_unref (ensured_classes);
249   ensured_classes = NULL;
250 }
251
252 static void
253 ensure_required_types (void)
254 {
255   g_assert (ensured_classes == NULL);
256   ensured_classes = g_ptr_array_new ();
257   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
258   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
259 }
260 /* ---------------------------------------------------------------------------------------------------- */
261
/* State of the worker thread that _g_dbus_shared_thread_ref() creates
 * once and then hands out to every caller.
 */
typedef struct
{
  /* number of outstanding _g_dbus_shared_thread_ref() calls; incremented atomically */
  volatile gint refcount;
  /* the thread running gdbus_shared_thread_func() */
  GThread *thread;
  /* main context pushed as thread-default inside the thread */
  GMainContext *context;
  /* main loop on @context that the thread runs */
  GMainLoop *loop;
} SharedThreadData;
269
270 static gpointer
271 gdbus_shared_thread_func (gpointer user_data)
272 {
273   SharedThreadData *data = user_data;
274
275   g_main_context_push_thread_default (data->context);
276   g_main_loop_run (data->loop);
277   g_main_context_pop_thread_default (data->context);
278
279   release_required_types ();
280
281   return NULL;
282 }
283
284 /* ---------------------------------------------------------------------------------------------------- */
285
286 static SharedThreadData *
287 _g_dbus_shared_thread_ref (void)
288 {
289   static gsize shared_thread_data = 0;
290   GError *error = NULL;
291   SharedThreadData *ret;
292
293   if (g_once_init_enter (&shared_thread_data))
294     {
295       SharedThreadData *data;
296
297       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
298       ensure_required_types ();
299
300       data = g_new0 (SharedThreadData, 1);
301       data->refcount = 0;
302       
303       data->context = g_main_context_new ();
304       data->loop = g_main_loop_new (data->context, FALSE);
305       data->thread = g_thread_create (gdbus_shared_thread_func,
306                                       data,
307                                       TRUE,
308                                       &error);
309       g_assert_no_error (error);
310       /* We can cast between gsize and gpointer safely */
311       g_once_init_leave (&shared_thread_data, (gsize) data);
312     }
313
314   ret = (SharedThreadData*) shared_thread_data;
315   g_atomic_int_inc (&ret->refcount);
316   return ret;
317 }
318
/* Counterpart of _g_dbus_shared_thread_ref().
 *
 * Currently a no-op: the whole body is compiled out, so the reference
 * count is never decremented and the shared thread is never stopped.
 * The disabled code is a sketch of the intended teardown.
 */
static void
_g_dbus_shared_thread_unref (SharedThreadData *data)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  g_assert (data != NULL);
  if (g_atomic_int_dec_and_test (&data->refcount))
    {
      g_main_loop_quit (data->loop);
      //g_thread_join (data->thread);
      g_main_loop_unref (data->loop);
      g_main_context_unref (data->context);
    }
#endif
}
334
335 /* ---------------------------------------------------------------------------------------------------- */
336
/* Per-connection worker state: reads D-Bus messages from @stream and
 * queues outgoing ones, using the main context of the shared thread.
 */
struct GDBusWorker
{
  /* modified atomically by _g_dbus_worker_ref() / _g_dbus_worker_unref() */
  volatile gint                       ref_count;

  /* shared worker thread; its context is where idle sources are attached */
  SharedThreadData                   *shared_thread_data;

  /* once TRUE, no callbacks are emitted and read results are ignored */
  gboolean                            stopped;

  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
   * only affects messages received from the other peer (since GDBusServer is the
   * only user) - we might want it to affect messages sent to the other peer too?
   */
  gboolean                            frozen;
  /* passed to g_dbus_message_new_from_blob() when decoding received data */
  GDBusCapabilityFlags                capabilities;
  /* GDBusMessage objects received while frozen, delivered on unfreeze */
  GQueue                             *received_messages_while_frozen;

  /* the transport; a reference is dropped on the final unref */
  GIOStream                          *stream;
  /* passed to the asynchronous read operations */
  GCancellable                       *cancellable;
  GDBusWorkerMessageReceivedCallback  message_received_callback;
  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
  GDBusWorkerDisconnectedCallback     disconnected_callback;
  /* handed to the three callbacks above */
  gpointer                            user_data;

  /* if not NULL, stream is GSocketConnection */
  GSocket *socket;

  /* used for reading */
  GMutex                             *read_lock;
  /* read_buffer holds read_buffer_cur_size valid bytes out of
   * read_buffer_allocated_size; reading continues until
   * read_buffer_bytes_wanted bytes are available (0 means "start a
   * new message")
   */
  gchar                              *read_buffer;
  gsize                               read_buffer_allocated_size;
  gsize                               read_buffer_cur_size;
  gsize                               read_buffer_bytes_wanted;
  /* file descriptors received so far for the message being read, or NULL */
  GUnixFDList                        *read_fd_list;
  /* filled in by the socket read: control messages and their number */
  GSocketControlMessage             **read_ancillary_messages;
  gint                                read_num_ancillary_messages;

  /* TRUE if an async write, flush or close is pending.
   * Only the worker thread may change its value, and only with the write_lock.
   * Other threads may read its value when holding the write_lock.
   * The worker thread may read its value at any time.
   */
  gboolean                            output_pending;
  /* used for writing */
  GMutex                             *write_lock;
  /* queue of MessageToWriteData, protected by write_lock */
  GQueue                             *write_queue;
  /* protected by write_lock */
  guint64                             write_num_messages_written;
  /* list of FlushData, protected by write_lock */
  GList                              *write_pending_flushes;
  /* list of CloseData, protected by write_lock */
  GList                              *pending_close_attempts;
};
390
391 static void _g_dbus_worker_unref (GDBusWorker *worker);
392
393 /* ---------------------------------------------------------------------------------------------------- */
394
/* Element type of GDBusWorker::write_pending_flushes.
 *
 * NOTE(review): the code using this struct is not shown here; the
 * field descriptions below are inferred from the names - confirm.
 */
typedef struct
{
  /* presumably used together with @cond to block the flushing thread */
  GMutex *mutex;
  GCond *cond;
  /* presumably compared against write_num_messages_written - verify */
  guint64 number_to_wait_for;
  /* presumably the outcome of the flush, NULL on success - verify */
  GError *error;
} FlushData;
402
403 struct _MessageToWriteData ;
404 typedef struct _MessageToWriteData MessageToWriteData;
405
406 static void message_to_write_data_free (MessageToWriteData *data);
407
408 static void read_message_print_transport_debug (gssize bytes_read,
409                                                 GDBusWorker *worker);
410
411 static void write_message_print_transport_debug (gssize bytes_written,
412                                                  MessageToWriteData *data);
413
/* Element type of GDBusWorker::pending_close_attempts; released with
 * close_data_free().
 */
typedef struct {
    /* worker the close belongs to; close_data_free() drops one worker reference */
    GDBusWorker *worker;
    /* may be NULL; unreffed by close_data_free() when set */
    GCancellable *cancellable;
    /* may be NULL; unreffed by close_data_free() when set */
    GSimpleAsyncResult *result;
} CloseData;
419
420 static void close_data_free (CloseData *close_data)
421 {
422   if (close_data->cancellable != NULL)
423     g_object_unref (close_data->cancellable);
424
425   if (close_data->result != NULL)
426     g_object_unref (close_data->result);
427
428   _g_dbus_worker_unref (close_data->worker);
429   g_slice_free (CloseData, close_data);
430 }
431
432 /* ---------------------------------------------------------------------------------------------------- */
433
434 static GDBusWorker *
435 _g_dbus_worker_ref (GDBusWorker *worker)
436 {
437   g_atomic_int_inc (&worker->ref_count);
438   return worker;
439 }
440
441 static void
442 _g_dbus_worker_unref (GDBusWorker *worker)
443 {
444   if (g_atomic_int_dec_and_test (&worker->ref_count))
445     {
446       g_assert (worker->write_pending_flushes == NULL);
447
448       _g_dbus_shared_thread_unref (worker->shared_thread_data);
449
450       g_object_unref (worker->stream);
451
452       g_mutex_free (worker->read_lock);
453       g_object_unref (worker->cancellable);
454       if (worker->read_fd_list != NULL)
455         g_object_unref (worker->read_fd_list);
456
457       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
458       g_queue_free (worker->received_messages_while_frozen);
459
460       g_mutex_free (worker->write_lock);
461       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
462       g_queue_free (worker->write_queue);
463
464       g_free (worker->read_buffer);
465
466       g_free (worker);
467     }
468 }
469
470 static void
471 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
472                                   gboolean      remote_peer_vanished,
473                                   GError       *error)
474 {
475   if (!worker->stopped)
476     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
477 }
478
479 static void
480 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
481                                       GDBusMessage *message)
482 {
483   if (!worker->stopped)
484     worker->message_received_callback (worker, message, worker->user_data);
485 }
486
487 static GDBusMessage *
488 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
489                                               GDBusMessage *message)
490 {
491   GDBusMessage *ret;
492   if (!worker->stopped)
493     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
494   else
495     ret = message;
496   return ret;
497 }
498
499 /* can only be called from private thread with read-lock held - takes ownership of @message */
500 static void
501 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
502                                                   GDBusMessage *message)
503 {
504   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
505     {
506       /* queue up */
507       g_queue_push_tail (worker->received_messages_while_frozen, message);
508     }
509   else
510     {
511       /* not frozen, nor anything in queue */
512       _g_dbus_worker_emit_message_received (worker, message);
513       g_object_unref (message);
514     }
515 }
516
517 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
518 static gboolean
519 unfreeze_in_idle_cb (gpointer user_data)
520 {
521   GDBusWorker *worker = user_data;
522   GDBusMessage *message;
523
524   g_mutex_lock (worker->read_lock);
525   if (worker->frozen)
526     {
527       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
528         {
529           _g_dbus_worker_emit_message_received (worker, message);
530           g_object_unref (message);
531         }
532       worker->frozen = FALSE;
533     }
534   else
535     {
536       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
537     }
538   g_mutex_unlock (worker->read_lock);
539   return FALSE;
540 }
541
542 /* can be called from any thread */
543 void
544 _g_dbus_worker_unfreeze (GDBusWorker *worker)
545 {
546   GSource *idle_source;
547   idle_source = g_idle_source_new ();
548   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
549   g_source_set_callback (idle_source,
550                          unfreeze_in_idle_cb,
551                          _g_dbus_worker_ref (worker),
552                          (GDestroyNotify) _g_dbus_worker_unref);
553   g_source_attach (idle_source, worker->shared_thread_data->context);
554   g_source_unref (idle_source);
555 }
556
557 /* ---------------------------------------------------------------------------------------------------- */
558
559 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
560
/* called in private thread shared by all GDBusConnection instances (without read-lock held)
 *
 * Completion callback for the read started by
 * _g_dbus_worker_do_read_unlocked(). @user_data is the GDBusWorker; the
 * reference taken when the read was started is dropped at the end.
 *
 * With the read-lock held it
 *  - finishes the read (stream or socket variant),
 *  - consumes the ancillary messages delivered with the data: file
 *    descriptors are collected in worker->read_fd_list, credentials are
 *    ignored, anything else disconnects,
 *  - disconnects on a read error or on a zero-byte read,
 *  - otherwise accounts for the new bytes and either asks for the rest
 *    of the message (after the 16 byte header told us the full length),
 *    decodes and delivers a complete message and starts on the next one,
 *    or repeats the request when fewer bytes than wanted arrived.
 *
 * @input_stream is not used.
 */
static void
_g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                           GAsyncResult  *res,
                           gpointer       user_data)
{
  GDBusWorker *worker = user_data;
  GError *error;
  gssize bytes_read;

  g_mutex_lock (worker->read_lock);

  /* If already stopped, don't even process the reply */
  if (worker->stopped)
    goto out;

  error = NULL;
  if (worker->socket == NULL)
    bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                             res,
                                             &error);
  else
    bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
                                                              res,
                                                              &error);
  /* each control message is unreffed below and the array itself is freed */
  if (worker->read_num_ancillary_messages > 0)
    {
      gint n;
      for (n = 0; n < worker->read_num_ancillary_messages; n++)
        {
          GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);

          /* empty first branch so that every real case can be an "else if",
           * including the ones that are compiled conditionally
           */
          if (FALSE)
            {
            }
#ifdef G_OS_UNIX
          else if (G_IS_UNIX_FD_MESSAGE (control_message))
            {
              GUnixFDMessage *fd_message;
              gint *fds;
              gint num_fds;

              fd_message = G_UNIX_FD_MESSAGE (control_message);
              fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
              if (worker->read_fd_list == NULL)
                {
                  worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
                }
              else
                {
                  /* note: this n shadows the outer loop counter */
                  gint n;
                  for (n = 0; n < num_fds; n++)
                    {
                      /* TODO: really want a append_steal() */
                      g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
                      close (fds[n]);
                    }
                }
              g_free (fds);
            }
          else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
            {
              /* do nothing */
            }
#endif
          else
            {
              /* unknown message type: disconnect, unless the read itself
               * already failed (that error is reported further down)
               */
              if (error == NULL)
                {
                  g_set_error (&error,
                               G_IO_ERROR,
                               G_IO_ERROR_FAILED,
                               "Unexpected ancillary message of type %s received from peer",
                               g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
                  _g_dbus_worker_emit_disconnected (worker, TRUE, error);
                  g_error_free (error);
                  /* release this message and all the ones not looked at yet */
                  g_object_unref (control_message);
                  n++;
                  while (n < worker->read_num_ancillary_messages)
                    g_object_unref (worker->read_ancillary_messages[n++]);
                  g_free (worker->read_ancillary_messages);
                  goto out;
                }
            }
          g_object_unref (control_message);
        }
      g_free (worker->read_ancillary_messages);
    }

  if (bytes_read == -1)
    {
      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
      g_error_free (error);
      goto out;
    }

#if 0
  g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
           (gint) bytes_read,
           g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
           g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
           g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
                                     G_IO_IN | G_IO_OUT | G_IO_HUP),
           worker->stream,
           worker);
#endif

  /* TODO: hmm, hmm... */
  if (bytes_read == 0)
    {
      g_set_error (&error,
                   G_IO_ERROR,
                   G_IO_ERROR_FAILED,
                   "Underlying GIOStream returned 0 bytes on an async read");
      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
      g_error_free (error);
      goto out;
    }

  read_message_print_transport_debug (bytes_read, worker);

  worker->read_buffer_cur_size += bytes_read;
  if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
    {
      /* OK, got what we asked for! */
      if (worker->read_buffer_bytes_wanted == 16)
        {
          gssize message_len;
          /* OK, got the header - determine how many more bytes are needed */
          error = NULL;
          message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
                                                     16,
                                                     &error);
          if (message_len == -1)
            {
              g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
              g_error_free (error);
              goto out;
            }

          /* keep the header in the buffer and read up to the full length */
          worker->read_buffer_bytes_wanted = message_len;
          _g_dbus_worker_do_read_unlocked (worker);
        }
      else
        {
          GDBusMessage *message;
          error = NULL;

          /* TODO: use connection->priv->auth to decode the message */

          message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
                                                  worker->read_buffer_cur_size,
                                                  worker->capabilities,
                                                  &error);
          if (message == NULL)
            {
              gchar *s;
              s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
              g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
                         "The error is: %s\n"
                         "The payload is as follows:\n"
                         "%s\n",
                         worker->read_buffer_cur_size,
                         error->message,
                         s);
              g_free (s);
              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
              g_error_free (error);
              goto out;
            }

#ifdef G_OS_UNIX
          /* hand the file descriptors collected so far to the message */
          if (worker->read_fd_list != NULL)
            {
              g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
              g_object_unref (worker->read_fd_list);
              worker->read_fd_list = NULL;
            }
#endif

          if (G_UNLIKELY (_g_dbus_debug_message ()))
            {
              gchar *s;
              _g_dbus_debug_print_lock ();
              g_print ("========================================================================\n"
                       "GDBus-debug:Message:\n"
                       "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
                       worker->read_buffer_cur_size);
              s = g_dbus_message_print (message, 2);
              g_print ("%s", s);
              g_free (s);
              if (G_UNLIKELY (_g_dbus_debug_payload ()))
                {
                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
                  g_print ("%s\n", s);
                  g_free (s);
                }
              _g_dbus_debug_print_unlock ();
            }

          /* yay, got a message, go deliver it */
          _g_dbus_worker_queue_or_deliver_received_message (worker, message);

          /* start reading another message! */
          worker->read_buffer_bytes_wanted = 0;
          worker->read_buffer_cur_size = 0;
          _g_dbus_worker_do_read_unlocked (worker);
        }
    }
  else
    {
      /* didn't get all the bytes we requested - so repeat the request... */
      _g_dbus_worker_do_read_unlocked (worker);
    }

 out:
  g_mutex_unlock (worker->read_lock);

  /* gives up the reference acquired when calling g_input_stream_read_async() */
  _g_dbus_worker_unref (worker);
}
783
784 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
785 static void
786 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
787 {
788   /* if bytes_wanted is zero, it means start reading a message */
789   if (worker->read_buffer_bytes_wanted == 0)
790     {
791       worker->read_buffer_cur_size = 0;
792       worker->read_buffer_bytes_wanted = 16;
793     }
794
795   /* ensure we have a (big enough) buffer */
796   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
797     {
798       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
799       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
800       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
801     }
802
803   if (worker->socket == NULL)
804     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
805                                worker->read_buffer + worker->read_buffer_cur_size,
806                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
807                                G_PRIORITY_DEFAULT,
808                                worker->cancellable,
809                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
810                                _g_dbus_worker_ref (worker));
811   else
812     {
813       worker->read_ancillary_messages = NULL;
814       worker->read_num_ancillary_messages = 0;
815       _g_socket_read_with_control_messages (worker->socket,
816                                             worker->read_buffer + worker->read_buffer_cur_size,
817                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
818                                             &worker->read_ancillary_messages,
819                                             &worker->read_num_ancillary_messages,
820                                             G_PRIORITY_DEFAULT,
821                                             worker->cancellable,
822                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
823                                             _g_dbus_worker_ref (worker));
824     }
825 }
826
827 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
828 static gboolean
829 _g_dbus_worker_do_initial_read (gpointer data)
830 {
831   GDBusWorker *worker = data;
832   g_mutex_lock (worker->read_lock);
833   _g_dbus_worker_do_read_unlocked (worker);
834   g_mutex_unlock (worker->read_lock);
835   return FALSE;
836 }
837
838 /* ---------------------------------------------------------------------------------------------------- */
839
/* One outgoing message on GDBusWorker::write_queue; released with
 * message_to_write_data_free().
 */
struct _MessageToWriteData
{
  /* owning worker; message_to_write_data_free() drops one worker reference */
  GDBusWorker  *worker;
  /* the message, may be NULL; unreffed on free when set */
  GDBusMessage *message;
  /* buffer of blob_size bytes to write; g_free()'d on free */
  gchar        *blob;
  gsize         blob_size;

  /* bytes of @blob written so far; the write is complete at blob_size */
  gsize               total_written;
  /* result object completed when the write finishes or fails */
  GSimpleAsyncResult *simple;

};
851
852 static void
853 message_to_write_data_free (MessageToWriteData *data)
854 {
855   _g_dbus_worker_unref (data->worker);
856   if (data->message)
857     g_object_unref (data->message);
858   g_free (data->blob);
859   g_free (data);
860 }
861
862 /* ---------------------------------------------------------------------------------------------------- */
863
864 static void write_message_continue_writing (MessageToWriteData *data);
865
866 /* called in private thread shared by all GDBusConnection instances
867  *
868  * write-lock is not held on entry
869  * output_pending is true on entry
870  */
871 static void
872 write_message_async_cb (GObject      *source_object,
873                         GAsyncResult *res,
874                         gpointer      user_data)
875 {
876   MessageToWriteData *data = user_data;
877   GSimpleAsyncResult *simple;
878   gssize bytes_written;
879   GError *error;
880
881   /* Note: we can't access data->simple after calling g_async_result_complete () because the
882    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
883    */
884   simple = data->simple;
885
886   error = NULL;
887   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
888                                                 res,
889                                                 &error);
890   if (bytes_written == -1)
891     {
892       g_simple_async_result_take_error (simple, error);
893       g_simple_async_result_complete (simple);
894       g_object_unref (simple);
895       goto out;
896     }
897   g_assert (bytes_written > 0); /* zero is never returned */
898
899   write_message_print_transport_debug (bytes_written, data);
900
901   data->total_written += bytes_written;
902   g_assert (data->total_written <= data->blob_size);
903   if (data->total_written == data->blob_size)
904     {
905       g_simple_async_result_complete (simple);
906       g_object_unref (simple);
907       goto out;
908     }
909
910   write_message_continue_writing (data);
911
912  out:
913   ;
914 }
915
916 /* called in private thread shared by all GDBusConnection instances
917  *
918  * write-lock is not held on entry
919  * output_pending is true on entry
920  */
921 static gboolean
922 on_socket_ready (GSocket      *socket,
923                  GIOCondition  condition,
924                  gpointer      user_data)
925 {
926   MessageToWriteData *data = user_data;
927   write_message_continue_writing (data);
928   return FALSE; /* remove source */
929 }
930
931 /* called in private thread shared by all GDBusConnection instances
932  *
933  * write-lock is not held on entry
934  * output_pending is true on entry
935  */
936 static void
937 write_message_continue_writing (MessageToWriteData *data)
938 {
939   GOutputStream *ostream;
940   GSimpleAsyncResult *simple;
941 #ifdef G_OS_UNIX
942   GUnixFDList *fd_list;
943 #endif
944
945   /* Note: we can't access data->simple after calling g_async_result_complete () because the
946    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
947    */
948   simple = data->simple;
949
950   ostream = g_io_stream_get_output_stream (data->worker->stream);
951 #ifdef G_OS_UNIX
952   fd_list = g_dbus_message_get_unix_fd_list (data->message);
953 #endif
954
955   g_assert (!g_output_stream_has_pending (ostream));
956   g_assert_cmpint (data->total_written, <, data->blob_size);
957
958   if (FALSE)
959     {
960     }
961 #ifdef G_OS_UNIX
962   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
963     {
964       GOutputVector vector;
965       GSocketControlMessage *control_message;
966       gssize bytes_written;
967       GError *error;
968
969       vector.buffer = data->blob;
970       vector.size = data->blob_size;
971
972       control_message = NULL;
973       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
974         {
975           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
976             {
977               g_simple_async_result_set_error (simple,
978                                                G_IO_ERROR,
979                                                G_IO_ERROR_FAILED,
980                                                "Tried sending a file descriptor but remote peer does not support this capability");
981               g_simple_async_result_complete (simple);
982               g_object_unref (simple);
983               goto out;
984             }
985           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
986         }
987
988       error = NULL;
989       bytes_written = g_socket_send_message (data->worker->socket,
990                                              NULL, /* address */
991                                              &vector,
992                                              1,
993                                              control_message != NULL ? &control_message : NULL,
994                                              control_message != NULL ? 1 : 0,
995                                              G_SOCKET_MSG_NONE,
996                                              data->worker->cancellable,
997                                              &error);
998       if (control_message != NULL)
999         g_object_unref (control_message);
1000
1001       if (bytes_written == -1)
1002         {
1003           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1004           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1005             {
1006               GSource *source;
1007               source = g_socket_create_source (data->worker->socket,
1008                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1009                                                data->worker->cancellable);
1010               g_source_set_callback (source,
1011                                      (GSourceFunc) on_socket_ready,
1012                                      data,
1013                                      NULL); /* GDestroyNotify */
1014               g_source_attach (source, g_main_context_get_thread_default ());
1015               g_source_unref (source);
1016               g_error_free (error);
1017               goto out;
1018             }
1019           g_simple_async_result_take_error (simple, error);
1020           g_simple_async_result_complete (simple);
1021           g_object_unref (simple);
1022           goto out;
1023         }
1024       g_assert (bytes_written > 0); /* zero is never returned */
1025
1026       write_message_print_transport_debug (bytes_written, data);
1027
1028       data->total_written += bytes_written;
1029       g_assert (data->total_written <= data->blob_size);
1030       if (data->total_written == data->blob_size)
1031         {
1032           g_simple_async_result_complete (simple);
1033           g_object_unref (simple);
1034           goto out;
1035         }
1036
1037       write_message_continue_writing (data);
1038     }
1039 #endif
1040   else
1041     {
1042 #ifdef G_OS_UNIX
1043       if (fd_list != NULL)
1044         {
1045           g_simple_async_result_set_error (simple,
1046                                            G_IO_ERROR,
1047                                            G_IO_ERROR_FAILED,
1048                                            "Tried sending a file descriptor on unsupported stream of type %s",
1049                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1050           g_simple_async_result_complete (simple);
1051           g_object_unref (simple);
1052           goto out;
1053         }
1054 #endif
1055
1056       g_output_stream_write_async (ostream,
1057                                    (const gchar *) data->blob + data->total_written,
1058                                    data->blob_size - data->total_written,
1059                                    G_PRIORITY_DEFAULT,
1060                                    data->worker->cancellable,
1061                                    write_message_async_cb,
1062                                    data);
1063     }
1064  out:
1065   ;
1066 }
1067
1068 /* called in private thread shared by all GDBusConnection instances
1069  *
1070  * write-lock is not held on entry
1071  * output_pending is true on entry
1072  */
1073 static void
1074 write_message_async (GDBusWorker         *worker,
1075                      MessageToWriteData  *data,
1076                      GAsyncReadyCallback  callback,
1077                      gpointer             user_data)
1078 {
1079   data->simple = g_simple_async_result_new (NULL,
1080                                             callback,
1081                                             user_data,
1082                                             write_message_async);
1083   data->total_written = 0;
1084   write_message_continue_writing (data);
1085 }
1086
1087 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1088 static gboolean
1089 write_message_finish (GAsyncResult   *res,
1090                       GError        **error)
1091 {
1092   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1093   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1094     return FALSE;
1095   else
1096     return TRUE;
1097 }
1098 /* ---------------------------------------------------------------------------------------------------- */
1099
1100 static void maybe_write_next_message (GDBusWorker *worker);
1101
1102 typedef struct
1103 {
1104   GDBusWorker *worker;
1105   GList *flushers;
1106 } FlushAsyncData;
1107
1108 static void
1109 flush_data_list_complete (const GList  *flushers,
1110                           const GError *error)
1111 {
1112   const GList *l;
1113
1114   for (l = flushers; l != NULL; l = l->next)
1115     {
1116       FlushData *f = l->data;
1117
1118       f->error = error != NULL ? g_error_copy (error) : NULL;
1119
1120       g_mutex_lock (f->mutex);
1121       g_cond_signal (f->cond);
1122       g_mutex_unlock (f->mutex);
1123     }
1124 }
1125
1126 /* called in private thread shared by all GDBusConnection instances
1127  *
1128  * write-lock is not held on entry
1129  * output_pending is true on entry
1130  */
1131 static void
1132 ostream_flush_cb (GObject      *source_object,
1133                   GAsyncResult *res,
1134                   gpointer      user_data)
1135 {
1136   FlushAsyncData *data = user_data;
1137   GError *error;
1138
1139   error = NULL;
1140   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1141                                 res,
1142                                 &error);
1143
1144   if (error == NULL)
1145     {
1146       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1147         {
1148           _g_dbus_debug_print_lock ();
1149           g_print ("========================================================================\n"
1150                    "GDBus-debug:Transport:\n"
1151                    "  ---- FLUSHED stream of type %s\n",
1152                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1153           _g_dbus_debug_print_unlock ();
1154         }
1155     }
1156
1157   g_assert (data->flushers != NULL);
1158   flush_data_list_complete (data->flushers, error);
1159   g_list_free (data->flushers);
1160
1161   if (error != NULL)
1162     g_error_free (error);
1163
1164   /* Make sure we tell folks that we don't have additional
1165      flushes pending */
1166   g_mutex_lock (data->worker->write_lock);
1167   g_assert (data->worker->output_pending);
1168   data->worker->output_pending = FALSE;
1169   g_mutex_unlock (data->worker->write_lock);
1170
1171   /* OK, cool, finally kick off the next write */
1172   maybe_write_next_message (data->worker);
1173
1174   _g_dbus_worker_unref (data->worker);
1175   g_free (data);
1176 }
1177
1178 /* called in private thread shared by all GDBusConnection instances
1179  *
1180  * write-lock is not held on entry
1181  * output_pending is false on entry
1182  */
1183 static void
1184 message_written (GDBusWorker *worker,
1185                  MessageToWriteData *message_data)
1186 {
1187   GList *l;
1188   GList *ll;
1189   GList *flushers;
1190
1191   /* first log the fact that we wrote a message */
1192   if (G_UNLIKELY (_g_dbus_debug_message ()))
1193     {
1194       gchar *s;
1195       _g_dbus_debug_print_lock ();
1196       g_print ("========================================================================\n"
1197                "GDBus-debug:Message:\n"
1198                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1199                message_data->blob_size);
1200       s = g_dbus_message_print (message_data->message, 2);
1201       g_print ("%s", s);
1202       g_free (s);
1203       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1204         {
1205           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1206           g_print ("%s\n", s);
1207           g_free (s);
1208         }
1209       _g_dbus_debug_print_unlock ();
1210     }
1211
1212   /* then first wake up pending flushes and, if needed, flush the stream */
1213   flushers = NULL;
1214   g_mutex_lock (worker->write_lock);
1215   worker->write_num_messages_written += 1;
1216   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1217     {
1218       FlushData *f = l->data;
1219       ll = l->next;
1220
1221       if (f->number_to_wait_for == worker->write_num_messages_written)
1222         {
1223           flushers = g_list_append (flushers, f);
1224           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1225         }
1226     }
1227   if (flushers != NULL)
1228     {
1229       g_assert (!worker->output_pending);
1230       worker->output_pending = TRUE;
1231     }
1232   g_mutex_unlock (worker->write_lock);
1233
1234   if (flushers != NULL)
1235     {
1236       FlushAsyncData *data;
1237       data = g_new0 (FlushAsyncData, 1);
1238       data->worker = _g_dbus_worker_ref (worker);
1239       data->flushers = flushers;
1240       /* flush the stream before writing the next message */
1241       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1242                                    G_PRIORITY_DEFAULT,
1243                                    worker->cancellable,
1244                                    ostream_flush_cb,
1245                                    data);
1246     }
1247   else
1248     {
1249       /* kick off the next write! */
1250       maybe_write_next_message (worker);
1251     }
1252 }
1253
1254 /* called in private thread shared by all GDBusConnection instances
1255  *
1256  * write-lock is not held on entry
1257  * output_pending is true on entry
1258  */
1259 static void
1260 write_message_cb (GObject       *source_object,
1261                   GAsyncResult  *res,
1262                   gpointer       user_data)
1263 {
1264   MessageToWriteData *data = user_data;
1265   GError *error;
1266
1267   g_mutex_lock (data->worker->write_lock);
1268   g_assert (data->worker->output_pending);
1269   data->worker->output_pending = FALSE;
1270   g_mutex_unlock (data->worker->write_lock);
1271
1272   error = NULL;
1273   if (!write_message_finish (res, &error))
1274     {
1275       /* TODO: handle */
1276       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1277       g_error_free (error);
1278     }
1279
1280   /* this function will also kick of the next write (it might need to
1281    * flush so writing the next message might happen much later
1282    * e.g. async)
1283    */
1284   message_written (data->worker, data);
1285
1286   message_to_write_data_free (data);
1287 }
1288
1289 /* called in private thread shared by all GDBusConnection instances
1290  *
1291  * write-lock is not held on entry
1292  * output_pending is true on entry
1293  */
1294 static void
1295 iostream_close_cb (GObject      *source_object,
1296                    GAsyncResult *res,
1297                    gpointer      user_data)
1298 {
1299   GDBusWorker *worker = user_data;
1300   GError *error = NULL;
1301   GList *pending_close_attempts, *pending_flush_attempts;
1302   GQueue *send_queue;
1303
1304   g_io_stream_close_finish (worker->stream, res, &error);
1305
1306   g_mutex_lock (worker->write_lock);
1307
1308   pending_close_attempts = worker->pending_close_attempts;
1309   worker->pending_close_attempts = NULL;
1310
1311   pending_flush_attempts = worker->write_pending_flushes;
1312   worker->write_pending_flushes = NULL;
1313
1314   send_queue = worker->write_queue;
1315   worker->write_queue = g_queue_new ();
1316
1317   g_assert (worker->output_pending);
1318   worker->output_pending = FALSE;
1319
1320   g_mutex_unlock (worker->write_lock);
1321
1322   while (pending_close_attempts != NULL)
1323     {
1324       CloseData *close_data = pending_close_attempts->data;
1325
1326       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1327                                                    pending_close_attempts);
1328
1329       if (close_data->result != NULL)
1330         {
1331           if (error != NULL)
1332             g_simple_async_result_set_from_error (close_data->result, error);
1333
1334           /* this must be in an idle because the result is likely to be
1335            * intended for another thread
1336            */
1337           g_simple_async_result_complete_in_idle (close_data->result);
1338         }
1339
1340       close_data_free (close_data);
1341     }
1342
1343   g_clear_error (&error);
1344
1345   /* all messages queued for sending are discarded */
1346   g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
1347   g_queue_free (send_queue);
1348
1349   /* all queued flushes fail */
1350   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1351                        _("Operation was cancelled"));
1352   flush_data_list_complete (pending_flush_attempts, error);
1353   g_list_free (pending_flush_attempts);
1354   g_clear_error (&error);
1355
1356   _g_dbus_worker_unref (worker);
1357 }
1358
1359 /* called in private thread shared by all GDBusConnection instances
1360  *
1361  * write-lock is not held on entry
1362  * output_pending must be false on entry
1363  */
1364 static void
1365 maybe_write_next_message (GDBusWorker *worker)
1366 {
1367   MessageToWriteData *data;
1368
1369  write_next:
1370   /* we mustn't try to write two things at once */
1371   g_assert (!worker->output_pending);
1372
1373   g_mutex_lock (worker->write_lock);
1374
1375   /* if we want to close the connection, that takes precedence */
1376   if (worker->pending_close_attempts != NULL)
1377     {
1378       worker->output_pending = TRUE;
1379
1380       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1381                                NULL, iostream_close_cb,
1382                                _g_dbus_worker_ref (worker));
1383       data = NULL;
1384     }
1385   else
1386     {
1387       data = g_queue_pop_head (worker->write_queue);
1388
1389       if (data != NULL)
1390         worker->output_pending = TRUE;
1391     }
1392
1393   g_mutex_unlock (worker->write_lock);
1394
1395   /* Note that write_lock is only used for protecting the @write_queue
1396    * and @output_pending fields of the GDBusWorker struct ... which we
1397    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1398    *
1399    * Therefore, it's fine to drop it here when calling back into user
1400    * code and then writing the message out onto the GIOStream since this
1401    * function only runs on the worker thread.
1402    */
1403   if (data != NULL)
1404     {
1405       GDBusMessage *old_message;
1406       guchar *new_blob;
1407       gsize new_blob_size;
1408       GError *error;
1409
1410       old_message = data->message;
1411       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1412       if (data->message == old_message)
1413         {
1414           /* filters had no effect - do nothing */
1415         }
1416       else if (data->message == NULL)
1417         {
1418           /* filters dropped message */
1419           g_mutex_lock (worker->write_lock);
1420           worker->output_pending = FALSE;
1421           g_mutex_unlock (worker->write_lock);
1422           message_to_write_data_free (data);
1423           goto write_next;
1424         }
1425       else
1426         {
1427           /* filters altered the message -> reencode */
1428           error = NULL;
1429           new_blob = g_dbus_message_to_blob (data->message,
1430                                              &new_blob_size,
1431                                              worker->capabilities,
1432                                              &error);
1433           if (new_blob == NULL)
1434             {
1435               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1436                * the old message instead
1437                */
1438               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1439                          g_dbus_message_get_serial (data->message),
1440                          error->message);
1441               g_error_free (error);
1442             }
1443           else
1444             {
1445               g_free (data->blob);
1446               data->blob = (gchar *) new_blob;
1447               data->blob_size = new_blob_size;
1448             }
1449         }
1450
1451       write_message_async (worker,
1452                            data,
1453                            write_message_cb,
1454                            data);
1455     }
1456 }
1457
1458 /* called in private thread shared by all GDBusConnection instances
1459  *
1460  * write-lock is not held on entry
1461  * output_pending may be true or false
1462  */
1463 static gboolean
1464 write_message_in_idle_cb (gpointer user_data)
1465 {
1466   GDBusWorker *worker = user_data;
1467
1468   /* Because this is the worker thread, we can read this struct member
1469    * without holding the lock: no other thread ever modifies it.
1470    */
1471   if (!worker->output_pending)
1472     maybe_write_next_message (worker);
1473
1474   return FALSE;
1475 }
1476
1477 /*
1478  * @write_data: (transfer full) (allow-none):
1479  * @close_data: (transfer full) (allow-none):
1480  *
1481  * Can be called from any thread
1482  *
1483  * write_lock is not held on entry
1484  * output_pending may be true or false
1485  */
1486 static void
1487 schedule_write_in_worker_thread (GDBusWorker        *worker,
1488                                  MessageToWriteData *write_data,
1489                                  CloseData          *close_data)
1490 {
1491   g_mutex_lock (worker->write_lock);
1492
1493   if (write_data != NULL)
1494     g_queue_push_tail (worker->write_queue, write_data);
1495
1496   if (close_data != NULL)
1497     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1498                                                      close_data);
1499
1500   if (!worker->output_pending)
1501     {
1502       GSource *idle_source;
1503       idle_source = g_idle_source_new ();
1504       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1505       g_source_set_callback (idle_source,
1506                              write_message_in_idle_cb,
1507                              _g_dbus_worker_ref (worker),
1508                              (GDestroyNotify) _g_dbus_worker_unref);
1509       g_source_attach (idle_source, worker->shared_thread_data->context);
1510       g_source_unref (idle_source);
1511     }
1512
1513   g_mutex_unlock (worker->write_lock);
1514 }
1515
1516 /* ---------------------------------------------------------------------------------------------------- */
1517
1518 /* can be called from any thread - steals blob
1519  *
1520  * write_lock is not held on entry
1521  * output_pending may be true or false
1522  */
1523 void
1524 _g_dbus_worker_send_message (GDBusWorker    *worker,
1525                              GDBusMessage   *message,
1526                              gchar          *blob,
1527                              gsize           blob_len)
1528 {
1529   MessageToWriteData *data;
1530
1531   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1532   g_return_if_fail (blob != NULL);
1533   g_return_if_fail (blob_len > 16);
1534
1535   data = g_new0 (MessageToWriteData, 1);
1536   data->worker = _g_dbus_worker_ref (worker);
1537   data->message = g_object_ref (message);
1538   data->blob = blob; /* steal! */
1539   data->blob_size = blob_len;
1540
1541   schedule_write_in_worker_thread (worker, data, NULL);
1542 }
1543
1544 /* ---------------------------------------------------------------------------------------------------- */
1545
1546 GDBusWorker *
1547 _g_dbus_worker_new (GIOStream                              *stream,
1548                     GDBusCapabilityFlags                    capabilities,
1549                     gboolean                                initially_frozen,
1550                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1551                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1552                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1553                     gpointer                                user_data)
1554 {
1555   GDBusWorker *worker;
1556   GSource *idle_source;
1557
1558   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1559   g_return_val_if_fail (message_received_callback != NULL, NULL);
1560   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1561   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1562
1563   worker = g_new0 (GDBusWorker, 1);
1564   worker->ref_count = 1;
1565
1566   worker->read_lock = g_mutex_new ();
1567   worker->message_received_callback = message_received_callback;
1568   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1569   worker->disconnected_callback = disconnected_callback;
1570   worker->user_data = user_data;
1571   worker->stream = g_object_ref (stream);
1572   worker->capabilities = capabilities;
1573   worker->cancellable = g_cancellable_new ();
1574   worker->output_pending = FALSE;
1575
1576   worker->frozen = initially_frozen;
1577   worker->received_messages_while_frozen = g_queue_new ();
1578
1579   worker->write_lock = g_mutex_new ();
1580   worker->write_queue = g_queue_new ();
1581
1582   if (G_IS_SOCKET_CONNECTION (worker->stream))
1583     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1584
1585   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1586
1587   /* begin reading */
1588   idle_source = g_idle_source_new ();
1589   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1590   g_source_set_callback (idle_source,
1591                          _g_dbus_worker_do_initial_read,
1592                          worker,
1593                          NULL);
1594   g_source_attach (idle_source, worker->shared_thread_data->context);
1595   g_source_unref (idle_source);
1596
1597   return worker;
1598 }
1599
1600 /* ---------------------------------------------------------------------------------------------------- */
1601
1602 /* can be called from any thread
1603  *
1604  * write_lock is not held on entry
1605  * output_pending may be true or false
1606  */
1607 void
1608 _g_dbus_worker_close (GDBusWorker         *worker,
1609                       GCancellable        *cancellable,
1610                       GSimpleAsyncResult  *result)
1611 {
1612   CloseData *close_data;
1613
1614   close_data = g_slice_new0 (CloseData);
1615   close_data->worker = _g_dbus_worker_ref (worker);
1616   close_data->cancellable =
1617       (cancellable == NULL ? NULL : g_object_ref (cancellable));
1618   close_data->result = (result == NULL ? NULL : g_object_ref (result));
1619
1620   g_cancellable_cancel (worker->cancellable);
1621   schedule_write_in_worker_thread (worker, NULL, close_data);
1622 }
1623
1624 /* This can be called from any thread - frees worker. Note that
1625  * callbacks might still happen if called from another thread than the
1626  * worker - use your own synchronization primitive in the callbacks.
1627  *
1628  * write_lock is not held on entry
1629  * output_pending may be true or false
1630  */
1631 void
1632 _g_dbus_worker_stop (GDBusWorker *worker)
1633 {
1634   worker->stopped = TRUE;
1635
1636   /* Cancel any pending operations and schedule a close of the underlying I/O
1637    * stream in the worker thread
1638    */
1639   _g_dbus_worker_close (worker, NULL, NULL);
1640
1641   /* _g_dbus_worker_close holds a ref until after an idle in the the worker
1642    * thread has run, so we no longer need to unref in an idle like in
1643    * commit 322e25b535
1644    */
1645   _g_dbus_worker_unref (worker);
1646 }
1647
1648 /* ---------------------------------------------------------------------------------------------------- */
1649
1650 /* can be called from any thread (except the worker thread) - blocks
1651  * calling thread until all queued outgoing messages are written and
1652  * the transport has been flushed
1653  *
1654  * write_lock is not held on entry
1655  * output_pending may be true or false
1656  */
1657 gboolean
1658 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1659                            GCancellable   *cancellable,
1660                            GError        **error)
1661 {
1662   gboolean ret;
1663   FlushData *data;
1664
1665   data = NULL;
1666   ret = TRUE;
1667
1668   /* if the queue is empty, there's nothing to wait for */
1669   g_mutex_lock (worker->write_lock);
1670   if (g_queue_get_length (worker->write_queue) > 0)
1671     {
1672       data = g_new0 (FlushData, 1);
1673       data->mutex = g_mutex_new ();
1674       data->cond = g_cond_new ();
1675       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1676       g_mutex_lock (data->mutex);
1677       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1678     }
1679   g_mutex_unlock (worker->write_lock);
1680
1681   if (data != NULL)
1682     {
1683       g_cond_wait (data->cond, data->mutex);
1684       g_mutex_unlock (data->mutex);
1685
1686       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1687       g_cond_free (data->cond);
1688       g_mutex_free (data->mutex);
1689       if (data->error != NULL)
1690         {
1691           ret = FALSE;
1692           g_propagate_error (error, data->error);
1693         }
1694       g_free (data);
1695     }
1696
1697   return ret;
1698 }
1699
1700 /* ---------------------------------------------------------------------------------------------------- */
1701
1702 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1703 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1704 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1705 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1706 #define G_DBUS_DEBUG_CALL           (1<<4)
1707 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1708 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1709 #define G_DBUS_DEBUG_RETURN         (1<<7)
1710 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1711 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1712
1713 static gint _gdbus_debug_flags = 0;
1714
1715 gboolean
1716 _g_dbus_debug_authentication (void)
1717 {
1718   _g_dbus_initialize ();
1719   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1720 }
1721
1722 gboolean
1723 _g_dbus_debug_transport (void)
1724 {
1725   _g_dbus_initialize ();
1726   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1727 }
1728
1729 gboolean
1730 _g_dbus_debug_message (void)
1731 {
1732   _g_dbus_initialize ();
1733   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1734 }
1735
1736 gboolean
1737 _g_dbus_debug_payload (void)
1738 {
1739   _g_dbus_initialize ();
1740   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1741 }
1742
1743 gboolean
1744 _g_dbus_debug_call (void)
1745 {
1746   _g_dbus_initialize ();
1747   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1748 }
1749
1750 gboolean
1751 _g_dbus_debug_signal (void)
1752 {
1753   _g_dbus_initialize ();
1754   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1755 }
1756
1757 gboolean
1758 _g_dbus_debug_incoming (void)
1759 {
1760   _g_dbus_initialize ();
1761   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1762 }
1763
1764 gboolean
1765 _g_dbus_debug_return (void)
1766 {
1767   _g_dbus_initialize ();
1768   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1769 }
1770
1771 gboolean
1772 _g_dbus_debug_emission (void)
1773 {
1774   _g_dbus_initialize ();
1775   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1776 }
1777
1778 gboolean
1779 _g_dbus_debug_address (void)
1780 {
1781   _g_dbus_initialize ();
1782   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1783 }
1784
1785 G_LOCK_DEFINE_STATIC (print_lock);
1786
1787 void
1788 _g_dbus_debug_print_lock (void)
1789 {
1790   G_LOCK (print_lock);
1791 }
1792
1793 void
1794 _g_dbus_debug_print_unlock (void)
1795 {
1796   G_UNLOCK (print_lock);
1797 }
1798
1799 /*
1800  * _g_dbus_initialize:
1801  *
1802  * Does various one-time init things such as
1803  *
1804  *  - registering the G_DBUS_ERROR error domain
1805  *  - parses the G_DBUS_DEBUG environment variable
1806  */
1807 void
1808 _g_dbus_initialize (void)
1809 {
1810   static volatile gsize initialized = 0;
1811
1812   if (g_once_init_enter (&initialized))
1813     {
1814       volatile GQuark g_dbus_error_domain;
1815       const gchar *debug;
1816
1817       g_dbus_error_domain = G_DBUS_ERROR;
1818       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1819
1820       debug = g_getenv ("G_DBUS_DEBUG");
1821       if (debug != NULL)
1822         {
1823           const GDebugKey keys[] = {
1824             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1825             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1826             { "message",        G_DBUS_DEBUG_MESSAGE        },
1827             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1828             { "call",           G_DBUS_DEBUG_CALL           },
1829             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1830             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1831             { "return",         G_DBUS_DEBUG_RETURN         },
1832             { "emission",       G_DBUS_DEBUG_EMISSION       },
1833             { "address",        G_DBUS_DEBUG_ADDRESS        }
1834           };
1835
1836           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1837           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1838             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1839         }
1840
1841       g_once_init_leave (&initialized, 1);
1842     }
1843 }
1844
1845 /* ---------------------------------------------------------------------------------------------------- */
1846
1847 GVariantType *
1848 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1849 {
1850   const GVariantType *arg_types[256];
1851   guint n;
1852
1853   if (args)
1854     for (n = 0; args[n] != NULL; n++)
1855       {
1856         /* DBus places a hard limit of 255 on signature length.
1857          * therefore number of args must be less than 256.
1858          */
1859         g_assert (n < 256);
1860
1861         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1862
1863         if G_UNLIKELY (arg_types[n] == NULL)
1864           return NULL;
1865       }
1866   else
1867     n = 0;
1868
1869   return g_variant_type_new_tuple (arg_types, n);
1870 }
1871
1872 /* ---------------------------------------------------------------------------------------------------- */
1873
#ifdef G_OS_WIN32

extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);

/*
 * _g_dbus_win32_get_user_sid:
 *
 * Queries the TokenUser information of the current process' access token
 * and converts the user's SID to its string form.
 *
 * Returns: a newly allocated string holding the SID (free with g_free()),
 *   or %NULL if any step failed, in which case a warning has been logged.
 */
gchar *
_g_dbus_win32_get_user_sid (void)
{
  HANDLE h;
  TOKEN_USER *user;
  DWORD token_information_len;
  DWORD last_error;
  PSID psid;
  gchar *sid;
  gchar *ret;

  ret = NULL;
  user = NULL;
  sid = NULL;
  h = INVALID_HANDLE_VALUE;

  if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
    {
      g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* First call with an empty buffer only to learn the required length;
   * failing with ERROR_INSUFFICIENT_BUFFER is the expected outcome.
   */
  token_information_len = 0;
  if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
    {
      last_error = GetLastError ();
      if (last_error != ERROR_INSUFFICIENT_BUFFER)
        {
          g_warning ("GetTokenInformation() failed with error code %d", (gint) last_error);
          goto out;
        }
    }

  /* Second call actually fetches the TOKEN_USER data. */
  user = g_malloc (token_information_len);
  if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  psid = user->User.Sid;
  if (!IsValidSid (psid))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  /* Report conversion failures separately from an invalid SID so the two
   * error paths can be told apart in logs.
   */
  if (!ConvertSidToStringSidA (psid, &sid))
    {
      g_warning ("ConvertSidToStringSidA() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* The string from ConvertSidToStringSidA() must be released with
   * LocalFree(), so hand out a g_free()-able copy instead.
   */
  ret = g_strdup (sid);
  LocalFree (sid);

out:
  g_free (user);
  if (h != INVALID_HANDLE_VALUE)
    CloseHandle (h);
  return ret;
}
#endif
1938
1939 /* ---------------------------------------------------------------------------------------------------- */
1940
1941 gchar *
1942 _g_dbus_get_machine_id (GError **error)
1943 {
1944   gchar *ret;
1945   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1946   ret = NULL;
1947   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1948                             &ret,
1949                             NULL,
1950                             error))
1951     {
1952       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1953     }
1954   else
1955     {
1956       /* TODO: validate value */
1957       g_strstrip (ret);
1958     }
1959   return ret;
1960 }
1961
1962 /* ---------------------------------------------------------------------------------------------------- */
1963
1964 gchar *
1965 _g_dbus_enum_to_string (GType enum_type, gint value)
1966 {
1967   gchar *ret;
1968   GEnumClass *klass;
1969   GEnumValue *enum_value;
1970
1971   klass = g_type_class_ref (enum_type);
1972   enum_value = g_enum_get_value (klass, value);
1973   if (enum_value != NULL)
1974     ret = g_strdup (enum_value->value_nick);
1975   else
1976     ret = g_strdup_printf ("unknown (value %d)", value);
1977   g_type_class_unref (klass);
1978   return ret;
1979 }
1980
1981 /* ---------------------------------------------------------------------------------------------------- */
1982
1983 static void
1984 write_message_print_transport_debug (gssize bytes_written,
1985                                      MessageToWriteData *data)
1986 {
1987   if (G_LIKELY (!_g_dbus_debug_transport ()))
1988     goto out;
1989
1990   _g_dbus_debug_print_lock ();
1991   g_print ("========================================================================\n"
1992            "GDBus-debug:Transport:\n"
1993            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1994            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
1995            bytes_written,
1996            g_dbus_message_get_serial (data->message),
1997            data->blob_size,
1998            data->total_written,
1999            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2000   _g_dbus_debug_print_unlock ();
2001  out:
2002   ;
2003 }
2004
2005 /* ---------------------------------------------------------------------------------------------------- */
2006
2007 static void
2008 read_message_print_transport_debug (gssize bytes_read,
2009                                     GDBusWorker *worker)
2010 {
2011   gsize size;
2012   gint32 serial;
2013   gint32 message_length;
2014
2015   if (G_LIKELY (!_g_dbus_debug_transport ()))
2016     goto out;
2017
2018   size = bytes_read + worker->read_buffer_cur_size;
2019   serial = 0;
2020   message_length = 0;
2021   if (size >= 16)
2022     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2023   if (size >= 1)
2024     {
2025       switch (worker->read_buffer[0])
2026         {
2027         case 'l':
2028           if (size >= 12)
2029             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2030           break;
2031         case 'B':
2032           if (size >= 12)
2033             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2034           break;
2035         default:
2036           /* an error will be set elsewhere if this happens */
2037           goto out;
2038         }
2039     }
2040
2041     _g_dbus_debug_print_lock ();
2042   g_print ("========================================================================\n"
2043            "GDBus-debug:Transport:\n"
2044            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
2045            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2046            bytes_read,
2047            serial,
2048            message_length,
2049            worker->read_buffer_cur_size,
2050            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2051   _g_dbus_debug_print_unlock ();
2052  out:
2053   ;
2054 }
2055
2056 /* ---------------------------------------------------------------------------------------------------- */
2057
2058 gboolean
2059 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2060                                      GValue                *return_accu,
2061                                      const GValue          *handler_return,
2062                                      gpointer               dummy)
2063 {
2064   gboolean continue_emission;
2065   gboolean signal_return;
2066
2067   signal_return = g_value_get_boolean (handler_return);
2068   g_value_set_boolean (return_accu, signal_return);
2069   continue_emission = signal_return;
2070
2071   return continue_emission;
2072 }