Cleanups
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "giostream.h"
41 #include "gsocketcontrolmessage.h"
42
43 #ifdef G_OS_UNIX
44 #include "gunixfdmessage.h"
45 #include "gunixconnection.h"
46 #include "gunixcredentialsmessage.h"
47 #endif
48
49 #include "glibintl.h"
50 #include "gioalias.h"
51
52 /* ---------------------------------------------------------------------------------------------------- */
53
54 static gchar *
55 hexdump (const gchar *data, gsize len, guint indent)
56 {
57  guint n, m;
58  GString *ret;
59
60  ret = g_string_new (NULL);
61
62  for (n = 0; n < len; n += 16)
63    {
64      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
65
66      for (m = n; m < n + 16; m++)
67        {
68          if (m > n && (m%4) == 0)
69            g_string_append_c (ret, ' ');
70          if (m < len)
71            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
72          else
73            g_string_append (ret, "   ");
74        }
75
76      g_string_append (ret, "   ");
77
78      for (m = n; m < len && m < n + 16; m++)
79        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
80
81      g_string_append_c (ret, '\n');
82    }
83
84  return g_string_free (ret, FALSE);
85 }
86
87 /* ---------------------------------------------------------------------------------------------------- */
88
89 /* Unfortunately ancillary messages are discarded when reading from a
90  * socket using the GSocketInputStream abstraction. So we provide a
91  * very GInputStream-ish API that uses GSocket in this case (very
92  * similar to GSocketInputStream).
93  */
94
95 typedef struct
96 {
97   GSocket *socket;
98   GCancellable *cancellable;
99
100   void *buffer;
101   gsize count;
102
103   GSocketControlMessage ***messages;
104   gint *num_messages;
105
106   GSimpleAsyncResult *simple;
107
108   gboolean from_mainloop;
109 } ReadWithControlData;
110
111 static void
112 read_with_control_data_free (ReadWithControlData *data)
113 {
114   g_object_unref (data->socket);
115   if (data->cancellable != NULL)
116     g_object_unref (data->cancellable);
117   g_free (data);
118 }
119
120 static gboolean
121 _g_socket_read_with_control_messages_ready (GSocket      *socket,
122                                             GIOCondition  condition,
123                                             gpointer      user_data)
124 {
125   ReadWithControlData *data = user_data;
126   GError *error;
127   gssize result;
128   GInputVector vector;
129
130   error = NULL;
131   vector.buffer = data->buffer;
132   vector.size = data->count;
133   result = g_socket_receive_message (data->socket,
134                                      NULL, /* address */
135                                      &vector,
136                                      1,
137                                      data->messages,
138                                      data->num_messages,
139                                      NULL,
140                                      data->cancellable,
141                                      &error);
142   if (result >= 0)
143     {
144       g_simple_async_result_set_op_res_gssize (data->simple, result);
145     }
146   else
147     {
148       g_assert (error != NULL);
149       g_simple_async_result_set_from_error (data->simple, error);
150       g_error_free (error);
151     }
152
153   if (data->from_mainloop)
154     g_simple_async_result_complete (data->simple);
155   else
156     g_simple_async_result_complete_in_idle (data->simple);
157
158   return FALSE;
159 }
160
161 static void
162 _g_socket_read_with_control_messages (GSocket                 *socket,
163                                       void                    *buffer,
164                                       gsize                    count,
165                                       GSocketControlMessage ***messages,
166                                       gint                    *num_messages,
167                                       gint                     io_priority,
168                                       GCancellable            *cancellable,
169                                       GAsyncReadyCallback      callback,
170                                       gpointer                 user_data)
171 {
172   ReadWithControlData *data;
173
174   data = g_new0 (ReadWithControlData, 1);
175   data->socket = g_object_ref (socket);
176   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
177   data->buffer = buffer;
178   data->count = count;
179   data->messages = messages;
180   data->num_messages = num_messages;
181
182   data->simple = g_simple_async_result_new (G_OBJECT (socket),
183                                             callback,
184                                             user_data,
185                                             _g_socket_read_with_control_messages);
186
187   if (!g_socket_condition_check (socket, G_IO_IN))
188     {
189       GSource *source;
190       data->from_mainloop = TRUE;
191       source = g_socket_create_source (data->socket,
192                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
193                                        cancellable);
194       g_source_set_callback (source,
195                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
196                              data,
197                              (GDestroyNotify) read_with_control_data_free);
198       g_source_attach (source, g_main_context_get_thread_default ());
199       g_source_unref (source);
200     }
201   else
202     {
203       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
204       read_with_control_data_free (data);
205     }
206 }
207
208 static gssize
209 _g_socket_read_with_control_messages_finish (GSocket       *socket,
210                                              GAsyncResult  *result,
211                                              GError       **error)
212 {
213   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
214
215   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
216   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
217
218   if (g_simple_async_result_propagate_error (simple, error))
219       return -1;
220   else
221     return g_simple_async_result_get_op_res_gssize (simple);
222 }
223
224 /* ---------------------------------------------------------------------------------------------------- */
225
226 G_LOCK_DEFINE_STATIC (shared_thread_lock);
227
228 typedef struct
229 {
230   gint num_users;
231   GThread *thread;
232   GMainContext *context;
233   GMainLoop *loop;
234 } SharedThreadData;
235
236 static SharedThreadData *shared_thread_data = NULL;
237
238 static gpointer
239 shared_thread_func (gpointer data)
240 {
241   g_main_context_push_thread_default (shared_thread_data->context);
242   g_main_loop_run (shared_thread_data->loop);
243   g_main_context_pop_thread_default (shared_thread_data->context);
244   return NULL;
245 }
246
247 typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
248
249 typedef struct
250 {
251   GDBusSharedThreadFunc func;
252   gpointer              user_data;
253   gboolean              done;
254 } CallerData;
255
256 static gboolean
257 invoke_caller (gpointer user_data)
258 {
259   CallerData *data = user_data;
260   data->func (data->user_data);
261   data->done = TRUE;
262   return FALSE;
263 }
264
265 static void
266 _g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
267                            gpointer              user_data)
268 {
269   GError *error;
270   GSource *idle_source;
271   CallerData *data;
272
273   G_LOCK (shared_thread_lock);
274
275   if (shared_thread_data != NULL)
276     {
277       shared_thread_data->num_users += 1;
278       goto have_thread;
279     }
280
281   shared_thread_data = g_new0 (SharedThreadData, 1);
282   shared_thread_data->num_users = 1;
283
284   error = NULL;
285   shared_thread_data->context = g_main_context_new ();
286   shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
287   shared_thread_data->thread = g_thread_create (shared_thread_func,
288                                                 NULL,
289                                                 TRUE,
290                                                 &error);
291   g_assert_no_error (error);
292
293  have_thread:
294
295   data = g_new0 (CallerData, 1);
296   data->func = func;
297   data->user_data = user_data;
298   data->done = FALSE;
299
300   idle_source = g_idle_source_new ();
301   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
302   g_source_set_callback (idle_source,
303                          invoke_caller,
304                          data,
305                          NULL);
306   g_source_attach (idle_source, shared_thread_data->context);
307   g_source_unref (idle_source);
308
309   /* wait for the user code to run.. hmm.. probably use a condition variable instead */
310   while (!data->done)
311     g_thread_yield ();
312
313   g_free (data);
314
315   G_UNLOCK (shared_thread_lock);
316 }
317
/*
 * _g_dbus_shared_thread_unref:
 *
 * Counterpart of _g_dbus_shared_thread_ref(). Currently a no-op: the
 * tear-down sketch below is compiled out, so the shared thread is never
 * destroyed and the use-count is never decremented.
 */
static void
_g_dbus_shared_thread_unref (void)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  gboolean last_user;

  G_LOCK (shared_thread_lock);
  g_assert (shared_thread_data != NULL);

  shared_thread_data->num_users -= 1;
  last_user = (shared_thread_data->num_users == 0);
  if (last_user)
    {
      g_main_loop_quit (shared_thread_data->loop);
      //g_thread_join (shared_thread_data->thread);
      g_main_loop_unref (shared_thread_data->loop);
      g_main_context_unref (shared_thread_data->context);
      g_free (shared_thread_data);
      shared_thread_data = NULL;
    }

  G_UNLOCK (shared_thread_lock);
#endif
}
342
343 /* ---------------------------------------------------------------------------------------------------- */
344
345 struct GDBusWorker
346 {
347   volatile gint                       ref_count;
348   gboolean                            stopped;
349   GIOStream                          *stream;
350   GDBusCapabilityFlags                capabilities;
351   GCancellable                       *cancellable;
352   GDBusWorkerMessageReceivedCallback  message_received_callback;
353   GDBusWorkerDisconnectedCallback     disconnected_callback;
354   gpointer                            user_data;
355
356   GThread                            *thread;
357
358   /* if not NULL, stream is GSocketConnection */
359   GSocket *socket;
360
361   /* used for reading */
362   GMutex                             *read_lock;
363   gchar                              *read_buffer;
364   gsize                               read_buffer_allocated_size;
365   gsize                               read_buffer_cur_size;
366   gsize                               read_buffer_bytes_wanted;
367   GUnixFDList                        *read_fd_list;
368   GSocketControlMessage             **read_ancillary_messages;
369   gint                                read_num_ancillary_messages;
370
371   /* used for writing */
372   GMutex                             *write_lock;
373   GQueue                             *write_queue;
374   gboolean                            write_is_pending;
375 };
376
377 struct _MessageToWriteData ;
378 typedef struct _MessageToWriteData MessageToWriteData;
379
380 static void message_to_write_data_free (MessageToWriteData *data);
381
382 static GDBusWorker *
383 _g_dbus_worker_ref (GDBusWorker *worker)
384 {
385   g_atomic_int_inc (&worker->ref_count);
386   return worker;
387 }
388
389 static void
390 _g_dbus_worker_unref (GDBusWorker *worker)
391 {
392   if (g_atomic_int_dec_and_test (&worker->ref_count))
393     {
394       _g_dbus_shared_thread_unref ();
395
396       g_object_unref (worker->stream);
397
398       g_mutex_free (worker->read_lock);
399       g_object_unref (worker->cancellable);
400       if (worker->read_fd_list != NULL)
401         g_object_unref (worker->read_fd_list);
402
403       g_mutex_free (worker->write_lock);
404       g_queue_foreach (worker->write_queue,
405                        (GFunc) message_to_write_data_free,
406                        NULL);
407       g_queue_free (worker->write_queue);
408       g_free (worker);
409     }
410 }
411
412 static void
413 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
414                                   gboolean      remote_peer_vanished,
415                                   GError       *error)
416 {
417   if (!worker->stopped)
418     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
419 }
420
421 static void
422 _g_dbus_worker_emit_message (GDBusWorker  *worker,
423                              GDBusMessage *message)
424 {
425   if (!worker->stopped)
426     worker->message_received_callback (worker, message, worker->user_data);
427 }
428
429 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
430
431 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
432 static void
433 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
434                            GAsyncResult  *res,
435                            gpointer       user_data)
436 {
437   GDBusWorker *worker = user_data;
438   GError *error;
439   gssize bytes_read;
440
441   g_mutex_lock (worker->read_lock);
442
443   /* If already stopped, don't even process the reply */
444   if (worker->stopped)
445     goto out;
446
447   error = NULL;
448   if (worker->socket == NULL)
449     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
450                                              res,
451                                              &error);
452   else
453     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
454                                                               res,
455                                                               &error);
456   if (worker->read_num_ancillary_messages > 0)
457     {
458       gint n;
459       for (n = 0; n < worker->read_num_ancillary_messages; n++)
460         {
461           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
462
463           if (FALSE)
464             {
465             }
466 #ifdef G_OS_UNIX
467           else if (G_IS_UNIX_FD_MESSAGE (control_message))
468             {
469               GUnixFDMessage *fd_message;
470               gint *fds;
471               gint num_fds;
472
473               fd_message = G_UNIX_FD_MESSAGE (control_message);
474               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
475               if (worker->read_fd_list == NULL)
476                 {
477                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
478                 }
479               else
480                 {
481                   gint n;
482                   for (n = 0; n < num_fds; n++)
483                     {
484                       /* TODO: really want a append_steal() */
485                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
486                       close (fds[n]);
487                     }
488                 }
489               g_free (fds);
490             }
491           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
492             {
493               /* do nothing */
494             }
495 #endif
496           else
497             {
498               if (error == NULL)
499                 {
500                   g_set_error (&error,
501                                G_IO_ERROR,
502                                G_IO_ERROR_FAILED,
503                                "Unexpected ancillary message of type %s received from peer",
504                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
505                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
506                   g_error_free (error);
507                   g_object_unref (control_message);
508                   n++;
509                   while (n < worker->read_num_ancillary_messages)
510                     g_object_unref (worker->read_ancillary_messages[n++]);
511                   g_free (worker->read_ancillary_messages);
512                   goto out;
513                 }
514             }
515           g_object_unref (control_message);
516         }
517       g_free (worker->read_ancillary_messages);
518     }
519
520   if (bytes_read == -1)
521     {
522       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
523       g_error_free (error);
524       goto out;
525     }
526
527 #if 0
528   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
529            (gint) bytes_read,
530            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
531            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
532            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
533                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
534            worker->stream,
535            worker);
536 #endif
537
538   /* TODO: hmm, hmm... */
539   if (bytes_read == 0)
540     {
541       g_set_error (&error,
542                    G_IO_ERROR,
543                    G_IO_ERROR_FAILED,
544                    "Underlying GIOStream returned 0 bytes on an async read");
545       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
546       g_error_free (error);
547       goto out;
548     }
549
550   worker->read_buffer_cur_size += bytes_read;
551   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
552     {
553       /* OK, got what we asked for! */
554       if (worker->read_buffer_bytes_wanted == 16)
555         {
556           gssize message_len;
557           /* OK, got the header - determine how many more bytes are needed */
558           error = NULL;
559           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
560                                                      16,
561                                                      &error);
562           if (message_len == -1)
563             {
564               g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
565               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
566               g_error_free (error);
567               goto out;
568             }
569
570           worker->read_buffer_bytes_wanted = message_len;
571           _g_dbus_worker_do_read_unlocked (worker);
572         }
573       else
574         {
575           GDBusMessage *message;
576           error = NULL;
577
578           /* TODO: use connection->priv->auth to decode the message */
579
580           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
581                                                   worker->read_buffer_cur_size,
582                                                   worker->capabilities,
583                                                   &error);
584           if (message == NULL)
585             {
586               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
587               g_error_free (error);
588               goto out;
589             }
590
591           if (worker->read_fd_list != NULL)
592             {
593               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
594               worker->read_fd_list = NULL;
595             }
596
597           if (G_UNLIKELY (_g_dbus_debug_message ()))
598             {
599               gchar *s;
600               g_print ("========================================================================\n"
601                        "GDBus-debug:Message:\n"
602                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
603                        worker->read_buffer_cur_size);
604               s = g_dbus_message_print (message, 2);
605               g_print ("%s", s);
606               g_free (s);
607               s = hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
608               g_print ("%s\n", s);
609               g_free (s);
610             }
611
612           /* yay, got a message, go deliver it */
613           _g_dbus_worker_emit_message (worker, message);
614           g_object_unref (message);
615
616           /* start reading another message! */
617           worker->read_buffer_bytes_wanted = 0;
618           worker->read_buffer_cur_size = 0;
619           _g_dbus_worker_do_read_unlocked (worker);
620         }
621     }
622   else
623     {
624       /* didn't get all the bytes we requested - so repeat the request... */
625       _g_dbus_worker_do_read_unlocked (worker);
626     }
627
628  out:
629   g_mutex_unlock (worker->read_lock);
630
631   /* gives up the reference acquired when calling g_input_stream_read_async() */
632   _g_dbus_worker_unref (worker);
633 }
634
635 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
636 static void
637 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
638 {
639   /* if bytes_wanted is zero, it means start reading a message */
640   if (worker->read_buffer_bytes_wanted == 0)
641     {
642       worker->read_buffer_cur_size = 0;
643       worker->read_buffer_bytes_wanted = 16;
644     }
645
646   /* ensure we have a (big enough) buffer */
647   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
648     {
649       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
650       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
651       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
652     }
653
654   if (worker->socket == NULL)
655     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
656                                worker->read_buffer + worker->read_buffer_cur_size,
657                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
658                                G_PRIORITY_DEFAULT,
659                                worker->cancellable,
660                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
661                                _g_dbus_worker_ref (worker));
662   else
663     {
664       worker->read_ancillary_messages = NULL;
665       worker->read_num_ancillary_messages = 0;
666       _g_socket_read_with_control_messages (worker->socket,
667                                             worker->read_buffer + worker->read_buffer_cur_size,
668                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
669                                             &worker->read_ancillary_messages,
670                                             &worker->read_num_ancillary_messages,
671                                             G_PRIORITY_DEFAULT,
672                                             worker->cancellable,
673                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
674                                             _g_dbus_worker_ref (worker));
675     }
676 }
677
678 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
679 static void
680 _g_dbus_worker_do_read (GDBusWorker *worker)
681 {
682   g_mutex_lock (worker->read_lock);
683   _g_dbus_worker_do_read_unlocked (worker);
684   g_mutex_unlock (worker->read_lock);
685 }
686
687 /* ---------------------------------------------------------------------------------------------------- */
688
689 struct _MessageToWriteData
690 {
691   GDBusMessage *message;
692   gchar        *blob;
693   gsize         blob_size;
694 };
695
696 static void
697 message_to_write_data_free (MessageToWriteData *data)
698 {
699   g_object_unref (data->message);
700   g_free (data->blob);
701   g_free (data);
702 }
703
704 /* ---------------------------------------------------------------------------------------------------- */
705
706 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
707 static gboolean
708 write_message (GDBusWorker         *worker,
709                MessageToWriteData  *data,
710                GError             **error)
711 {
712   gboolean ret;
713
714   g_return_val_if_fail (data->blob_size > 16, FALSE);
715
716   ret = FALSE;
717
718   /* First, the initial 16 bytes - special case UNIX sockets here
719    * since it may involve writing an ancillary message with file
720    * descriptors
721    */
722 #ifdef G_OS_UNIX
723   {
724     GOutputVector vector;
725     GSocketControlMessage *message;
726     GUnixFDList *fd_list;
727     gssize bytes_written;
728
729     fd_list = g_dbus_message_get_unix_fd_list (data->message);
730
731     message = NULL;
732     if (fd_list != NULL)
733       {
734         if (!G_IS_UNIX_CONNECTION (worker->stream))
735           {
736             g_set_error (error,
737                          G_IO_ERROR,
738                          G_IO_ERROR_INVALID_ARGUMENT,
739                          "Tried sending a file descriptor on unsupported stream of type %s",
740                          g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
741             goto out;
742           }
743         else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
744           {
745             g_set_error_literal (error,
746                                  G_IO_ERROR,
747                                  G_IO_ERROR_INVALID_ARGUMENT,
748                                  "Tried sending a file descriptor but remote peer does not support this capability");
749             goto out;
750           }
751         message = g_unix_fd_message_new_with_fd_list (fd_list);
752       }
753
754     vector.buffer = data->blob;
755     vector.size = 16;
756
757     bytes_written = g_socket_send_message (worker->socket,
758                                            NULL, /* address */
759                                            &vector,
760                                            1,
761                                            message != NULL ? &message : NULL,
762                                            message != NULL ? 1 : 0,
763                                            G_SOCKET_MSG_NONE,
764                                            worker->cancellable,
765                                            error);
766     if (bytes_written == -1)
767       {
768         g_prefix_error (error, _("Error writing first 16 bytes of message to socket: "));
769         if (message != NULL)
770           g_object_unref (message);
771         goto out;
772       }
773     if (message != NULL)
774       g_object_unref (message);
775
776     if (bytes_written < 16)
777       {
778         /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
779          * messages are sent?
780          */
781         g_assert_not_reached ();
782       }
783   }
784 #else
785   /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */
786   if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
787                                   (const gchar *) data->blob,
788                                   16,
789                                   NULL, /* bytes_written */
790                                   worker->cancellable, /* cancellable */
791                                   error))
792     goto out;
793 #endif
794
795   /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */
796   if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
797                                   (const gchar *) data->blob + 16,
798                                   data->blob_size - 16,
799                                   NULL, /* bytes_written */
800                                   worker->cancellable, /* cancellable */
801                                   error))
802     goto out;
803
804   ret = TRUE;
805
806   if (G_UNLIKELY (_g_dbus_debug_message ()))
807     {
808       gchar *s;
809       g_print ("========================================================================\n"
810                "GDBus-debug:Message:\n"
811                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
812                data->blob_size);
813       s = g_dbus_message_print (data->message, 2);
814       g_print ("%s", s);
815       g_free (s);
816       s = hexdump (data->blob, data->blob_size, 2);
817       g_print ("%s\n", s);
818       g_free (s);
819     }
820
821  out:
822   return ret;
823 }
824
825 /* ---------------------------------------------------------------------------------------------------- */
826
827 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
828 static gboolean
829 write_message_in_idle_cb (gpointer user_data)
830 {
831   GDBusWorker *worker = user_data;
832   gboolean more_writes_are_pending;
833   MessageToWriteData *data;
834   GError *error;
835
836   g_mutex_lock (worker->write_lock);
837
838   data = g_queue_pop_head (worker->write_queue);
839   g_assert (data != NULL);
840
841   error = NULL;
842   if (!write_message (worker,
843                       data,
844                       &error))
845     {
846       /* TODO: handle */
847       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
848       g_error_free (error);
849     }
850   message_to_write_data_free (data);
851
852   more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
853
854   worker->write_is_pending = more_writes_are_pending;
855   g_mutex_unlock (worker->write_lock);
856
857   return more_writes_are_pending;
858 }
859
860 /* ---------------------------------------------------------------------------------------------------- */
861
862 /* can be called from any thread - steals blob */
863 void
864 _g_dbus_worker_send_message (GDBusWorker    *worker,
865                              GDBusMessage   *message,
866                              gchar          *blob,
867                              gsize           blob_len)
868 {
869   MessageToWriteData *data;
870
871   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
872   g_return_if_fail (blob != NULL);
873   g_return_if_fail (blob_len > 16);
874
875   data = g_new0 (MessageToWriteData, 1);
876   data->message = g_object_ref (message);
877   data->blob = blob; /* steal! */
878   data->blob_size = blob_len;
879
880   g_mutex_lock (worker->write_lock);
881   g_queue_push_tail (worker->write_queue, data);
882   if (!worker->write_is_pending)
883     {
884       GSource *idle_source;
885
886       worker->write_is_pending = TRUE;
887
888       idle_source = g_idle_source_new ();
889       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
890       g_source_set_callback (idle_source,
891                              write_message_in_idle_cb,
892                              _g_dbus_worker_ref (worker),
893                              (GDestroyNotify) _g_dbus_worker_unref);
894       g_source_attach (idle_source, shared_thread_data->context);
895       g_source_unref (idle_source);
896     }
897   g_mutex_unlock (worker->write_lock);
898 }
899
900 /* ---------------------------------------------------------------------------------------------------- */
901
902 static void
903 _g_dbus_worker_thread_begin_func (gpointer user_data)
904 {
905   GDBusWorker *worker = user_data;
906
907   worker->thread = g_thread_self ();
908
909   /* begin reading */
910   _g_dbus_worker_do_read (worker);
911 }
912
913 GDBusWorker *
914 _g_dbus_worker_new (GIOStream                          *stream,
915                     GDBusCapabilityFlags                capabilities,
916                     GDBusWorkerMessageReceivedCallback  message_received_callback,
917                     GDBusWorkerDisconnectedCallback     disconnected_callback,
918                     gpointer                            user_data)
919 {
920   GDBusWorker *worker;
921
922   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
923   g_return_val_if_fail (message_received_callback != NULL, NULL);
924   g_return_val_if_fail (disconnected_callback != NULL, NULL);
925
926   worker = g_new0 (GDBusWorker, 1);
927   worker->ref_count = 1;
928
929   worker->read_lock = g_mutex_new ();
930   worker->message_received_callback = message_received_callback;
931   worker->disconnected_callback = disconnected_callback;
932   worker->user_data = user_data;
933   worker->stream = g_object_ref (stream);
934   worker->capabilities = capabilities;
935   worker->cancellable = g_cancellable_new ();
936
937   worker->write_lock = g_mutex_new ();
938   worker->write_queue = g_queue_new ();
939
940   if (G_IS_SOCKET_CONNECTION (worker->stream))
941     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
942
943   _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
944
945   return worker;
946 }
947
948 /* This can be called from any thread - frees worker - guarantees no callbacks
949  * will ever be issued again
950  */
951 void
952 _g_dbus_worker_stop (GDBusWorker *worker)
953 {
954   /* If we're called in the worker thread it means we are called from
955    * a worker callback. This is fine, we just can't lock in that case since
956    * we're already holding the lock...
957    */
958   if (g_thread_self () != worker->thread)
959     g_mutex_lock (worker->read_lock);
960   worker->stopped = TRUE;
961   if (g_thread_self () != worker->thread)
962     g_mutex_unlock (worker->read_lock);
963
964   g_cancellable_cancel (worker->cancellable);
965   _g_dbus_worker_unref (worker);
966 }
967
968 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
969 #define G_DBUS_DEBUG_MESSAGE        (1<<1)
970 #define G_DBUS_DEBUG_ALL            0xffffffff
971 static gint _gdbus_debug_flags = 0;
972
973 gboolean
974 _g_dbus_debug_authentication (void)
975 {
976   _g_dbus_initialize ();
977   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
978 }
979
980 gboolean
981 _g_dbus_debug_message (void)
982 {
983   _g_dbus_initialize ();
984   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
985 }
986
987 /*
988  * _g_dbus_initialize:
989  *
990  * Does various one-time init things such as
991  *
992  *  - registering the G_DBUS_ERROR error domain
993  *  - parses the G_DBUS_DEBUG environment variable
994  */
995 void
996 _g_dbus_initialize (void)
997 {
998   static volatile gsize initialized = 0;
999
1000   if (g_once_init_enter (&initialized))
1001     {
1002       volatile GQuark g_dbus_error_domain;
1003       const gchar *debug;
1004
1005       g_dbus_error_domain = G_DBUS_ERROR;
1006
1007       debug = g_getenv ("G_DBUS_DEBUG");
1008       if (debug != NULL)
1009         {
1010           gchar **tokens;
1011           guint n;
1012           tokens = g_strsplit (debug, ",", 0);
1013           for (n = 0; tokens[n] != NULL; n++)
1014             {
1015               if (g_strcmp0 (tokens[n], "authentication") == 0)
1016                 _gdbus_debug_flags |= G_DBUS_DEBUG_AUTHENTICATION;
1017               else if (g_strcmp0 (tokens[n], "message") == 0)
1018                 _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1019               else if (g_strcmp0 (tokens[n], "all") == 0)
1020                 _gdbus_debug_flags |= G_DBUS_DEBUG_ALL;
1021             }
1022           g_strfreev (tokens);
1023         }
1024
1025       g_once_init_leave (&initialized, 1);
1026     }
1027 }
1028
1029 /* ---------------------------------------------------------------------------------------------------- */
1030
1031 gchar *
1032 _g_dbus_compute_complete_signature (GDBusArgInfo **args,
1033                                     gboolean       include_parentheses)
1034 {
1035   GString *s;
1036   guint n;
1037
1038   if (include_parentheses)
1039     s = g_string_new ("(");
1040   else
1041     s = g_string_new ("");
1042   if (args != NULL)
1043     for (n = 0; args[n] != NULL; n++)
1044       g_string_append (s, args[n]->signature);
1045
1046   if (include_parentheses)
1047     g_string_append_c (s, ')');
1048
1049   return g_string_free (s, FALSE);
1050 }
1051
1052 #define __G_DBUS_PRIVATE_C__
1053 #include "gioaliasdef.c"