60f9bbfdd94635cd114964a532ccd29e6ea7863e
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "giostream.h"
41 #include "gsocketcontrolmessage.h"
42 #include "gsocketconnection.h"
43 #include "gsocketoutputstream.h"
44
45 #ifdef G_OS_UNIX
46 #include "gunixfdmessage.h"
47 #include "gunixconnection.h"
48 #include "gunixcredentialsmessage.h"
49 #endif
50
51 #ifdef G_OS_WIN32
52 #include <windows.h>
53 #endif
54
55 #include "glibintl.h"
56
57 /* ---------------------------------------------------------------------------------------------------- */
58
59 gchar *
60 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
61 {
62  guint n, m;
63  GString *ret;
64
65  ret = g_string_new (NULL);
66
67  for (n = 0; n < len; n += 16)
68    {
69      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
70
71      for (m = n; m < n + 16; m++)
72        {
73          if (m > n && (m%4) == 0)
74            g_string_append_c (ret, ' ');
75          if (m < len)
76            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
77          else
78            g_string_append (ret, "   ");
79        }
80
81      g_string_append (ret, "   ");
82
83      for (m = n; m < len && m < n + 16; m++)
84        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
85
86      g_string_append_c (ret, '\n');
87    }
88
89  return g_string_free (ret, FALSE);
90 }
91
92 /* ---------------------------------------------------------------------------------------------------- */
93
94 /* Unfortunately ancillary messages are discarded when reading from a
95  * socket using the GSocketInputStream abstraction. So we provide a
96  * very GInputStream-ish API that uses GSocket in this case (very
97  * similar to GSocketInputStream).
98  */
99
/* State of one _g_socket_read_with_control_messages() operation. */
typedef struct
{
  /* socket to receive from; a reference is held */
  GSocket *socket;
  /* may be NULL; a reference is held otherwise */
  GCancellable *cancellable;

  /* destination buffer supplied by the caller and its size in bytes */
  void *buffer;
  gsize count;

  /* out-locations handed straight to g_socket_receive_message() */
  GSocketControlMessage ***messages;
  gint *num_messages;

  /* result object that is completed once the receive has been attempted */
  GSimpleAsyncResult *simple;

  /* TRUE when the receive runs from a GSource callback; selects between
   * completing directly and completing in idle
   */
  gboolean from_mainloop;
} ReadWithControlData;
115
116 static void
117 read_with_control_data_free (ReadWithControlData *data)
118 {
119   g_object_unref (data->socket);
120   if (data->cancellable != NULL)
121     g_object_unref (data->cancellable);
122   g_object_unref (data->simple);
123   g_free (data);
124 }
125
126 static gboolean
127 _g_socket_read_with_control_messages_ready (GSocket      *socket,
128                                             GIOCondition  condition,
129                                             gpointer      user_data)
130 {
131   ReadWithControlData *data = user_data;
132   GError *error;
133   gssize result;
134   GInputVector vector;
135
136   error = NULL;
137   vector.buffer = data->buffer;
138   vector.size = data->count;
139   result = g_socket_receive_message (data->socket,
140                                      NULL, /* address */
141                                      &vector,
142                                      1,
143                                      data->messages,
144                                      data->num_messages,
145                                      NULL,
146                                      data->cancellable,
147                                      &error);
148   if (result >= 0)
149     {
150       g_simple_async_result_set_op_res_gssize (data->simple, result);
151     }
152   else
153     {
154       g_assert (error != NULL);
155       g_simple_async_result_set_from_error (data->simple, error);
156       g_error_free (error);
157     }
158
159   if (data->from_mainloop)
160     g_simple_async_result_complete (data->simple);
161   else
162     g_simple_async_result_complete_in_idle (data->simple);
163
164   return FALSE;
165 }
166
167 static void
168 _g_socket_read_with_control_messages (GSocket                 *socket,
169                                       void                    *buffer,
170                                       gsize                    count,
171                                       GSocketControlMessage ***messages,
172                                       gint                    *num_messages,
173                                       gint                     io_priority,
174                                       GCancellable            *cancellable,
175                                       GAsyncReadyCallback      callback,
176                                       gpointer                 user_data)
177 {
178   ReadWithControlData *data;
179
180   data = g_new0 (ReadWithControlData, 1);
181   data->socket = g_object_ref (socket);
182   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
183   data->buffer = buffer;
184   data->count = count;
185   data->messages = messages;
186   data->num_messages = num_messages;
187
188   data->simple = g_simple_async_result_new (G_OBJECT (socket),
189                                             callback,
190                                             user_data,
191                                             _g_socket_read_with_control_messages);
192
193   if (!g_socket_condition_check (socket, G_IO_IN))
194     {
195       GSource *source;
196       data->from_mainloop = TRUE;
197       source = g_socket_create_source (data->socket,
198                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
199                                        cancellable);
200       g_source_set_callback (source,
201                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
202                              data,
203                              (GDestroyNotify) read_with_control_data_free);
204       g_source_attach (source, g_main_context_get_thread_default ());
205       g_source_unref (source);
206     }
207   else
208     {
209       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
210       read_with_control_data_free (data);
211     }
212 }
213
214 static gssize
215 _g_socket_read_with_control_messages_finish (GSocket       *socket,
216                                              GAsyncResult  *result,
217                                              GError       **error)
218 {
219   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
220
221   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
222   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
223
224   if (g_simple_async_result_propagate_error (simple, error))
225       return -1;
226   else
227     return g_simple_async_result_get_op_res_gssize (simple);
228 }
229
230 /* ---------------------------------------------------------------------------------------------------- */
231
/* Held by _g_dbus_shared_thread_ref() while it creates and uses shared_thread_data */
G_LOCK_DEFINE_STATIC (shared_thread_lock);

/* Bookkeeping for the single worker thread and the main loop it runs */
typedef struct
{
  /* incremented on every _g_dbus_shared_thread_ref() call */
  gint num_users;
  /* thread running shared_thread_func() */
  GThread *thread;
  /* main context pushed as thread-default in that thread */
  GMainContext *context;
  /* main loop on @context, run by that thread */
  GMainLoop *loop;
} SharedThreadData;

/* NULL until the first _g_dbus_shared_thread_ref() call creates it */
static SharedThreadData *shared_thread_data = NULL;
243
244 static gpointer
245 shared_thread_func (gpointer data)
246 {
247   g_main_context_push_thread_default (shared_thread_data->context);
248   g_main_loop_run (shared_thread_data->loop);
249   g_main_context_pop_thread_default (shared_thread_data->context);
250   return NULL;
251 }
252
253 typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
254
255 typedef struct
256 {
257   GDBusSharedThreadFunc func;
258   gpointer              user_data;
259   gboolean              done;
260 } CallerData;
261
262 static gboolean
263 invoke_caller (gpointer user_data)
264 {
265   CallerData *data = user_data;
266   data->func (data->user_data);
267   data->done = TRUE;
268   return FALSE;
269 }
270
271 static void
272 _g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
273                            gpointer              user_data)
274 {
275   GError *error;
276   GSource *idle_source;
277   CallerData *data;
278
279   G_LOCK (shared_thread_lock);
280
281   if (shared_thread_data != NULL)
282     {
283       shared_thread_data->num_users += 1;
284       goto have_thread;
285     }
286
287   shared_thread_data = g_new0 (SharedThreadData, 1);
288   shared_thread_data->num_users = 1;
289
290   error = NULL;
291   shared_thread_data->context = g_main_context_new ();
292   shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
293   shared_thread_data->thread = g_thread_create (shared_thread_func,
294                                                 NULL,
295                                                 TRUE,
296                                                 &error);
297   g_assert_no_error (error);
298
299  have_thread:
300
301   data = g_new0 (CallerData, 1);
302   data->func = func;
303   data->user_data = user_data;
304   data->done = FALSE;
305
306   idle_source = g_idle_source_new ();
307   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
308   g_source_set_callback (idle_source,
309                          invoke_caller,
310                          data,
311                          NULL);
312   g_source_attach (idle_source, shared_thread_data->context);
313   g_source_unref (idle_source);
314
315   /* wait for the user code to run.. hmm.. probably use a condition variable instead */
316   while (!data->done)
317     g_thread_yield ();
318
319   g_free (data);
320
321   G_UNLOCK (shared_thread_lock);
322 }
323
/* Counterpart of _g_dbus_shared_thread_ref(). Currently a no-op: the
 * teardown code below is compiled out, so the shared thread, its main
 * context and its main loop stay alive once created.
 */
static void
_g_dbus_shared_thread_unref (void)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  G_LOCK (shared_thread_lock);
  g_assert (shared_thread_data != NULL);
  shared_thread_data->num_users -= 1;
  if (shared_thread_data->num_users == 0)
    {
      g_main_loop_quit (shared_thread_data->loop);
      //g_thread_join (shared_thread_data->thread);
      g_main_loop_unref (shared_thread_data->loop);
      g_main_context_unref (shared_thread_data->context);
      g_free (shared_thread_data);
      shared_thread_data = NULL;
      G_UNLOCK (shared_thread_lock);
    }
  else
    {
      G_UNLOCK (shared_thread_lock);
    }
#endif
}
348
349 /* ---------------------------------------------------------------------------------------------------- */
350
/* State of one worker: the stream it reads messages from and writes
 * messages to, plus the callbacks used to report what happens on it.
 * Reference counted through _g_dbus_worker_ref() / _g_dbus_worker_unref().
 */
struct GDBusWorker
{
  /* only modified with atomic operations */
  volatile gint                       ref_count;

  /* once set, none of the callbacks below is invoked any more and
   * completed reads are ignored
   */
  gboolean                            stopped;

  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
   * only affects messages received from the other peer (since GDBusServer is the
   * only user) - we might want it to affect messages sent to the other peer too?
   */
  gboolean                            frozen;
  /* GDBusMessage objects held back while frozen; each entry owns a reference */
  GQueue                             *received_messages_while_frozen;

  GIOStream                          *stream;
  /* passed to g_dbus_message_new_from_blob() when decoding received data */
  GDBusCapabilityFlags                capabilities;
  /* passed to every asynchronous read */
  GCancellable                       *cancellable;
  GDBusWorkerMessageReceivedCallback  message_received_callback;
  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
  GDBusWorkerDisconnectedCallback     disconnected_callback;
  /* last argument of the three callbacks above */
  gpointer                            user_data;

  GThread                            *thread;

  /* if not NULL, stream is GSocketConnection */
  GSocket *socket;

  /* used for reading */
  GMutex                             *read_lock;
  /* buffer holding the message currently being received, and its capacity */
  gchar                              *read_buffer;
  gsize                               read_buffer_allocated_size;
  /* number of bytes of the current message received so far */
  gsize                               read_buffer_cur_size;
  /* 0: start a new message; 16: reading the fixed-size header;
   * otherwise the total size of the message being read
   */
  gsize                               read_buffer_bytes_wanted;
  /* file descriptors received so far for the message being read, or NULL */
  GUnixFDList                        *read_fd_list;
  /* filled in by the most recent socket read */
  GSocketControlMessage             **read_ancillary_messages;
  gint                                read_num_ancillary_messages;

  /* used for writing */
  GMutex                             *write_lock;
  /* queue of MessageToWriteData */
  GQueue                             *write_queue;
  gint                                num_writes_pending;
  guint64                             write_num_messages_written;
  /* must be empty (NULL) by the time the worker is finalized */
  GList                              *write_pending_flushes;
};
394
395 /* ---------------------------------------------------------------------------------------------------- */
396
/* NOTE(review): presumably one entry of GDBusWorker.write_pending_flushes,
 * i.e. a flush request blocked on @cond until @number_to_wait_for messages
 * have been written - the code using it is outside this part of the file,
 * confirm there.
 */
typedef struct
{
  GMutex *mutex;
  GCond *cond;
  guint64 number_to_wait_for;
  GError *error;
} FlushData;

struct _MessageToWriteData ;
typedef struct _MessageToWriteData MessageToWriteData;

/* Forward declarations for helpers that are used before they are defined */
static void message_to_write_data_free (MessageToWriteData *data);

static void read_message_print_transport_debug (gssize bytes_read,
                                                GDBusWorker *worker);

static void write_message_print_transport_debug (gssize bytes_written,
                                                 MessageToWriteData *data);
415
416 /* ---------------------------------------------------------------------------------------------------- */
417
418 static GDBusWorker *
419 _g_dbus_worker_ref (GDBusWorker *worker)
420 {
421   g_atomic_int_inc (&worker->ref_count);
422   return worker;
423 }
424
425 static void
426 _g_dbus_worker_unref (GDBusWorker *worker)
427 {
428   if (g_atomic_int_dec_and_test (&worker->ref_count))
429     {
430       g_assert (worker->write_pending_flushes == NULL);
431
432       _g_dbus_shared_thread_unref ();
433
434       g_object_unref (worker->stream);
435
436       g_mutex_free (worker->read_lock);
437       g_object_unref (worker->cancellable);
438       if (worker->read_fd_list != NULL)
439         g_object_unref (worker->read_fd_list);
440
441       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
442       g_queue_free (worker->received_messages_while_frozen);
443
444       g_mutex_free (worker->write_lock);
445       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
446       g_queue_free (worker->write_queue);
447
448       g_free (worker);
449     }
450 }
451
452 static void
453 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
454                                   gboolean      remote_peer_vanished,
455                                   GError       *error)
456 {
457   if (!worker->stopped)
458     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
459 }
460
461 static void
462 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
463                                       GDBusMessage *message)
464 {
465   if (!worker->stopped)
466     worker->message_received_callback (worker, message, worker->user_data);
467 }
468
469 static gboolean
470 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
471                                               GDBusMessage *message)
472 {
473   gboolean ret;
474   ret = FALSE;
475   if (!worker->stopped)
476     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
477   return ret;
478 }
479
480 /* can only be called from private thread with read-lock held - takes ownership of @message */
481 static void
482 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
483                                                   GDBusMessage *message)
484 {
485   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
486     {
487       /* queue up */
488       g_queue_push_tail (worker->received_messages_while_frozen, message);
489     }
490   else
491     {
492       /* not frozen, nor anything in queue */
493       _g_dbus_worker_emit_message_received (worker, message);
494       g_object_unref (message);
495     }
496 }
497
498 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
499 static gboolean
500 unfreeze_in_idle_cb (gpointer user_data)
501 {
502   GDBusWorker *worker = user_data;
503   GDBusMessage *message;
504
505   g_mutex_lock (worker->read_lock);
506   if (worker->frozen)
507     {
508       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
509         {
510           _g_dbus_worker_emit_message_received (worker, message);
511           g_object_unref (message);
512         }
513       worker->frozen = FALSE;
514     }
515   else
516     {
517       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
518     }
519   g_mutex_unlock (worker->read_lock);
520   return FALSE;
521 }
522
523 /* can be called from any thread */
524 void
525 _g_dbus_worker_unfreeze (GDBusWorker *worker)
526 {
527   GSource *idle_source;
528   idle_source = g_idle_source_new ();
529   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
530   g_source_set_callback (idle_source,
531                          unfreeze_in_idle_cb,
532                          _g_dbus_worker_ref (worker),
533                          (GDestroyNotify) _g_dbus_worker_unref);
534   g_source_attach (idle_source, shared_thread_data->context);
535   g_source_unref (idle_source);
536 }
537
538 /* ---------------------------------------------------------------------------------------------------- */
539
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);

/* Completion callback for the asynchronous read started by
 * _g_dbus_worker_do_read_unlocked(); @user_data is the GDBusWorker, with
 * the reference taken when the read was started.
 *
 * Collects the result of the read and any ancillary messages that came
 * with it, then advances the receive state:
 *  - I/O error, zero-byte read or unexpected ancillary message: report a
 *    disconnect and stop reading;
 *  - header complete: work out the full message size and keep reading;
 *  - message complete: decode it, deliver or queue it, start on the next;
 *  - otherwise: request the bytes that are still missing.
 *
 * called in private thread shared by all GDBusConnection instances (without read-lock held)
 */
static void
_g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                           GAsyncResult  *res,
                           gpointer       user_data)
{
  GDBusWorker *worker = user_data;
  GError *error;
  gssize bytes_read;

  g_mutex_lock (worker->read_lock);

  /* If already stopped, don't even process the reply */
  if (worker->stopped)
    goto out;

  error = NULL;
  /* the finish function has to match the one used to start the read */
  if (worker->socket == NULL)
    bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                             res,
                                             &error);
  else
    bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
                                                              res,
                                                              &error);
  /* consume the ancillary messages of this read: both the array and a
   * reference on each element are released here
   */
  if (worker->read_num_ancillary_messages > 0)
    {
      gint n;
      for (n = 0; n < worker->read_num_ancillary_messages; n++)
        {
          GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);

          /* empty first branch so that every real case below can be an
           * "else if", whether or not the G_OS_UNIX ones are compiled in
           */
          if (FALSE)
            {
            }
#ifdef G_OS_UNIX
          else if (G_IS_UNIX_FD_MESSAGE (control_message))
            {
              GUnixFDMessage *fd_message;
              gint *fds;
              gint num_fds;

              /* collect the descriptors in read_fd_list until the message
               * they belong to has been received completely
               */
              fd_message = G_UNIX_FD_MESSAGE (control_message);
              fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
              if (worker->read_fd_list == NULL)
                {
                  worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
                }
              else
                {
                  gint n;
                  for (n = 0; n < num_fds; n++)
                    {
                      /* TODO: really want a append_steal() */
                      g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
                      close (fds[n]);
                    }
                }
              g_free (fds);
            }
          else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
            {
              /* do nothing */
            }
#endif
          else
            {
              /* unknown message type; only reported when the read itself
               * did not already produce an error
               */
              if (error == NULL)
                {
                  g_set_error (&error,
                               G_IO_ERROR,
                               G_IO_ERROR_FAILED,
                               "Unexpected ancillary message of type %s received from peer",
                               g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
                  _g_dbus_worker_emit_disconnected (worker, TRUE, error);
                  g_error_free (error);
                  /* release this message and all the ones not looked at yet */
                  g_object_unref (control_message);
                  n++;
                  while (n < worker->read_num_ancillary_messages)
                    g_object_unref (worker->read_ancillary_messages[n++]);
                  g_free (worker->read_ancillary_messages);
                  goto out;
                }
            }
          g_object_unref (control_message);
        }
      g_free (worker->read_ancillary_messages);
    }

  /* the read itself failed */
  if (bytes_read == -1)
    {
      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
      g_error_free (error);
      goto out;
    }

#if 0
  g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
           (gint) bytes_read,
           g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
           g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
           g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
                                     G_IO_IN | G_IO_OUT | G_IO_HUP),
           worker->stream,
           worker);
#endif

  /* a read of zero bytes is reported as a disconnect */
  /* TODO: hmm, hmm... */
  if (bytes_read == 0)
    {
      g_set_error (&error,
                   G_IO_ERROR,
                   G_IO_ERROR_FAILED,
                   "Underlying GIOStream returned 0 bytes on an async read");
      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
      g_error_free (error);
      goto out;
    }

  read_message_print_transport_debug (bytes_read, worker);

  worker->read_buffer_cur_size += bytes_read;
  if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
    {
      /* OK, got what we asked for! */
      /* 16 is what _g_dbus_worker_do_read_unlocked() requests at the start
       * of a message, so this was the header
       */
      if (worker->read_buffer_bytes_wanted == 16)
        {
          gssize message_len;
          /* OK, got the header - determine how many more bytes are needed */
          error = NULL;
          message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
                                                     16,
                                                     &error);
          if (message_len == -1)
            {
              g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
              g_error_free (error);
              goto out;
            }

          /* keep reading into the same buffer until the whole message is in */
          worker->read_buffer_bytes_wanted = message_len;
          _g_dbus_worker_do_read_unlocked (worker);
        }
      else
        {
          GDBusMessage *message;
          error = NULL;

          /* TODO: use connection->priv->auth to decode the message */

          message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
                                                  worker->read_buffer_cur_size,
                                                  worker->capabilities,
                                                  &error);
          if (message == NULL)
            {
              gchar *s;
              s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
              g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
                         "The error is: %s\n"
                         "The payload is as follows:\n"
                         "%s\n",
                         worker->read_buffer_cur_size,
                         error->message,
                         s);
              g_free (s);
              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
              g_error_free (error);
              goto out;
            }

#ifdef G_OS_UNIX
          /* attach the descriptors collected while reading this message
           * and start over with an empty list for the next one
           */
          if (worker->read_fd_list != NULL)
            {
              g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
              worker->read_fd_list = NULL;
            }
#endif

          if (G_UNLIKELY (_g_dbus_debug_message ()))
            {
              gchar *s;
              _g_dbus_debug_print_lock ();
              g_print ("========================================================================\n"
                       "GDBus-debug:Message:\n"
                       "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
                       worker->read_buffer_cur_size);
              s = g_dbus_message_print (message, 2);
              g_print ("%s", s);
              g_free (s);
              if (G_UNLIKELY (_g_dbus_debug_payload ()))
                {
                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
                  g_print ("%s\n", s);
                  g_free (s);
                }
              _g_dbus_debug_print_unlock ();
            }

          /* yay, got a message, go deliver it */
          _g_dbus_worker_queue_or_deliver_received_message (worker, message);

          /* start reading another message! */
          worker->read_buffer_bytes_wanted = 0;
          worker->read_buffer_cur_size = 0;
          _g_dbus_worker_do_read_unlocked (worker);
        }
    }
  else
    {
      /* didn't get all the bytes we requested - so repeat the request... */
      _g_dbus_worker_do_read_unlocked (worker);
    }

 out:
  g_mutex_unlock (worker->read_lock);

  /* gives up the reference acquired when calling g_input_stream_read_async() */
  _g_dbus_worker_unref (worker);
}
763
764 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
765 static void
766 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
767 {
768   /* if bytes_wanted is zero, it means start reading a message */
769   if (worker->read_buffer_bytes_wanted == 0)
770     {
771       worker->read_buffer_cur_size = 0;
772       worker->read_buffer_bytes_wanted = 16;
773     }
774
775   /* ensure we have a (big enough) buffer */
776   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
777     {
778       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
779       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
780       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
781     }
782
783   if (worker->socket == NULL)
784     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
785                                worker->read_buffer + worker->read_buffer_cur_size,
786                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
787                                G_PRIORITY_DEFAULT,
788                                worker->cancellable,
789                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
790                                _g_dbus_worker_ref (worker));
791   else
792     {
793       worker->read_ancillary_messages = NULL;
794       worker->read_num_ancillary_messages = 0;
795       _g_socket_read_with_control_messages (worker->socket,
796                                             worker->read_buffer + worker->read_buffer_cur_size,
797                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
798                                             &worker->read_ancillary_messages,
799                                             &worker->read_num_ancillary_messages,
800                                             G_PRIORITY_DEFAULT,
801                                             worker->cancellable,
802                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
803                                             _g_dbus_worker_ref (worker));
804     }
805 }
806
807 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
808 static void
809 _g_dbus_worker_do_read (GDBusWorker *worker)
810 {
811   g_mutex_lock (worker->read_lock);
812   _g_dbus_worker_do_read_unlocked (worker);
813   g_mutex_unlock (worker->read_lock);
814 }
815
816 /* ---------------------------------------------------------------------------------------------------- */
817
/* One message on its way out, together with its serialized form */
struct _MessageToWriteData
{
  /* worker the message is written on; a reference is held */
  GDBusWorker  *worker;
  /* the message itself; a reference is held */
  GDBusMessage *message;
  /* serialized message and its size in bytes; freed with g_free() */
  gchar        *blob;
  gsize         blob_size;

  /* number of bytes of @blob written so far */
  gsize               total_written;
  /* completed once the whole blob has been written or a write failed */
  GSimpleAsyncResult *simple;

};
829
830 static void
831 message_to_write_data_free (MessageToWriteData *data)
832 {
833   _g_dbus_worker_unref (data->worker);
834   g_object_unref (data->message);
835   g_free (data->blob);
836   g_free (data);
837 }
838
839 /* ---------------------------------------------------------------------------------------------------- */
840
841 static void write_message_continue_writing (MessageToWriteData *data);
842
843 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
844 static void
845 write_message_async_cb (GObject      *source_object,
846                         GAsyncResult *res,
847                         gpointer      user_data)
848 {
849   MessageToWriteData *data = user_data;
850   GSimpleAsyncResult *simple;
851   gssize bytes_written;
852   GError *error;
853
854   /* Note: we can't access data->simple after calling g_async_result_complete () because the
855    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
856    */
857   simple = data->simple;
858
859   error = NULL;
860   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
861                                                 res,
862                                                 &error);
863   if (bytes_written == -1)
864     {
865       g_simple_async_result_set_from_error (simple, error);
866       g_error_free (error);
867       g_simple_async_result_complete (simple);
868       g_object_unref (simple);
869       goto out;
870     }
871   g_assert (bytes_written > 0); /* zero is never returned */
872
873   write_message_print_transport_debug (bytes_written, data);
874
875   data->total_written += bytes_written;
876   g_assert (data->total_written <= data->blob_size);
877   if (data->total_written == data->blob_size)
878     {
879       g_simple_async_result_complete (simple);
880       g_object_unref (simple);
881       goto out;
882     }
883
884   write_message_continue_writing (data);
885
886  out:
887   ;
888 }
889
890 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
891 static gboolean
892 on_socket_ready (GSocket      *socket,
893                  GIOCondition  condition,
894                  gpointer      user_data)
895 {
896   MessageToWriteData *data = user_data;
897   write_message_continue_writing (data);
898   return FALSE; /* remove source */
899 }
900
901 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
902 static void
903 write_message_continue_writing (MessageToWriteData *data)
904 {
905   GOutputStream *ostream;
906   GSimpleAsyncResult *simple;
907 #ifdef G_OS_UNIX
908   GUnixFDList *fd_list;
909 #endif
910
911   /* Note: we can't access data->simple after calling g_async_result_complete () because the
912    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
913    */
914   simple = data->simple;
915
916   ostream = g_io_stream_get_output_stream (data->worker->stream);
917 #ifdef G_OS_UNIX
918   fd_list = g_dbus_message_get_unix_fd_list (data->message);
919 #endif
920
921   g_assert (!g_output_stream_has_pending (ostream));
922   g_assert_cmpint (data->total_written, <, data->blob_size);
923
924   if (FALSE)
925     {
926     }
927 #ifdef G_OS_UNIX
928   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
929     {
930       GOutputVector vector;
931       GSocketControlMessage *control_message;
932       gssize bytes_written;
933       GError *error;
934
935       vector.buffer = data->blob;
936       vector.size = data->blob_size;
937
938       control_message = NULL;
939       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
940         {
941           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
942             {
943               g_simple_async_result_set_error (simple,
944                                                G_IO_ERROR,
945                                                G_IO_ERROR_FAILED,
946                                                "Tried sending a file descriptor but remote peer does not support this capability");
947               g_simple_async_result_complete (simple);
948               g_object_unref (simple);
949               goto out;
950             }
951           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
952         }
953
954       error = NULL;
955       bytes_written = g_socket_send_message (data->worker->socket,
956                                              NULL, /* address */
957                                              &vector,
958                                              1,
959                                              control_message != NULL ? &control_message : NULL,
960                                              control_message != NULL ? 1 : 0,
961                                              G_SOCKET_MSG_NONE,
962                                              data->worker->cancellable,
963                                              &error);
964       if (control_message != NULL)
965         g_object_unref (control_message);
966
967       if (bytes_written == -1)
968         {
969           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
970           if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_WOULD_BLOCK)
971             {
972               GSource *source;
973               source = g_socket_create_source (data->worker->socket,
974                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
975                                                data->worker->cancellable);
976               g_source_set_callback (source,
977                                      (GSourceFunc) on_socket_ready,
978                                      data,
979                                      NULL); /* GDestroyNotify */
980               g_source_attach (source, g_main_context_get_thread_default ());
981               g_source_unref (source);
982               goto out;
983             }
984           g_simple_async_result_set_from_error (simple, error);
985           g_error_free (error);
986           g_simple_async_result_complete (simple);
987           g_object_unref (simple);
988           goto out;
989         }
990       g_assert (bytes_written > 0); /* zero is never returned */
991
992       write_message_print_transport_debug (bytes_written, data);
993
994       data->total_written += bytes_written;
995       g_assert (data->total_written <= data->blob_size);
996       if (data->total_written == data->blob_size)
997         {
998           g_simple_async_result_complete (simple);
999           g_object_unref (simple);
1000           goto out;
1001         }
1002
1003       write_message_continue_writing (data);
1004     }
1005 #endif
1006   else
1007     {
1008       if (fd_list != NULL)
1009         {
1010           g_simple_async_result_set_error (simple,
1011                                            G_IO_ERROR,
1012                                            G_IO_ERROR_FAILED,
1013                                            "Tried sending a file descriptor on unsupported stream of type %s",
1014                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1015           g_simple_async_result_complete (simple);
1016           g_object_unref (simple);
1017           goto out;
1018         }
1019
1020       g_output_stream_write_async (ostream,
1021                                    (const gchar *) data->blob + data->total_written,
1022                                    data->blob_size - data->total_written,
1023                                    G_PRIORITY_DEFAULT,
1024                                    data->worker->cancellable,
1025                                    write_message_async_cb,
1026                                    data);
1027     }
1028  out:
1029   ;
1030 }
1031
1032 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1033 static void
1034 write_message_async (GDBusWorker         *worker,
1035                      MessageToWriteData  *data,
1036                      GAsyncReadyCallback  callback,
1037                      gpointer             user_data)
1038 {
1039   data->simple = g_simple_async_result_new (NULL,
1040                                             callback,
1041                                             user_data,
1042                                             write_message_async);
1043   data->total_written = 0;
1044   write_message_continue_writing (data);
1045 }
1046
1047 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1048 static gboolean
1049 write_message_finish (GAsyncResult   *res,
1050                       GError        **error)
1051 {
1052   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1053   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1054     return FALSE;
1055   else
1056     return TRUE;
1057 }
1058 /* ---------------------------------------------------------------------------------------------------- */
1059
1060 static void maybe_write_next_message (GDBusWorker *worker);
1061
1062 typedef struct
1063 {
1064   GDBusWorker *worker;
1065   GList *flushers;
1066 } FlushAsyncData;
1067
1068 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1069 static void
1070 ostream_flush_cb (GObject      *source_object,
1071                   GAsyncResult *res,
1072                   gpointer      user_data)
1073 {
1074   FlushAsyncData *data = user_data;
1075   GError *error;
1076   GList *l;
1077
1078   error = NULL;
1079   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1080                                 res,
1081                                 &error);
1082
1083   if (error == NULL)
1084     {
1085       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1086         {
1087           _g_dbus_debug_print_lock ();
1088           g_print ("========================================================================\n"
1089                    "GDBus-debug:Transport:\n"
1090                    "  ---- FLUSHED stream of type %s\n",
1091                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1092           _g_dbus_debug_print_unlock ();
1093         }
1094     }
1095
1096   g_assert (data->flushers != NULL);
1097   for (l = data->flushers; l != NULL; l = l->next)
1098     {
1099       FlushData *f = l->data;
1100
1101       f->error = error != NULL ? g_error_copy (error) : NULL;
1102
1103       g_mutex_lock (f->mutex);
1104       g_cond_signal (f->cond);
1105       g_mutex_unlock (f->mutex);
1106     }
1107   g_list_free (data->flushers);
1108
1109   if (error != NULL)
1110     g_error_free (error);
1111
1112   /* OK, cool, finally kick off the next write */
1113   maybe_write_next_message (data->worker);
1114
1115   _g_dbus_worker_unref (data->worker);
1116   g_free (data);
1117 }
1118
1119 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1120 static void
1121 message_written (GDBusWorker *worker,
1122                  MessageToWriteData *message_data)
1123 {
1124   GList *l;
1125   GList *ll;
1126   GList *flushers;
1127
1128   /* first log the fact that we wrote a message */
1129   if (G_UNLIKELY (_g_dbus_debug_message ()))
1130     {
1131       gchar *s;
1132       _g_dbus_debug_print_lock ();
1133       g_print ("========================================================================\n"
1134                "GDBus-debug:Message:\n"
1135                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1136                message_data->blob_size);
1137       s = g_dbus_message_print (message_data->message, 2);
1138       g_print ("%s", s);
1139       g_free (s);
1140       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1141         {
1142           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1143           g_print ("%s\n", s);
1144           g_free (s);
1145         }
1146       _g_dbus_debug_print_unlock ();
1147     }
1148
1149   /* then first wake up pending flushes and, if needed, flush the stream */
1150   flushers = NULL;
1151   g_mutex_lock (worker->write_lock);
1152   worker->write_num_messages_written += 1;
1153   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1154     {
1155       FlushData *f = l->data;
1156       ll = l->next;
1157
1158       if (f->number_to_wait_for == worker->write_num_messages_written)
1159         {
1160           flushers = g_list_append (flushers, f);
1161           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1162         }
1163     }
1164   g_mutex_unlock (worker->write_lock);
1165
1166   if (flushers != NULL)
1167     {
1168       FlushAsyncData *data;
1169       data = g_new0 (FlushAsyncData, 1);
1170       data->worker = _g_dbus_worker_ref (worker);
1171       data->flushers = flushers;
1172       /* flush the stream before writing the next message */
1173       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1174                                    G_PRIORITY_DEFAULT,
1175                                    worker->cancellable,
1176                                    ostream_flush_cb,
1177                                    data);
1178     }
1179   else
1180     {
1181       /* kick off the next write! */
1182       maybe_write_next_message (worker);
1183     }
1184 }
1185
1186 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1187 static void
1188 write_message_cb (GObject       *source_object,
1189                   GAsyncResult  *res,
1190                   gpointer       user_data)
1191 {
1192   MessageToWriteData *data = user_data;
1193   GError *error;
1194
1195   g_mutex_lock (data->worker->write_lock);
1196   data->worker->num_writes_pending -= 1;
1197   g_mutex_unlock (data->worker->write_lock);
1198
1199   error = NULL;
1200   if (!write_message_finish (res, &error))
1201     {
1202       /* TODO: handle */
1203       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1204       g_error_free (error);
1205     }
1206
1207   /* this function will also kick of the next write (it might need to
1208    * flush so writing the next message might happen much later
1209    * e.g. async)
1210    */
1211   message_written (data->worker, data);
1212
1213   message_to_write_data_free (data);
1214 }
1215
1216 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1217 static void
1218 maybe_write_next_message (GDBusWorker *worker)
1219 {
1220   MessageToWriteData *data;
1221
1222  write_next:
1223
1224   g_mutex_lock (worker->write_lock);
1225   data = g_queue_pop_head (worker->write_queue);
1226   if (data != NULL)
1227     worker->num_writes_pending += 1;
1228   g_mutex_unlock (worker->write_lock);
1229
1230   /* Note that write_lock is only used for protecting the @write_queue
1231    * and @num_writes_pending fields of the GDBusWorker struct ... which we
1232    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1233    *
1234    * Therefore, it's fine to drop it here when calling back into user
1235    * code and then writing the message out onto the GIOStream since this
1236    * function only runs on the worker thread.
1237    */
1238   if (data != NULL)
1239     {
1240       gboolean message_was_dropped;
1241       message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1242       if (G_UNLIKELY (message_was_dropped))
1243         {
1244           g_mutex_lock (worker->write_lock);
1245           worker->num_writes_pending -= 1;
1246           g_mutex_unlock (worker->write_lock);
1247           message_to_write_data_free (data);
1248           goto write_next;
1249         }
1250       else
1251         {
1252           write_message_async (worker,
1253                                data,
1254                                write_message_cb,
1255                                data);
1256         }
1257     }
1258 }
1259
1260 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1261 static gboolean
1262 write_message_in_idle_cb (gpointer user_data)
1263 {
1264   GDBusWorker *worker = user_data;
1265   if (worker->num_writes_pending == 0)
1266     maybe_write_next_message (worker);
1267   return FALSE;
1268 }
1269
1270 /* ---------------------------------------------------------------------------------------------------- */
1271
1272 /* can be called from any thread - steals blob */
1273 void
1274 _g_dbus_worker_send_message (GDBusWorker    *worker,
1275                              GDBusMessage   *message,
1276                              gchar          *blob,
1277                              gsize           blob_len)
1278 {
1279   MessageToWriteData *data;
1280
1281   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1282   g_return_if_fail (blob != NULL);
1283   g_return_if_fail (blob_len > 16);
1284
1285   data = g_new0 (MessageToWriteData, 1);
1286   data->worker = _g_dbus_worker_ref (worker);
1287   data->message = g_object_ref (message);
1288   data->blob = blob; /* steal! */
1289   data->blob_size = blob_len;
1290
1291   g_mutex_lock (worker->write_lock);
1292   g_queue_push_tail (worker->write_queue, data);
1293   if (worker->num_writes_pending == 0)
1294     {
1295       GSource *idle_source;
1296       idle_source = g_idle_source_new ();
1297       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1298       g_source_set_callback (idle_source,
1299                              write_message_in_idle_cb,
1300                              _g_dbus_worker_ref (worker),
1301                              (GDestroyNotify) _g_dbus_worker_unref);
1302       g_source_attach (idle_source, shared_thread_data->context);
1303       g_source_unref (idle_source);
1304     }
1305   g_mutex_unlock (worker->write_lock);
1306 }
1307
1308 /* ---------------------------------------------------------------------------------------------------- */
1309
1310 static void
1311 _g_dbus_worker_thread_begin_func (gpointer user_data)
1312 {
1313   GDBusWorker *worker = user_data;
1314
1315   worker->thread = g_thread_self ();
1316
1317   /* begin reading */
1318   _g_dbus_worker_do_read (worker);
1319 }
1320
1321 GDBusWorker *
1322 _g_dbus_worker_new (GIOStream                              *stream,
1323                     GDBusCapabilityFlags                    capabilities,
1324                     gboolean                                initially_frozen,
1325                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1326                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1327                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1328                     gpointer                                user_data)
1329 {
1330   GDBusWorker *worker;
1331
1332   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1333   g_return_val_if_fail (message_received_callback != NULL, NULL);
1334   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1335   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1336
1337   worker = g_new0 (GDBusWorker, 1);
1338   worker->ref_count = 1;
1339
1340   worker->read_lock = g_mutex_new ();
1341   worker->message_received_callback = message_received_callback;
1342   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1343   worker->disconnected_callback = disconnected_callback;
1344   worker->user_data = user_data;
1345   worker->stream = g_object_ref (stream);
1346   worker->capabilities = capabilities;
1347   worker->cancellable = g_cancellable_new ();
1348
1349   worker->frozen = initially_frozen;
1350   worker->received_messages_while_frozen = g_queue_new ();
1351
1352   worker->write_lock = g_mutex_new ();
1353   worker->write_queue = g_queue_new ();
1354
1355   if (G_IS_SOCKET_CONNECTION (worker->stream))
1356     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1357
1358   _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
1359
1360   return worker;
1361 }
1362
1363 /* ---------------------------------------------------------------------------------------------------- */
1364
1365 /* This can be called from any thread - frees worker - guarantees no callbacks
1366  * will ever be issued again
1367  */
1368 void
1369 _g_dbus_worker_stop (GDBusWorker *worker)
1370 {
1371   /* If we're called in the worker thread it means we are called from
1372    * a worker callback. This is fine, we just can't lock in that case since
1373    * we're already holding the lock...
1374    */
1375   if (g_thread_self () != worker->thread)
1376     g_mutex_lock (worker->read_lock);
1377   worker->stopped = TRUE;
1378   if (g_thread_self () != worker->thread)
1379     g_mutex_unlock (worker->read_lock);
1380
1381   g_cancellable_cancel (worker->cancellable);
1382   _g_dbus_worker_unref (worker);
1383 }
1384
1385 /* ---------------------------------------------------------------------------------------------------- */
1386
1387 /* can be called from any thread (except the worker thread) - blocks
1388  * calling thread until all queued outgoing messages are written and
1389  * the transport has been flushed
1390  */
1391 gboolean
1392 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1393                            GCancellable   *cancellable,
1394                            GError        **error)
1395 {
1396   gboolean ret;
1397   FlushData *data;
1398
1399   data = NULL;
1400   ret = TRUE;
1401
1402   /* if the queue is empty, there's nothing to wait for */
1403   g_mutex_lock (worker->write_lock);
1404   if (g_queue_get_length (worker->write_queue) > 0)
1405     {
1406       data = g_new0 (FlushData, 1);
1407       data->mutex = g_mutex_new ();
1408       data->cond = g_cond_new ();
1409       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1410       g_mutex_lock (data->mutex);
1411       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1412     }
1413   g_mutex_unlock (worker->write_lock);
1414
1415   if (data != NULL)
1416     {
1417       g_cond_wait (data->cond, data->mutex);
1418       g_mutex_unlock (data->mutex);
1419
1420       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1421       g_cond_free (data->cond);
1422       g_mutex_free (data->mutex);
1423       if (data->error != NULL)
1424         {
1425           ret = FALSE;
1426           g_propagate_error (error, data->error);
1427         }
1428       g_free (data);
1429     }
1430
1431   return ret;
1432 }
1433
1434 /* ---------------------------------------------------------------------------------------------------- */
1435
/* Bits stored in _gdbus_debug_flags, one per keyword that
 * _g_dbus_initialize() accepts in the G_DBUS_DEBUG environment variable.
 */
#define G_DBUS_DEBUG_AUTHENTICATION (1 << 0)  /* "authentication" */
#define G_DBUS_DEBUG_TRANSPORT      (1 << 1)  /* "transport" */
#define G_DBUS_DEBUG_MESSAGE        (1 << 2)  /* "message" */
#define G_DBUS_DEBUG_PAYLOAD        (1 << 3)  /* "payload" */
#define G_DBUS_DEBUG_CALL           (1 << 4)  /* "call" */
#define G_DBUS_DEBUG_SIGNAL         (1 << 5)  /* "signal" */
#define G_DBUS_DEBUG_INCOMING       (1 << 6)  /* "incoming" */
#define G_DBUS_DEBUG_RETURN         (1 << 7)  /* "return" */
#define G_DBUS_DEBUG_EMISSION       (1 << 8)  /* "emission" */
#define G_DBUS_DEBUG_ADDRESS        (1 << 9)  /* "address" */
1446
1447 static gint _gdbus_debug_flags = 0;
1448
1449 gboolean
1450 _g_dbus_debug_authentication (void)
1451 {
1452   _g_dbus_initialize ();
1453   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1454 }
1455
1456 gboolean
1457 _g_dbus_debug_transport (void)
1458 {
1459   _g_dbus_initialize ();
1460   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1461 }
1462
1463 gboolean
1464 _g_dbus_debug_message (void)
1465 {
1466   _g_dbus_initialize ();
1467   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1468 }
1469
1470 gboolean
1471 _g_dbus_debug_payload (void)
1472 {
1473   _g_dbus_initialize ();
1474   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1475 }
1476
1477 gboolean
1478 _g_dbus_debug_call (void)
1479 {
1480   _g_dbus_initialize ();
1481   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1482 }
1483
1484 gboolean
1485 _g_dbus_debug_signal (void)
1486 {
1487   _g_dbus_initialize ();
1488   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1489 }
1490
1491 gboolean
1492 _g_dbus_debug_incoming (void)
1493 {
1494   _g_dbus_initialize ();
1495   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1496 }
1497
1498 gboolean
1499 _g_dbus_debug_return (void)
1500 {
1501   _g_dbus_initialize ();
1502   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1503 }
1504
1505 gboolean
1506 _g_dbus_debug_emission (void)
1507 {
1508   _g_dbus_initialize ();
1509   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1510 }
1511
1512 gboolean
1513 _g_dbus_debug_address (void)
1514 {
1515   _g_dbus_initialize ();
1516   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1517 }
1518
1519 G_LOCK_DEFINE_STATIC (print_lock);
1520
1521 void
1522 _g_dbus_debug_print_lock (void)
1523 {
1524   G_LOCK (print_lock);
1525 }
1526
1527 void
1528 _g_dbus_debug_print_unlock (void)
1529 {
1530   G_UNLOCK (print_lock);
1531 }
1532
1533 /*
1534  * _g_dbus_initialize:
1535  *
1536  * Does various one-time init things such as
1537  *
1538  *  - registering the G_DBUS_ERROR error domain
1539  *  - parses the G_DBUS_DEBUG environment variable
1540  */
1541 void
1542 _g_dbus_initialize (void)
1543 {
1544   static volatile gsize initialized = 0;
1545
1546   if (g_once_init_enter (&initialized))
1547     {
1548       volatile GQuark g_dbus_error_domain;
1549       const gchar *debug;
1550
1551       g_dbus_error_domain = G_DBUS_ERROR;
1552
1553       debug = g_getenv ("G_DBUS_DEBUG");
1554       if (debug != NULL)
1555         {
1556           const GDebugKey keys[] = {
1557             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1558             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1559             { "message",        G_DBUS_DEBUG_MESSAGE        },
1560             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1561             { "call",           G_DBUS_DEBUG_CALL           },
1562             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1563             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1564             { "return",         G_DBUS_DEBUG_RETURN         },
1565             { "emission",       G_DBUS_DEBUG_EMISSION       },
1566             { "address",        G_DBUS_DEBUG_ADDRESS        }
1567           };
1568
1569           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1570           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1571             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1572         }
1573
1574       g_once_init_leave (&initialized, 1);
1575     }
1576 }
1577
1578 /* ---------------------------------------------------------------------------------------------------- */
1579
1580 GVariantType *
1581 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1582 {
1583   const GVariantType *arg_types[256];
1584   guint n;
1585
1586   if (args)
1587     for (n = 0; args[n] != NULL; n++)
1588       {
1589         /* DBus places a hard limit of 255 on signature length.
1590          * therefore number of args must be less than 256.
1591          */
1592         g_assert (n < 256);
1593
1594         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1595
1596         if G_UNLIKELY (arg_types[n] == NULL)
1597           return NULL;
1598       }
1599   else
1600     n = 0;
1601
1602   return g_variant_type_new_tuple (arg_types, n);
1603 }
1604
1605 /* ---------------------------------------------------------------------------------------------------- */
1606
1607 #ifdef G_OS_WIN32
1608
1609 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1610
1611 gchar *
1612 _g_dbus_win32_get_user_sid (void)
1613 {
1614   HANDLE h;
1615   TOKEN_USER *user;
1616   DWORD token_information_len;
1617   PSID psid;
1618   gchar *sid;
1619   gchar *ret;
1620
1621   ret = NULL;
1622   user = NULL;
1623   h = INVALID_HANDLE_VALUE;
1624
1625   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1626     {
1627       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1628       goto out;
1629     }
1630
1631   /* Get length of buffer */
1632   token_information_len = 0;
1633   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1634     {
1635       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1636         {
1637           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1638           goto out;
1639         }
1640     }
1641   user = g_malloc (token_information_len);
1642   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1643     {
1644       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1645       goto out;
1646     }
1647
1648   psid = user->User.Sid;
1649   if (!IsValidSid (psid))
1650     {
1651       g_warning ("Invalid SID");
1652       goto out;
1653     }
1654
1655   if (!ConvertSidToStringSidA (psid, &sid))
1656     {
1657       g_warning ("Invalid SID");
1658       goto out;
1659     }
1660
1661   ret = g_strdup (sid);
1662   LocalFree (sid);
1663
1664 out:
1665   g_free (user);
1666   if (h != INVALID_HANDLE_VALUE)
1667     CloseHandle (h);
1668   return ret;
1669 }
1670 #endif
1671
1672 /* ---------------------------------------------------------------------------------------------------- */
1673
1674 gchar *
1675 _g_dbus_get_machine_id (GError **error)
1676 {
1677   gchar *ret;
1678   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1679   ret = NULL;
1680   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1681                             &ret,
1682                             NULL,
1683                             error))
1684     {
1685       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1686     }
1687   else
1688     {
1689       /* TODO: validate value */
1690       g_strstrip (ret);
1691     }
1692   return ret;
1693 }
1694
1695 /* ---------------------------------------------------------------------------------------------------- */
1696
1697 gchar *
1698 _g_dbus_enum_to_string (GType enum_type, gint value)
1699 {
1700   gchar *ret;
1701   GEnumClass *klass;
1702   GEnumValue *enum_value;
1703
1704   klass = g_type_class_ref (enum_type);
1705   enum_value = g_enum_get_value (klass, value);
1706   if (enum_value != NULL)
1707     ret = g_strdup (enum_value->value_nick);
1708   else
1709     ret = g_strdup_printf ("unknown (value %d)", value);
1710   g_type_class_unref (klass);
1711   return ret;
1712 }
1713
1714 /* ---------------------------------------------------------------------------------------------------- */
1715
1716 static void
1717 write_message_print_transport_debug (gssize bytes_written,
1718                                      MessageToWriteData *data)
1719 {
1720   if (G_LIKELY (!_g_dbus_debug_transport ()))
1721     goto out;
1722
1723   _g_dbus_debug_print_lock ();
1724   g_print ("========================================================================\n"
1725            "GDBus-debug:Transport:\n"
1726            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1727            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
1728            bytes_written,
1729            g_dbus_message_get_serial (data->message),
1730            data->blob_size,
1731            data->total_written,
1732            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1733   _g_dbus_debug_print_unlock ();
1734  out:
1735   ;
1736 }
1737
1738 /* ---------------------------------------------------------------------------------------------------- */
1739
1740 static void
1741 read_message_print_transport_debug (gssize bytes_read,
1742                                     GDBusWorker *worker)
1743 {
1744   gsize size;
1745   gint32 serial;
1746   gint32 message_length;
1747
1748   if (G_LIKELY (!_g_dbus_debug_transport ()))
1749     goto out;
1750
1751   size = bytes_read + worker->read_buffer_cur_size;
1752   serial = 0;
1753   message_length = 0;
1754   if (size >= 16)
1755     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
1756   if (size >= 1)
1757     {
1758       switch (worker->read_buffer[0])
1759         {
1760         case 'l':
1761           if (size >= 12)
1762             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
1763           break;
1764         case 'B':
1765           if (size >= 12)
1766             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
1767           break;
1768         default:
1769           /* an error will be set elsewhere if this happens */
1770           goto out;
1771         }
1772     }
1773
1774     _g_dbus_debug_print_lock ();
1775   g_print ("========================================================================\n"
1776            "GDBus-debug:Transport:\n"
1777            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1778            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
1779            bytes_read,
1780            serial,
1781            message_length,
1782            worker->read_buffer_cur_size,
1783            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
1784   _g_dbus_debug_print_unlock ();
1785  out:
1786   ;
1787 }