Bug 621945 – Filter outgoing messages in GDBusConnection
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "giostream.h"
41 #include "gsocketcontrolmessage.h"
42 #include "gsocketconnection.h"
43
44 #ifdef G_OS_UNIX
45 #include "gunixfdmessage.h"
46 #include "gunixconnection.h"
47 #include "gunixcredentialsmessage.h"
48 #endif
49
50 #ifdef G_OS_WIN32
51 #include <windows.h>
52 #endif
53
54 #include "glibintl.h"
55 #include "gioalias.h"
56
57 /* ---------------------------------------------------------------------------------------------------- */
58
59 gchar *
60 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
61 {
62  guint n, m;
63  GString *ret;
64
65  ret = g_string_new (NULL);
66
67  for (n = 0; n < len; n += 16)
68    {
69      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
70
71      for (m = n; m < n + 16; m++)
72        {
73          if (m > n && (m%4) == 0)
74            g_string_append_c (ret, ' ');
75          if (m < len)
76            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
77          else
78            g_string_append (ret, "   ");
79        }
80
81      g_string_append (ret, "   ");
82
83      for (m = n; m < len && m < n + 16; m++)
84        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
85
86      g_string_append_c (ret, '\n');
87    }
88
89  return g_string_free (ret, FALSE);
90 }
91
92 /* ---------------------------------------------------------------------------------------------------- */
93
94 /* Unfortunately ancillary messages are discarded when reading from a
95  * socket using the GSocketInputStream abstraction. So we provide a
96  * very GInputStream-ish API that uses GSocket in this case (very
97  * similar to GSocketInputStream).
98  */
99
100 typedef struct
101 {
102   GSocket *socket;
103   GCancellable *cancellable;
104
105   void *buffer;
106   gsize count;
107
108   GSocketControlMessage ***messages;
109   gint *num_messages;
110
111   GSimpleAsyncResult *simple;
112
113   gboolean from_mainloop;
114 } ReadWithControlData;
115
116 static void
117 read_with_control_data_free (ReadWithControlData *data)
118 {
119   g_object_unref (data->socket);
120   if (data->cancellable != NULL)
121     g_object_unref (data->cancellable);
122   g_free (data);
123 }
124
125 static gboolean
126 _g_socket_read_with_control_messages_ready (GSocket      *socket,
127                                             GIOCondition  condition,
128                                             gpointer      user_data)
129 {
130   ReadWithControlData *data = user_data;
131   GError *error;
132   gssize result;
133   GInputVector vector;
134
135   error = NULL;
136   vector.buffer = data->buffer;
137   vector.size = data->count;
138   result = g_socket_receive_message (data->socket,
139                                      NULL, /* address */
140                                      &vector,
141                                      1,
142                                      data->messages,
143                                      data->num_messages,
144                                      NULL,
145                                      data->cancellable,
146                                      &error);
147   if (result >= 0)
148     {
149       g_simple_async_result_set_op_res_gssize (data->simple, result);
150     }
151   else
152     {
153       g_assert (error != NULL);
154       g_simple_async_result_set_from_error (data->simple, error);
155       g_error_free (error);
156     }
157
158   if (data->from_mainloop)
159     g_simple_async_result_complete (data->simple);
160   else
161     g_simple_async_result_complete_in_idle (data->simple);
162
163   return FALSE;
164 }
165
166 static void
167 _g_socket_read_with_control_messages (GSocket                 *socket,
168                                       void                    *buffer,
169                                       gsize                    count,
170                                       GSocketControlMessage ***messages,
171                                       gint                    *num_messages,
172                                       gint                     io_priority,
173                                       GCancellable            *cancellable,
174                                       GAsyncReadyCallback      callback,
175                                       gpointer                 user_data)
176 {
177   ReadWithControlData *data;
178
179   data = g_new0 (ReadWithControlData, 1);
180   data->socket = g_object_ref (socket);
181   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
182   data->buffer = buffer;
183   data->count = count;
184   data->messages = messages;
185   data->num_messages = num_messages;
186
187   data->simple = g_simple_async_result_new (G_OBJECT (socket),
188                                             callback,
189                                             user_data,
190                                             _g_socket_read_with_control_messages);
191
192   if (!g_socket_condition_check (socket, G_IO_IN))
193     {
194       GSource *source;
195       data->from_mainloop = TRUE;
196       source = g_socket_create_source (data->socket,
197                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
198                                        cancellable);
199       g_source_set_callback (source,
200                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
201                              data,
202                              (GDestroyNotify) read_with_control_data_free);
203       g_source_attach (source, g_main_context_get_thread_default ());
204       g_source_unref (source);
205     }
206   else
207     {
208       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
209       read_with_control_data_free (data);
210     }
211 }
212
213 static gssize
214 _g_socket_read_with_control_messages_finish (GSocket       *socket,
215                                              GAsyncResult  *result,
216                                              GError       **error)
217 {
218   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
219
220   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
221   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
222
223   if (g_simple_async_result_propagate_error (simple, error))
224       return -1;
225   else
226     return g_simple_async_result_get_op_res_gssize (simple);
227 }
228
229 /* ---------------------------------------------------------------------------------------------------- */
230
231 G_LOCK_DEFINE_STATIC (shared_thread_lock);
232
233 typedef struct
234 {
235   gint num_users;
236   GThread *thread;
237   GMainContext *context;
238   GMainLoop *loop;
239 } SharedThreadData;
240
241 static SharedThreadData *shared_thread_data = NULL;
242
243 static gpointer
244 shared_thread_func (gpointer data)
245 {
246   g_main_context_push_thread_default (shared_thread_data->context);
247   g_main_loop_run (shared_thread_data->loop);
248   g_main_context_pop_thread_default (shared_thread_data->context);
249   return NULL;
250 }
251
252 typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
253
254 typedef struct
255 {
256   GDBusSharedThreadFunc func;
257   gpointer              user_data;
258   gboolean              done;
259 } CallerData;
260
261 static gboolean
262 invoke_caller (gpointer user_data)
263 {
264   CallerData *data = user_data;
265   data->func (data->user_data);
266   data->done = TRUE;
267   return FALSE;
268 }
269
270 static void
271 _g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
272                            gpointer              user_data)
273 {
274   GError *error;
275   GSource *idle_source;
276   CallerData *data;
277
278   G_LOCK (shared_thread_lock);
279
280   if (shared_thread_data != NULL)
281     {
282       shared_thread_data->num_users += 1;
283       goto have_thread;
284     }
285
286   shared_thread_data = g_new0 (SharedThreadData, 1);
287   shared_thread_data->num_users = 1;
288
289   error = NULL;
290   shared_thread_data->context = g_main_context_new ();
291   shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
292   shared_thread_data->thread = g_thread_create (shared_thread_func,
293                                                 NULL,
294                                                 TRUE,
295                                                 &error);
296   g_assert_no_error (error);
297
298  have_thread:
299
300   data = g_new0 (CallerData, 1);
301   data->func = func;
302   data->user_data = user_data;
303   data->done = FALSE;
304
305   idle_source = g_idle_source_new ();
306   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
307   g_source_set_callback (idle_source,
308                          invoke_caller,
309                          data,
310                          NULL);
311   g_source_attach (idle_source, shared_thread_data->context);
312   g_source_unref (idle_source);
313
314   /* wait for the user code to run.. hmm.. probably use a condition variable instead */
315   while (!data->done)
316     g_thread_yield ();
317
318   g_free (data);
319
320   G_UNLOCK (shared_thread_lock);
321 }
322
/*
 * Counterpart of _g_dbus_shared_thread_ref().  Currently a no-op: the
 * tear-down below is compiled out, so the shared thread is never stopped.
 */
static void
_g_dbus_shared_thread_unref (void)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  G_LOCK (shared_thread_lock);
  g_assert (shared_thread_data != NULL);
  shared_thread_data->num_users -= 1;
  if (shared_thread_data->num_users == 0)
    {
      g_main_loop_quit (shared_thread_data->loop);
      //g_thread_join (shared_thread_data->thread);
      g_main_loop_unref (shared_thread_data->loop);
      g_main_context_unref (shared_thread_data->context);
      g_free (shared_thread_data);
      shared_thread_data = NULL;
    }
  G_UNLOCK (shared_thread_lock);
#endif
}
347
348 /* ---------------------------------------------------------------------------------------------------- */
349
350 struct GDBusWorker
351 {
352   volatile gint                       ref_count;
353   gboolean                            stopped;
354   GIOStream                          *stream;
355   GDBusCapabilityFlags                capabilities;
356   GCancellable                       *cancellable;
357   GDBusWorkerMessageReceivedCallback  message_received_callback;
358   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
359   GDBusWorkerDisconnectedCallback     disconnected_callback;
360   gpointer                            user_data;
361
362   GThread                            *thread;
363
364   /* if not NULL, stream is GSocketConnection */
365   GSocket *socket;
366
367   /* used for reading */
368   GMutex                             *read_lock;
369   gchar                              *read_buffer;
370   gsize                               read_buffer_allocated_size;
371   gsize                               read_buffer_cur_size;
372   gsize                               read_buffer_bytes_wanted;
373   GUnixFDList                        *read_fd_list;
374   GSocketControlMessage             **read_ancillary_messages;
375   gint                                read_num_ancillary_messages;
376
377   /* used for writing */
378   GMutex                             *write_lock;
379   GQueue                             *write_queue;
380   gboolean                            write_is_pending;
381 };
382
/* Queued outgoing message; the struct itself is defined further down */
struct _MessageToWriteData;
typedef struct _MessageToWriteData MessageToWriteData;

/* Needed by _g_dbus_worker_unref() to empty the write queue */
static void message_to_write_data_free (MessageToWriteData *data);
387
388 static GDBusWorker *
389 _g_dbus_worker_ref (GDBusWorker *worker)
390 {
391   g_atomic_int_inc (&worker->ref_count);
392   return worker;
393 }
394
395 static void
396 _g_dbus_worker_unref (GDBusWorker *worker)
397 {
398   if (g_atomic_int_dec_and_test (&worker->ref_count))
399     {
400       _g_dbus_shared_thread_unref ();
401
402       g_object_unref (worker->stream);
403
404       g_mutex_free (worker->read_lock);
405       g_object_unref (worker->cancellable);
406       if (worker->read_fd_list != NULL)
407         g_object_unref (worker->read_fd_list);
408
409       g_mutex_free (worker->write_lock);
410       g_queue_foreach (worker->write_queue,
411                        (GFunc) message_to_write_data_free,
412                        NULL);
413       g_queue_free (worker->write_queue);
414       g_free (worker);
415     }
416 }
417
418 static void
419 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
420                                   gboolean      remote_peer_vanished,
421                                   GError       *error)
422 {
423   if (!worker->stopped)
424     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
425 }
426
427 static void
428 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
429                                       GDBusMessage *message)
430 {
431   if (!worker->stopped)
432     worker->message_received_callback (worker, message, worker->user_data);
433 }
434
435 static gboolean
436 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
437                                               GDBusMessage *message)
438 {
439   gboolean ret;
440   ret = FALSE;
441   if (!worker->stopped)
442     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
443   return ret;
444 }
445
446 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
447
448 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
449 static void
450 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
451                            GAsyncResult  *res,
452                            gpointer       user_data)
453 {
454   GDBusWorker *worker = user_data;
455   GError *error;
456   gssize bytes_read;
457
458   g_mutex_lock (worker->read_lock);
459
460   /* If already stopped, don't even process the reply */
461   if (worker->stopped)
462     goto out;
463
464   error = NULL;
465   if (worker->socket == NULL)
466     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
467                                              res,
468                                              &error);
469   else
470     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
471                                                               res,
472                                                               &error);
473   if (worker->read_num_ancillary_messages > 0)
474     {
475       gint n;
476       for (n = 0; n < worker->read_num_ancillary_messages; n++)
477         {
478           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
479
480           if (FALSE)
481             {
482             }
483 #ifdef G_OS_UNIX
484           else if (G_IS_UNIX_FD_MESSAGE (control_message))
485             {
486               GUnixFDMessage *fd_message;
487               gint *fds;
488               gint num_fds;
489
490               fd_message = G_UNIX_FD_MESSAGE (control_message);
491               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
492               if (worker->read_fd_list == NULL)
493                 {
494                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
495                 }
496               else
497                 {
498                   gint n;
499                   for (n = 0; n < num_fds; n++)
500                     {
501                       /* TODO: really want a append_steal() */
502                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
503                       close (fds[n]);
504                     }
505                 }
506               g_free (fds);
507             }
508           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
509             {
510               /* do nothing */
511             }
512 #endif
513           else
514             {
515               if (error == NULL)
516                 {
517                   g_set_error (&error,
518                                G_IO_ERROR,
519                                G_IO_ERROR_FAILED,
520                                "Unexpected ancillary message of type %s received from peer",
521                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
522                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
523                   g_error_free (error);
524                   g_object_unref (control_message);
525                   n++;
526                   while (n < worker->read_num_ancillary_messages)
527                     g_object_unref (worker->read_ancillary_messages[n++]);
528                   g_free (worker->read_ancillary_messages);
529                   goto out;
530                 }
531             }
532           g_object_unref (control_message);
533         }
534       g_free (worker->read_ancillary_messages);
535     }
536
537   if (bytes_read == -1)
538     {
539       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
540       g_error_free (error);
541       goto out;
542     }
543
544 #if 0
545   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
546            (gint) bytes_read,
547            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
548            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
549            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
550                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
551            worker->stream,
552            worker);
553 #endif
554
555   /* TODO: hmm, hmm... */
556   if (bytes_read == 0)
557     {
558       g_set_error (&error,
559                    G_IO_ERROR,
560                    G_IO_ERROR_FAILED,
561                    "Underlying GIOStream returned 0 bytes on an async read");
562       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
563       g_error_free (error);
564       goto out;
565     }
566
567   worker->read_buffer_cur_size += bytes_read;
568   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
569     {
570       /* OK, got what we asked for! */
571       if (worker->read_buffer_bytes_wanted == 16)
572         {
573           gssize message_len;
574           /* OK, got the header - determine how many more bytes are needed */
575           error = NULL;
576           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
577                                                      16,
578                                                      &error);
579           if (message_len == -1)
580             {
581               g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
582               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
583               g_error_free (error);
584               goto out;
585             }
586
587           worker->read_buffer_bytes_wanted = message_len;
588           _g_dbus_worker_do_read_unlocked (worker);
589         }
590       else
591         {
592           GDBusMessage *message;
593           error = NULL;
594
595           /* TODO: use connection->priv->auth to decode the message */
596
597           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
598                                                   worker->read_buffer_cur_size,
599                                                   worker->capabilities,
600                                                   &error);
601           if (message == NULL)
602             {
603               gchar *s;
604               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
605               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
606                          "The error is: %s\n"
607                          "The payload is as follows:\n"
608                          "%s\n",
609                          worker->read_buffer_cur_size,
610                          error->message,
611                          s);
612               g_free (s);
613               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
614               g_error_free (error);
615               goto out;
616             }
617
618 #ifdef G_OS_UNIX
619           if (worker->read_fd_list != NULL)
620             {
621               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
622               worker->read_fd_list = NULL;
623             }
624 #endif
625
626           if (G_UNLIKELY (_g_dbus_debug_message ()))
627             {
628               gchar *s;
629               g_print ("========================================================================\n"
630                        "GDBus-debug:Message:\n"
631                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
632                        worker->read_buffer_cur_size);
633               s = g_dbus_message_print (message, 2);
634               g_print ("%s", s);
635               g_free (s);
636               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
637               g_print ("%s\n", s);
638               g_free (s);
639             }
640
641           /* yay, got a message, go deliver it */
642           _g_dbus_worker_emit_message_received (worker, message);
643           g_object_unref (message);
644
645           /* start reading another message! */
646           worker->read_buffer_bytes_wanted = 0;
647           worker->read_buffer_cur_size = 0;
648           _g_dbus_worker_do_read_unlocked (worker);
649         }
650     }
651   else
652     {
653       /* didn't get all the bytes we requested - so repeat the request... */
654       _g_dbus_worker_do_read_unlocked (worker);
655     }
656
657  out:
658   g_mutex_unlock (worker->read_lock);
659
660   /* gives up the reference acquired when calling g_input_stream_read_async() */
661   _g_dbus_worker_unref (worker);
662 }
663
664 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
665 static void
666 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
667 {
668   /* if bytes_wanted is zero, it means start reading a message */
669   if (worker->read_buffer_bytes_wanted == 0)
670     {
671       worker->read_buffer_cur_size = 0;
672       worker->read_buffer_bytes_wanted = 16;
673     }
674
675   /* ensure we have a (big enough) buffer */
676   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
677     {
678       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
679       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
680       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
681     }
682
683   if (worker->socket == NULL)
684     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
685                                worker->read_buffer + worker->read_buffer_cur_size,
686                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
687                                G_PRIORITY_DEFAULT,
688                                worker->cancellable,
689                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
690                                _g_dbus_worker_ref (worker));
691   else
692     {
693       worker->read_ancillary_messages = NULL;
694       worker->read_num_ancillary_messages = 0;
695       _g_socket_read_with_control_messages (worker->socket,
696                                             worker->read_buffer + worker->read_buffer_cur_size,
697                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
698                                             &worker->read_ancillary_messages,
699                                             &worker->read_num_ancillary_messages,
700                                             G_PRIORITY_DEFAULT,
701                                             worker->cancellable,
702                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
703                                             _g_dbus_worker_ref (worker));
704     }
705 }
706
707 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
708 static void
709 _g_dbus_worker_do_read (GDBusWorker *worker)
710 {
711   g_mutex_lock (worker->read_lock);
712   _g_dbus_worker_do_read_unlocked (worker);
713   g_mutex_unlock (worker->read_lock);
714 }
715
716 /* ---------------------------------------------------------------------------------------------------- */
717
718 struct _MessageToWriteData
719 {
720   GDBusMessage *message;
721   gchar        *blob;
722   gsize         blob_size;
723 };
724
725 static void
726 message_to_write_data_free (MessageToWriteData *data)
727 {
728   g_object_unref (data->message);
729   g_free (data->blob);
730   g_free (data);
731 }
732
733 /* ---------------------------------------------------------------------------------------------------- */
734
735 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
736 static gboolean
737 write_message (GDBusWorker         *worker,
738                MessageToWriteData  *data,
739                GError             **error)
740 {
741   gboolean ret;
742
743   g_return_val_if_fail (data->blob_size > 16, FALSE);
744
745   ret = FALSE;
746
747   /* First, the initial 16 bytes - special case UNIX sockets here
748    * since it may involve writing an ancillary message with file
749    * descriptors
750    */
751 #ifdef G_OS_UNIX
752   {
753     GOutputVector vector;
754     GSocketControlMessage *message;
755     GUnixFDList *fd_list;
756     gssize bytes_written;
757
758     fd_list = g_dbus_message_get_unix_fd_list (data->message);
759
760     message = NULL;
761     if (fd_list != NULL)
762       {
763         if (!G_IS_UNIX_CONNECTION (worker->stream))
764           {
765             g_set_error (error,
766                          G_IO_ERROR,
767                          G_IO_ERROR_INVALID_ARGUMENT,
768                          "Tried sending a file descriptor on unsupported stream of type %s",
769                          g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
770             goto out;
771           }
772         else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
773           {
774             g_set_error_literal (error,
775                                  G_IO_ERROR,
776                                  G_IO_ERROR_INVALID_ARGUMENT,
777                                  "Tried sending a file descriptor but remote peer does not support this capability");
778             goto out;
779           }
780         message = g_unix_fd_message_new_with_fd_list (fd_list);
781       }
782
783     vector.buffer = data->blob;
784     vector.size = 16;
785
786     bytes_written = g_socket_send_message (worker->socket,
787                                            NULL, /* address */
788                                            &vector,
789                                            1,
790                                            message != NULL ? &message : NULL,
791                                            message != NULL ? 1 : 0,
792                                            G_SOCKET_MSG_NONE,
793                                            worker->cancellable,
794                                            error);
795     if (bytes_written == -1)
796       {
797         g_prefix_error (error, _("Error writing first 16 bytes of message to socket: "));
798         if (message != NULL)
799           g_object_unref (message);
800         goto out;
801       }
802     if (message != NULL)
803       g_object_unref (message);
804
805     if (bytes_written < 16)
806       {
807         /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
808          * messages are sent?
809          */
810         g_assert_not_reached ();
811       }
812   }
813 #else
814   /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */
815   if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
816                                   (const gchar *) data->blob,
817                                   16,
818                                   NULL, /* bytes_written */
819                                   worker->cancellable, /* cancellable */
820                                   error))
821     goto out;
822 #endif
823
824   /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */
825   if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
826                                   (const gchar *) data->blob + 16,
827                                   data->blob_size - 16,
828                                   NULL, /* bytes_written */
829                                   worker->cancellable, /* cancellable */
830                                   error))
831     goto out;
832
833   ret = TRUE;
834
835   if (G_UNLIKELY (_g_dbus_debug_message ()))
836     {
837       gchar *s;
838       g_print ("========================================================================\n"
839                "GDBus-debug:Message:\n"
840                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
841                data->blob_size);
842       s = g_dbus_message_print (data->message, 2);
843       g_print ("%s", s);
844       g_free (s);
845       s = _g_dbus_hexdump (data->blob, data->blob_size, 2);
846       g_print ("%s\n", s);
847       g_free (s);
848     }
849
850  out:
851   return ret;
852 }
853
854 /* ---------------------------------------------------------------------------------------------------- */
855
856 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
857 static gboolean
858 write_message_in_idle_cb (gpointer user_data)
859 {
860   GDBusWorker *worker = user_data;
861   gboolean more_writes_are_pending;
862   MessageToWriteData *data;
863   gboolean message_was_dropped;
864   GError *error;
865
866   g_mutex_lock (worker->write_lock);
867   data = g_queue_pop_head (worker->write_queue);
868   g_assert (data != NULL);
869   more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
870   worker->write_is_pending = more_writes_are_pending;
871   g_mutex_unlock (worker->write_lock);
872
873   /* Note that write_lock is only used for protecting the @write_queue
874    * and @write_is_pending fields of the GDBusWorker struct ... which we
875    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
876    *
877    * Therefore, it's fine to drop it here when calling back into user
878    * code and then writing the message out onto the GIOStream since this
879    * function only runs on the worker thread.
880    */
881   message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
882   if (G_LIKELY (!message_was_dropped))
883     {
884       error = NULL;
885       if (!write_message (worker,
886                           data,
887                           &error))
888         {
889           /* TODO: handle */
890           _g_dbus_worker_emit_disconnected (worker, TRUE, error);
891           g_error_free (error);
892         }
893     }
894   message_to_write_data_free (data);
895
896   return more_writes_are_pending;
897 }
898
899 /* ---------------------------------------------------------------------------------------------------- */
900
901 /* can be called from any thread - steals blob */
902 void
903 _g_dbus_worker_send_message (GDBusWorker    *worker,
904                              GDBusMessage   *message,
905                              gchar          *blob,
906                              gsize           blob_len)
907 {
908   MessageToWriteData *data;
909
910   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
911   g_return_if_fail (blob != NULL);
912   g_return_if_fail (blob_len > 16);
913
914   data = g_new0 (MessageToWriteData, 1);
915   data->message = g_object_ref (message);
916   data->blob = blob; /* steal! */
917   data->blob_size = blob_len;
918
919   g_mutex_lock (worker->write_lock);
920   g_queue_push_tail (worker->write_queue, data);
921   if (!worker->write_is_pending)
922     {
923       GSource *idle_source;
924
925       worker->write_is_pending = TRUE;
926
927       idle_source = g_idle_source_new ();
928       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
929       g_source_set_callback (idle_source,
930                              write_message_in_idle_cb,
931                              _g_dbus_worker_ref (worker),
932                              (GDestroyNotify) _g_dbus_worker_unref);
933       g_source_attach (idle_source, shared_thread_data->context);
934       g_source_unref (idle_source);
935     }
936   g_mutex_unlock (worker->write_lock);
937 }
938
939 /* ---------------------------------------------------------------------------------------------------- */
940
941 static void
942 _g_dbus_worker_thread_begin_func (gpointer user_data)
943 {
944   GDBusWorker *worker = user_data;
945
946   worker->thread = g_thread_self ();
947
948   /* begin reading */
949   _g_dbus_worker_do_read (worker);
950 }
951
952 GDBusWorker *
953 _g_dbus_worker_new (GIOStream                              *stream,
954                     GDBusCapabilityFlags                    capabilities,
955                     GDBusWorkerMessageReceivedCallback      message_received_callback,
956                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
957                     GDBusWorkerDisconnectedCallback         disconnected_callback,
958                     gpointer                                user_data)
959 {
960   GDBusWorker *worker;
961
962   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
963   g_return_val_if_fail (message_received_callback != NULL, NULL);
964   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
965   g_return_val_if_fail (disconnected_callback != NULL, NULL);
966
967   worker = g_new0 (GDBusWorker, 1);
968   worker->ref_count = 1;
969
970   worker->read_lock = g_mutex_new ();
971   worker->message_received_callback = message_received_callback;
972   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
973   worker->disconnected_callback = disconnected_callback;
974   worker->user_data = user_data;
975   worker->stream = g_object_ref (stream);
976   worker->capabilities = capabilities;
977   worker->cancellable = g_cancellable_new ();
978
979   worker->write_lock = g_mutex_new ();
980   worker->write_queue = g_queue_new ();
981
982   if (G_IS_SOCKET_CONNECTION (worker->stream))
983     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
984
985   _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
986
987   return worker;
988 }
989
990 /* This can be called from any thread - frees worker - guarantees no callbacks
991  * will ever be issued again
992  */
993 void
994 _g_dbus_worker_stop (GDBusWorker *worker)
995 {
996   /* If we're called in the worker thread it means we are called from
997    * a worker callback. This is fine, we just can't lock in that case since
998    * we're already holding the lock...
999    */
1000   if (g_thread_self () != worker->thread)
1001     g_mutex_lock (worker->read_lock);
1002   worker->stopped = TRUE;
1003   if (g_thread_self () != worker->thread)
1004     g_mutex_unlock (worker->read_lock);
1005
1006   g_cancellable_cancel (worker->cancellable);
1007   _g_dbus_worker_unref (worker);
1008 }
1009
1010 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1011 #define G_DBUS_DEBUG_MESSAGE        (1<<1)
1012 #define G_DBUS_DEBUG_ALL            0xffffffff
1013 static gint _gdbus_debug_flags = 0;
1014
1015 gboolean
1016 _g_dbus_debug_authentication (void)
1017 {
1018   _g_dbus_initialize ();
1019   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1020 }
1021
1022 gboolean
1023 _g_dbus_debug_message (void)
1024 {
1025   _g_dbus_initialize ();
1026   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1027 }
1028
1029 /*
1030  * _g_dbus_initialize:
1031  *
1032  * Does various one-time init things such as
1033  *
1034  *  - registering the G_DBUS_ERROR error domain
1035  *  - parses the G_DBUS_DEBUG environment variable
1036  */
1037 void
1038 _g_dbus_initialize (void)
1039 {
1040   static volatile gsize initialized = 0;
1041
1042   if (g_once_init_enter (&initialized))
1043     {
1044       volatile GQuark g_dbus_error_domain;
1045       const gchar *debug;
1046
1047       g_dbus_error_domain = G_DBUS_ERROR;
1048
1049       debug = g_getenv ("G_DBUS_DEBUG");
1050       if (debug != NULL)
1051         {
1052           gchar **tokens;
1053           guint n;
1054           tokens = g_strsplit (debug, ",", 0);
1055           for (n = 0; tokens[n] != NULL; n++)
1056             {
1057               if (g_strcmp0 (tokens[n], "authentication") == 0)
1058                 _gdbus_debug_flags |= G_DBUS_DEBUG_AUTHENTICATION;
1059               else if (g_strcmp0 (tokens[n], "message") == 0)
1060                 _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1061               else if (g_strcmp0 (tokens[n], "all") == 0)
1062                 _gdbus_debug_flags |= G_DBUS_DEBUG_ALL;
1063             }
1064           g_strfreev (tokens);
1065         }
1066
1067       g_once_init_leave (&initialized, 1);
1068     }
1069 }
1070
1071 /* ---------------------------------------------------------------------------------------------------- */
1072
1073 GVariantType *
1074 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1075 {
1076   const GVariantType *arg_types[256];
1077   guint n;
1078
1079   if (args)
1080     for (n = 0; args[n] != NULL; n++)
1081       {
1082         /* DBus places a hard limit of 255 on signature length.
1083          * therefore number of args must be less than 256.
1084          */
1085         g_assert (n < 256);
1086
1087         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1088
1089         if G_UNLIKELY (arg_types[n] == NULL)
1090           return NULL;
1091       }
1092   else
1093     n = 0;
1094
1095   return g_variant_type_new_tuple (arg_types, n);
1096 }
1097
1098 /* ---------------------------------------------------------------------------------------------------- */
1099
#ifdef G_OS_WIN32

extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);

/* Looks up the SID of the user owning the current process and returns it in
 * string form.
 *
 * Returns a newly-allocated copy (made with g_strdup()) of the SID string,
 * or NULL if any of the Win32 calls fails; each failure is reported through
 * g_warning().
 */
gchar *
_g_dbus_win32_get_user_sid (void)
{
  HANDLE h;
  TOKEN_USER *user;
  DWORD token_information_len;
  PSID psid;
  gchar *sid;
  gchar *ret;

  ret = NULL;
  user = NULL;
  sid = NULL;
  h = INVALID_HANDLE_VALUE;

  if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
    {
      g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* Get length of buffer: this first call is expected to fail with
   * ERROR_INSUFFICIENT_BUFFER while filling in the required size.
   */
  token_information_len = 0;
  if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
    {
      if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
        {
          g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
          goto out;
        }
    }
  user = g_malloc (token_information_len);
  if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  psid = user->User.Sid;
  if (!IsValidSid (psid))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  if (!ConvertSidToStringSidA (psid, &sid))
    {
      /* Report this separately from the IsValidSid() failure above so the
       * two cases can be told apart, and include the error code.
       */
      g_warning ("ConvertSidToStringSidA failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* Copy into GLib-owned memory; the Win32 buffer is released with LocalFree(). */
  ret = g_strdup (sid);
  LocalFree (sid);

out:
  g_free (user);
  if (h != INVALID_HANDLE_VALUE)
    CloseHandle (h);
  return ret;
}
#endif
1164
1165 /* ---------------------------------------------------------------------------------------------------- */
1166
1167 #define __G_DBUS_PRIVATE_C__
1168 #include "gioaliasdef.c"