GDBus: Complain via g_warning() if an invalid message is received
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "giostream.h"
41 #include "gsocketcontrolmessage.h"
42 #include "gsocketconnection.h"
43
44 #ifdef G_OS_UNIX
45 #include "gunixfdmessage.h"
46 #include "gunixconnection.h"
47 #include "gunixcredentialsmessage.h"
48 #endif
49
50 #ifdef G_OS_WIN32
51 #include <windows.h>
52 #endif
53
54 #include "glibintl.h"
55 #include "gioalias.h"
56
57 /* ---------------------------------------------------------------------------------------------------- */
58
59 static gchar *
60 hexdump (const gchar *data, gsize len, guint indent)
61 {
62  guint n, m;
63  GString *ret;
64
65  ret = g_string_new (NULL);
66
67  for (n = 0; n < len; n += 16)
68    {
69      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
70
71      for (m = n; m < n + 16; m++)
72        {
73          if (m > n && (m%4) == 0)
74            g_string_append_c (ret, ' ');
75          if (m < len)
76            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
77          else
78            g_string_append (ret, "   ");
79        }
80
81      g_string_append (ret, "   ");
82
83      for (m = n; m < len && m < n + 16; m++)
84        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
85
86      g_string_append_c (ret, '\n');
87    }
88
89  return g_string_free (ret, FALSE);
90 }
91
92 /* ---------------------------------------------------------------------------------------------------- */
93
94 /* Unfortunately ancillary messages are discarded when reading from a
95  * socket using the GSocketInputStream abstraction. So we provide a
96  * very GInputStream-ish API that uses GSocket in this case (very
97  * similar to GSocketInputStream).
98  */
99
100 typedef struct
101 {
102   GSocket *socket;
103   GCancellable *cancellable;
104
105   void *buffer;
106   gsize count;
107
108   GSocketControlMessage ***messages;
109   gint *num_messages;
110
111   GSimpleAsyncResult *simple;
112
113   gboolean from_mainloop;
114 } ReadWithControlData;
115
116 static void
117 read_with_control_data_free (ReadWithControlData *data)
118 {
119   g_object_unref (data->socket);
120   if (data->cancellable != NULL)
121     g_object_unref (data->cancellable);
122   g_free (data);
123 }
124
125 static gboolean
126 _g_socket_read_with_control_messages_ready (GSocket      *socket,
127                                             GIOCondition  condition,
128                                             gpointer      user_data)
129 {
130   ReadWithControlData *data = user_data;
131   GError *error;
132   gssize result;
133   GInputVector vector;
134
135   error = NULL;
136   vector.buffer = data->buffer;
137   vector.size = data->count;
138   result = g_socket_receive_message (data->socket,
139                                      NULL, /* address */
140                                      &vector,
141                                      1,
142                                      data->messages,
143                                      data->num_messages,
144                                      NULL,
145                                      data->cancellable,
146                                      &error);
147   if (result >= 0)
148     {
149       g_simple_async_result_set_op_res_gssize (data->simple, result);
150     }
151   else
152     {
153       g_assert (error != NULL);
154       g_simple_async_result_set_from_error (data->simple, error);
155       g_error_free (error);
156     }
157
158   if (data->from_mainloop)
159     g_simple_async_result_complete (data->simple);
160   else
161     g_simple_async_result_complete_in_idle (data->simple);
162
163   return FALSE;
164 }
165
166 static void
167 _g_socket_read_with_control_messages (GSocket                 *socket,
168                                       void                    *buffer,
169                                       gsize                    count,
170                                       GSocketControlMessage ***messages,
171                                       gint                    *num_messages,
172                                       gint                     io_priority,
173                                       GCancellable            *cancellable,
174                                       GAsyncReadyCallback      callback,
175                                       gpointer                 user_data)
176 {
177   ReadWithControlData *data;
178
179   data = g_new0 (ReadWithControlData, 1);
180   data->socket = g_object_ref (socket);
181   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
182   data->buffer = buffer;
183   data->count = count;
184   data->messages = messages;
185   data->num_messages = num_messages;
186
187   data->simple = g_simple_async_result_new (G_OBJECT (socket),
188                                             callback,
189                                             user_data,
190                                             _g_socket_read_with_control_messages);
191
192   if (!g_socket_condition_check (socket, G_IO_IN))
193     {
194       GSource *source;
195       data->from_mainloop = TRUE;
196       source = g_socket_create_source (data->socket,
197                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
198                                        cancellable);
199       g_source_set_callback (source,
200                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
201                              data,
202                              (GDestroyNotify) read_with_control_data_free);
203       g_source_attach (source, g_main_context_get_thread_default ());
204       g_source_unref (source);
205     }
206   else
207     {
208       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
209       read_with_control_data_free (data);
210     }
211 }
212
213 static gssize
214 _g_socket_read_with_control_messages_finish (GSocket       *socket,
215                                              GAsyncResult  *result,
216                                              GError       **error)
217 {
218   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
219
220   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
221   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
222
223   if (g_simple_async_result_propagate_error (simple, error))
224       return -1;
225   else
226     return g_simple_async_result_get_op_res_gssize (simple);
227 }
228
229 /* ---------------------------------------------------------------------------------------------------- */
230
231 G_LOCK_DEFINE_STATIC (shared_thread_lock);
232
233 typedef struct
234 {
235   gint num_users;
236   GThread *thread;
237   GMainContext *context;
238   GMainLoop *loop;
239 } SharedThreadData;
240
241 static SharedThreadData *shared_thread_data = NULL;
242
243 static gpointer
244 shared_thread_func (gpointer data)
245 {
246   g_main_context_push_thread_default (shared_thread_data->context);
247   g_main_loop_run (shared_thread_data->loop);
248   g_main_context_pop_thread_default (shared_thread_data->context);
249   return NULL;
250 }
251
252 typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
253
254 typedef struct
255 {
256   GDBusSharedThreadFunc func;
257   gpointer              user_data;
258   gboolean              done;
259 } CallerData;
260
261 static gboolean
262 invoke_caller (gpointer user_data)
263 {
264   CallerData *data = user_data;
265   data->func (data->user_data);
266   data->done = TRUE;
267   return FALSE;
268 }
269
270 static void
271 _g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
272                            gpointer              user_data)
273 {
274   GError *error;
275   GSource *idle_source;
276   CallerData *data;
277
278   G_LOCK (shared_thread_lock);
279
280   if (shared_thread_data != NULL)
281     {
282       shared_thread_data->num_users += 1;
283       goto have_thread;
284     }
285
286   shared_thread_data = g_new0 (SharedThreadData, 1);
287   shared_thread_data->num_users = 1;
288
289   error = NULL;
290   shared_thread_data->context = g_main_context_new ();
291   shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
292   shared_thread_data->thread = g_thread_create (shared_thread_func,
293                                                 NULL,
294                                                 TRUE,
295                                                 &error);
296   g_assert_no_error (error);
297
298  have_thread:
299
300   data = g_new0 (CallerData, 1);
301   data->func = func;
302   data->user_data = user_data;
303   data->done = FALSE;
304
305   idle_source = g_idle_source_new ();
306   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
307   g_source_set_callback (idle_source,
308                          invoke_caller,
309                          data,
310                          NULL);
311   g_source_attach (idle_source, shared_thread_data->context);
312   g_source_unref (idle_source);
313
314   /* wait for the user code to run.. hmm.. probably use a condition variable instead */
315   while (!data->done)
316     g_thread_yield ();
317
318   g_free (data);
319
320   G_UNLOCK (shared_thread_lock);
321 }
322
323 static void
324 _g_dbus_shared_thread_unref (void)
325 {
326   /* TODO: actually destroy the shared thread here */
327 #if 0
328   G_LOCK (shared_thread_lock);
329   g_assert (shared_thread_data != NULL);
330   shared_thread_data->num_users -= 1;
331   if (shared_thread_data->num_users == 0)
332     {
333       g_main_loop_quit (shared_thread_data->loop);
334       //g_thread_join (shared_thread_data->thread);
335       g_main_loop_unref (shared_thread_data->loop);
336       g_main_context_unref (shared_thread_data->context);
337       g_free (shared_thread_data);
338       shared_thread_data = NULL;
339       G_UNLOCK (shared_thread_lock);
340     }
341   else
342     {
343       G_UNLOCK (shared_thread_lock);
344     }
345 #endif
346 }
347
348 /* ---------------------------------------------------------------------------------------------------- */
349
350 struct GDBusWorker
351 {
352   volatile gint                       ref_count;
353   gboolean                            stopped;
354   GIOStream                          *stream;
355   GDBusCapabilityFlags                capabilities;
356   GCancellable                       *cancellable;
357   GDBusWorkerMessageReceivedCallback  message_received_callback;
358   GDBusWorkerDisconnectedCallback     disconnected_callback;
359   gpointer                            user_data;
360
361   GThread                            *thread;
362
363   /* if not NULL, stream is GSocketConnection */
364   GSocket *socket;
365
366   /* used for reading */
367   GMutex                             *read_lock;
368   gchar                              *read_buffer;
369   gsize                               read_buffer_allocated_size;
370   gsize                               read_buffer_cur_size;
371   gsize                               read_buffer_bytes_wanted;
372   GUnixFDList                        *read_fd_list;
373   GSocketControlMessage             **read_ancillary_messages;
374   gint                                read_num_ancillary_messages;
375
376   /* used for writing */
377   GMutex                             *write_lock;
378   GQueue                             *write_queue;
379   gboolean                            write_is_pending;
380 };
381
382 struct _MessageToWriteData ;
383 typedef struct _MessageToWriteData MessageToWriteData;
384
385 static void message_to_write_data_free (MessageToWriteData *data);
386
387 static GDBusWorker *
388 _g_dbus_worker_ref (GDBusWorker *worker)
389 {
390   g_atomic_int_inc (&worker->ref_count);
391   return worker;
392 }
393
394 static void
395 _g_dbus_worker_unref (GDBusWorker *worker)
396 {
397   if (g_atomic_int_dec_and_test (&worker->ref_count))
398     {
399       _g_dbus_shared_thread_unref ();
400
401       g_object_unref (worker->stream);
402
403       g_mutex_free (worker->read_lock);
404       g_object_unref (worker->cancellable);
405       if (worker->read_fd_list != NULL)
406         g_object_unref (worker->read_fd_list);
407
408       g_mutex_free (worker->write_lock);
409       g_queue_foreach (worker->write_queue,
410                        (GFunc) message_to_write_data_free,
411                        NULL);
412       g_queue_free (worker->write_queue);
413       g_free (worker);
414     }
415 }
416
417 static void
418 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
419                                   gboolean      remote_peer_vanished,
420                                   GError       *error)
421 {
422   if (!worker->stopped)
423     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
424 }
425
426 static void
427 _g_dbus_worker_emit_message (GDBusWorker  *worker,
428                              GDBusMessage *message)
429 {
430   if (!worker->stopped)
431     worker->message_received_callback (worker, message, worker->user_data);
432 }
433
434 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
435
436 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
437 static void
438 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
439                            GAsyncResult  *res,
440                            gpointer       user_data)
441 {
442   GDBusWorker *worker = user_data;
443   GError *error;
444   gssize bytes_read;
445
446   g_mutex_lock (worker->read_lock);
447
448   /* If already stopped, don't even process the reply */
449   if (worker->stopped)
450     goto out;
451
452   error = NULL;
453   if (worker->socket == NULL)
454     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
455                                              res,
456                                              &error);
457   else
458     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
459                                                               res,
460                                                               &error);
461   if (worker->read_num_ancillary_messages > 0)
462     {
463       gint n;
464       for (n = 0; n < worker->read_num_ancillary_messages; n++)
465         {
466           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
467
468           if (FALSE)
469             {
470             }
471 #ifdef G_OS_UNIX
472           else if (G_IS_UNIX_FD_MESSAGE (control_message))
473             {
474               GUnixFDMessage *fd_message;
475               gint *fds;
476               gint num_fds;
477
478               fd_message = G_UNIX_FD_MESSAGE (control_message);
479               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
480               if (worker->read_fd_list == NULL)
481                 {
482                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
483                 }
484               else
485                 {
486                   gint n;
487                   for (n = 0; n < num_fds; n++)
488                     {
489                       /* TODO: really want a append_steal() */
490                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
491                       close (fds[n]);
492                     }
493                 }
494               g_free (fds);
495             }
496           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
497             {
498               /* do nothing */
499             }
500 #endif
501           else
502             {
503               if (error == NULL)
504                 {
505                   g_set_error (&error,
506                                G_IO_ERROR,
507                                G_IO_ERROR_FAILED,
508                                "Unexpected ancillary message of type %s received from peer",
509                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
510                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
511                   g_error_free (error);
512                   g_object_unref (control_message);
513                   n++;
514                   while (n < worker->read_num_ancillary_messages)
515                     g_object_unref (worker->read_ancillary_messages[n++]);
516                   g_free (worker->read_ancillary_messages);
517                   goto out;
518                 }
519             }
520           g_object_unref (control_message);
521         }
522       g_free (worker->read_ancillary_messages);
523     }
524
525   if (bytes_read == -1)
526     {
527       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
528       g_error_free (error);
529       goto out;
530     }
531
532 #if 0
533   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
534            (gint) bytes_read,
535            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
536            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
537            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
538                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
539            worker->stream,
540            worker);
541 #endif
542
543   /* TODO: hmm, hmm... */
544   if (bytes_read == 0)
545     {
546       g_set_error (&error,
547                    G_IO_ERROR,
548                    G_IO_ERROR_FAILED,
549                    "Underlying GIOStream returned 0 bytes on an async read");
550       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
551       g_error_free (error);
552       goto out;
553     }
554
555   worker->read_buffer_cur_size += bytes_read;
556   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
557     {
558       /* OK, got what we asked for! */
559       if (worker->read_buffer_bytes_wanted == 16)
560         {
561           gssize message_len;
562           /* OK, got the header - determine how many more bytes are needed */
563           error = NULL;
564           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
565                                                      16,
566                                                      &error);
567           if (message_len == -1)
568             {
569               g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
570               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
571               g_error_free (error);
572               goto out;
573             }
574
575           worker->read_buffer_bytes_wanted = message_len;
576           _g_dbus_worker_do_read_unlocked (worker);
577         }
578       else
579         {
580           GDBusMessage *message;
581           error = NULL;
582
583           /* TODO: use connection->priv->auth to decode the message */
584
585           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
586                                                   worker->read_buffer_cur_size,
587                                                   worker->capabilities,
588                                                   &error);
589           if (message == NULL)
590             {
591               gchar *s;
592               s = hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
593               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
594                          "The error is: %s\n"
595                          "The payload is as follows:\n"
596                          "%s\n",
597                          worker->read_buffer_cur_size,
598                          error->message,
599                          s);
600               g_free (s);
601               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
602               g_error_free (error);
603               goto out;
604             }
605
606 #ifdef G_OS_UNIX
607           if (worker->read_fd_list != NULL)
608             {
609               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
610               worker->read_fd_list = NULL;
611             }
612 #endif
613
614           if (G_UNLIKELY (_g_dbus_debug_message ()))
615             {
616               gchar *s;
617               g_print ("========================================================================\n"
618                        "GDBus-debug:Message:\n"
619                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
620                        worker->read_buffer_cur_size);
621               s = g_dbus_message_print (message, 2);
622               g_print ("%s", s);
623               g_free (s);
624               s = hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
625               g_print ("%s\n", s);
626               g_free (s);
627             }
628
629           /* yay, got a message, go deliver it */
630           _g_dbus_worker_emit_message (worker, message);
631           g_object_unref (message);
632
633           /* start reading another message! */
634           worker->read_buffer_bytes_wanted = 0;
635           worker->read_buffer_cur_size = 0;
636           _g_dbus_worker_do_read_unlocked (worker);
637         }
638     }
639   else
640     {
641       /* didn't get all the bytes we requested - so repeat the request... */
642       _g_dbus_worker_do_read_unlocked (worker);
643     }
644
645  out:
646   g_mutex_unlock (worker->read_lock);
647
648   /* gives up the reference acquired when calling g_input_stream_read_async() */
649   _g_dbus_worker_unref (worker);
650 }
651
652 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
653 static void
654 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
655 {
656   /* if bytes_wanted is zero, it means start reading a message */
657   if (worker->read_buffer_bytes_wanted == 0)
658     {
659       worker->read_buffer_cur_size = 0;
660       worker->read_buffer_bytes_wanted = 16;
661     }
662
663   /* ensure we have a (big enough) buffer */
664   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
665     {
666       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
667       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
668       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
669     }
670
671   if (worker->socket == NULL)
672     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
673                                worker->read_buffer + worker->read_buffer_cur_size,
674                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
675                                G_PRIORITY_DEFAULT,
676                                worker->cancellable,
677                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
678                                _g_dbus_worker_ref (worker));
679   else
680     {
681       worker->read_ancillary_messages = NULL;
682       worker->read_num_ancillary_messages = 0;
683       _g_socket_read_with_control_messages (worker->socket,
684                                             worker->read_buffer + worker->read_buffer_cur_size,
685                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
686                                             &worker->read_ancillary_messages,
687                                             &worker->read_num_ancillary_messages,
688                                             G_PRIORITY_DEFAULT,
689                                             worker->cancellable,
690                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
691                                             _g_dbus_worker_ref (worker));
692     }
693 }
694
695 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
696 static void
697 _g_dbus_worker_do_read (GDBusWorker *worker)
698 {
699   g_mutex_lock (worker->read_lock);
700   _g_dbus_worker_do_read_unlocked (worker);
701   g_mutex_unlock (worker->read_lock);
702 }
703
704 /* ---------------------------------------------------------------------------------------------------- */
705
706 struct _MessageToWriteData
707 {
708   GDBusMessage *message;
709   gchar        *blob;
710   gsize         blob_size;
711 };
712
713 static void
714 message_to_write_data_free (MessageToWriteData *data)
715 {
716   g_object_unref (data->message);
717   g_free (data->blob);
718   g_free (data);
719 }
720
721 /* ---------------------------------------------------------------------------------------------------- */
722
723 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
724 static gboolean
725 write_message (GDBusWorker         *worker,
726                MessageToWriteData  *data,
727                GError             **error)
728 {
729   gboolean ret;
730
731   g_return_val_if_fail (data->blob_size > 16, FALSE);
732
733   ret = FALSE;
734
735   /* First, the initial 16 bytes - special case UNIX sockets here
736    * since it may involve writing an ancillary message with file
737    * descriptors
738    */
739 #ifdef G_OS_UNIX
740   {
741     GOutputVector vector;
742     GSocketControlMessage *message;
743     GUnixFDList *fd_list;
744     gssize bytes_written;
745
746     fd_list = g_dbus_message_get_unix_fd_list (data->message);
747
748     message = NULL;
749     if (fd_list != NULL)
750       {
751         if (!G_IS_UNIX_CONNECTION (worker->stream))
752           {
753             g_set_error (error,
754                          G_IO_ERROR,
755                          G_IO_ERROR_INVALID_ARGUMENT,
756                          "Tried sending a file descriptor on unsupported stream of type %s",
757                          g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
758             goto out;
759           }
760         else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
761           {
762             g_set_error_literal (error,
763                                  G_IO_ERROR,
764                                  G_IO_ERROR_INVALID_ARGUMENT,
765                                  "Tried sending a file descriptor but remote peer does not support this capability");
766             goto out;
767           }
768         message = g_unix_fd_message_new_with_fd_list (fd_list);
769       }
770
771     vector.buffer = data->blob;
772     vector.size = 16;
773
774     bytes_written = g_socket_send_message (worker->socket,
775                                            NULL, /* address */
776                                            &vector,
777                                            1,
778                                            message != NULL ? &message : NULL,
779                                            message != NULL ? 1 : 0,
780                                            G_SOCKET_MSG_NONE,
781                                            worker->cancellable,
782                                            error);
783     if (bytes_written == -1)
784       {
785         g_prefix_error (error, _("Error writing first 16 bytes of message to socket: "));
786         if (message != NULL)
787           g_object_unref (message);
788         goto out;
789       }
790     if (message != NULL)
791       g_object_unref (message);
792
793     if (bytes_written < 16)
794       {
795         /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
796          * messages are sent?
797          */
798         g_assert_not_reached ();
799       }
800   }
801 #else
802   /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */
803   if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
804                                   (const gchar *) data->blob,
805                                   16,
806                                   NULL, /* bytes_written */
807                                   worker->cancellable, /* cancellable */
808                                   error))
809     goto out;
810 #endif
811
812   /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */
813   if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
814                                   (const gchar *) data->blob + 16,
815                                   data->blob_size - 16,
816                                   NULL, /* bytes_written */
817                                   worker->cancellable, /* cancellable */
818                                   error))
819     goto out;
820
821   ret = TRUE;
822
823   if (G_UNLIKELY (_g_dbus_debug_message ()))
824     {
825       gchar *s;
826       g_print ("========================================================================\n"
827                "GDBus-debug:Message:\n"
828                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
829                data->blob_size);
830       s = g_dbus_message_print (data->message, 2);
831       g_print ("%s", s);
832       g_free (s);
833       s = hexdump (data->blob, data->blob_size, 2);
834       g_print ("%s\n", s);
835       g_free (s);
836     }
837
838  out:
839   return ret;
840 }
841
842 /* ---------------------------------------------------------------------------------------------------- */
843
844 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
845 static gboolean
846 write_message_in_idle_cb (gpointer user_data)
847 {
848   GDBusWorker *worker = user_data;
849   gboolean more_writes_are_pending;
850   MessageToWriteData *data;
851   GError *error;
852
853   g_mutex_lock (worker->write_lock);
854
855   data = g_queue_pop_head (worker->write_queue);
856   g_assert (data != NULL);
857
858   error = NULL;
859   if (!write_message (worker,
860                       data,
861                       &error))
862     {
863       /* TODO: handle */
864       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
865       g_error_free (error);
866     }
867   message_to_write_data_free (data);
868
869   more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
870
871   worker->write_is_pending = more_writes_are_pending;
872   g_mutex_unlock (worker->write_lock);
873
874   return more_writes_are_pending;
875 }
876
877 /* ---------------------------------------------------------------------------------------------------- */
878
879 /* can be called from any thread - steals blob */
880 void
881 _g_dbus_worker_send_message (GDBusWorker    *worker,
882                              GDBusMessage   *message,
883                              gchar          *blob,
884                              gsize           blob_len)
885 {
886   MessageToWriteData *data;
887
888   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
889   g_return_if_fail (blob != NULL);
890   g_return_if_fail (blob_len > 16);
891
892   data = g_new0 (MessageToWriteData, 1);
893   data->message = g_object_ref (message);
894   data->blob = blob; /* steal! */
895   data->blob_size = blob_len;
896
897   g_mutex_lock (worker->write_lock);
898   g_queue_push_tail (worker->write_queue, data);
899   if (!worker->write_is_pending)
900     {
901       GSource *idle_source;
902
903       worker->write_is_pending = TRUE;
904
905       idle_source = g_idle_source_new ();
906       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
907       g_source_set_callback (idle_source,
908                              write_message_in_idle_cb,
909                              _g_dbus_worker_ref (worker),
910                              (GDestroyNotify) _g_dbus_worker_unref);
911       g_source_attach (idle_source, shared_thread_data->context);
912       g_source_unref (idle_source);
913     }
914   g_mutex_unlock (worker->write_lock);
915 }
916
917 /* ---------------------------------------------------------------------------------------------------- */
918
919 static void
920 _g_dbus_worker_thread_begin_func (gpointer user_data)
921 {
922   GDBusWorker *worker = user_data;
923
924   worker->thread = g_thread_self ();
925
926   /* begin reading */
927   _g_dbus_worker_do_read (worker);
928 }
929
930 GDBusWorker *
931 _g_dbus_worker_new (GIOStream                          *stream,
932                     GDBusCapabilityFlags                capabilities,
933                     GDBusWorkerMessageReceivedCallback  message_received_callback,
934                     GDBusWorkerDisconnectedCallback     disconnected_callback,
935                     gpointer                            user_data)
936 {
937   GDBusWorker *worker;
938
939   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
940   g_return_val_if_fail (message_received_callback != NULL, NULL);
941   g_return_val_if_fail (disconnected_callback != NULL, NULL);
942
943   worker = g_new0 (GDBusWorker, 1);
944   worker->ref_count = 1;
945
946   worker->read_lock = g_mutex_new ();
947   worker->message_received_callback = message_received_callback;
948   worker->disconnected_callback = disconnected_callback;
949   worker->user_data = user_data;
950   worker->stream = g_object_ref (stream);
951   worker->capabilities = capabilities;
952   worker->cancellable = g_cancellable_new ();
953
954   worker->write_lock = g_mutex_new ();
955   worker->write_queue = g_queue_new ();
956
957   if (G_IS_SOCKET_CONNECTION (worker->stream))
958     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
959
960   _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
961
962   return worker;
963 }
964
965 /* This can be called from any thread - frees worker - guarantees no callbacks
966  * will ever be issued again
967  */
968 void
969 _g_dbus_worker_stop (GDBusWorker *worker)
970 {
971   /* If we're called in the worker thread it means we are called from
972    * a worker callback. This is fine, we just can't lock in that case since
973    * we're already holding the lock...
974    */
975   if (g_thread_self () != worker->thread)
976     g_mutex_lock (worker->read_lock);
977   worker->stopped = TRUE;
978   if (g_thread_self () != worker->thread)
979     g_mutex_unlock (worker->read_lock);
980
981   g_cancellable_cancel (worker->cancellable);
982   _g_dbus_worker_unref (worker);
983 }
984
985 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
986 #define G_DBUS_DEBUG_MESSAGE        (1<<1)
987 #define G_DBUS_DEBUG_ALL            0xffffffff
988 static gint _gdbus_debug_flags = 0;
989
990 gboolean
991 _g_dbus_debug_authentication (void)
992 {
993   _g_dbus_initialize ();
994   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
995 }
996
997 gboolean
998 _g_dbus_debug_message (void)
999 {
1000   _g_dbus_initialize ();
1001   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1002 }
1003
1004 /*
1005  * _g_dbus_initialize:
1006  *
1007  * Does various one-time init things such as
1008  *
1009  *  - registering the G_DBUS_ERROR error domain
1010  *  - parses the G_DBUS_DEBUG environment variable
1011  */
1012 void
1013 _g_dbus_initialize (void)
1014 {
1015   static volatile gsize initialized = 0;
1016
1017   if (g_once_init_enter (&initialized))
1018     {
1019       volatile GQuark g_dbus_error_domain;
1020       const gchar *debug;
1021
1022       g_dbus_error_domain = G_DBUS_ERROR;
1023
1024       debug = g_getenv ("G_DBUS_DEBUG");
1025       if (debug != NULL)
1026         {
1027           gchar **tokens;
1028           guint n;
1029           tokens = g_strsplit (debug, ",", 0);
1030           for (n = 0; tokens[n] != NULL; n++)
1031             {
1032               if (g_strcmp0 (tokens[n], "authentication") == 0)
1033                 _gdbus_debug_flags |= G_DBUS_DEBUG_AUTHENTICATION;
1034               else if (g_strcmp0 (tokens[n], "message") == 0)
1035                 _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1036               else if (g_strcmp0 (tokens[n], "all") == 0)
1037                 _gdbus_debug_flags |= G_DBUS_DEBUG_ALL;
1038             }
1039           g_strfreev (tokens);
1040         }
1041
1042       g_once_init_leave (&initialized, 1);
1043     }
1044 }
1045
1046 /* ---------------------------------------------------------------------------------------------------- */
1047
1048 GVariantType *
1049 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1050 {
1051   const GVariantType *arg_types[256];
1052   guint n;
1053
1054   if (args)
1055     for (n = 0; args[n] != NULL; n++)
1056       {
1057         /* DBus places a hard limit of 255 on signature length.
1058          * therefore number of args must be less than 256.
1059          */
1060         g_assert (n < 256);
1061
1062         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1063
1064         if G_UNLIKELY (arg_types[n] == NULL)
1065           return NULL;
1066       }
1067   else
1068     n = 0;
1069
1070   return g_variant_type_new_tuple (arg_types, n);
1071 }
1072
1073 /* ---------------------------------------------------------------------------------------------------- */
1074
1075 #ifdef G_OS_WIN32
1076
1077 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1078
1079 gchar *
1080 _g_dbus_win32_get_user_sid (void)
1081 {
1082   HANDLE h;
1083   TOKEN_USER *user;
1084   DWORD token_information_len;
1085   PSID psid;
1086   gchar *sid;
1087   gchar *ret;
1088
1089   ret = NULL;
1090   user = NULL;
1091   h = INVALID_HANDLE_VALUE;
1092
1093   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1094     {
1095       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1096       goto out;
1097     }
1098
1099   /* Get length of buffer */
1100   token_information_len = 0;
1101   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1102     {
1103       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1104         {
1105           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1106           goto out;
1107         }
1108     }
1109   user = g_malloc (token_information_len);
1110   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1111     {
1112       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1113       goto out;
1114     }
1115
1116   psid = user->User.Sid;
1117   if (!IsValidSid (psid))
1118     {
1119       g_warning ("Invalid SID");
1120       goto out;
1121     }
1122
1123   if (!ConvertSidToStringSidA (psid, &sid))
1124     {
1125       g_warning ("Invalid SID");
1126       goto out;
1127     }
1128
1129   ret = g_strdup (sid);
1130   LocalFree (sid);
1131
1132 out:
1133   g_free (user);
1134   if (h != INVALID_HANDLE_VALUE)
1135     CloseHandle (h);
1136   return ret;
1137 }
1138 #endif
1139
1140 /* ---------------------------------------------------------------------------------------------------- */
1141
1142 #define __G_DBUS_PRIVATE_C__
1143 #include "gioaliasdef.c"