Merge remote branch 'gvdb/master'
[platform/upstream/glib.git] / gio / gdbusprivate.c
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: David Zeuthen <davidz@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30
31 #include "giotypes.h"
32 #include "gsocket.h"
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "gsocketcontrolmessage.h"
43 #include "gsocketconnection.h"
44 #include "gsocketoutputstream.h"
45
46 #ifdef G_OS_UNIX
47 #include "gunixfdmessage.h"
48 #include "gunixconnection.h"
49 #include "gunixcredentialsmessage.h"
50 #endif
51
52 #ifdef G_OS_WIN32
53 #include <windows.h>
54 #endif
55
56 #include "glibintl.h"
57
58 /* ---------------------------------------------------------------------------------------------------- */
59
60 gchar *
61 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
62 {
63  guint n, m;
64  GString *ret;
65
66  ret = g_string_new (NULL);
67
68  for (n = 0; n < len; n += 16)
69    {
70      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
71
72      for (m = n; m < n + 16; m++)
73        {
74          if (m > n && (m%4) == 0)
75            g_string_append_c (ret, ' ');
76          if (m < len)
77            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
78          else
79            g_string_append (ret, "   ");
80        }
81
82      g_string_append (ret, "   ");
83
84      for (m = n; m < len && m < n + 16; m++)
85        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
86
87      g_string_append_c (ret, '\n');
88    }
89
90  return g_string_free (ret, FALSE);
91 }
92
93 /* ---------------------------------------------------------------------------------------------------- */
94
95 /* Unfortunately ancillary messages are discarded when reading from a
96  * socket using the GSocketInputStream abstraction. So we provide a
97  * very GInputStream-ish API that uses GSocket in this case (very
98  * similar to GSocketInputStream).
99  */
100
/* State carried through one _g_socket_read_with_control_messages()
 * operation; released with read_with_control_data_free().
 */
typedef struct
{
  GSocket *socket;             /* socket being read; a reference is held */
  GCancellable *cancellable;   /* optional; a reference is held when non-NULL */

  void *buffer;                /* destination for the received bytes */
  gsize count;                 /* size of @buffer in bytes */

  GSocketControlMessage ***messages;   /* where the received control messages are stored */
  gint *num_messages;                  /* where the number of control messages is stored */

  GSimpleAsyncResult *simple;  /* result object that gets completed; a reference is held */

  gboolean from_mainloop;      /* TRUE when dispatched from a GSource: complete directly instead of in idle */
} ReadWithControlData;
116
117 static void
118 read_with_control_data_free (ReadWithControlData *data)
119 {
120   g_object_unref (data->socket);
121   if (data->cancellable != NULL)
122     g_object_unref (data->cancellable);
123   g_object_unref (data->simple);
124   g_free (data);
125 }
126
127 static gboolean
128 _g_socket_read_with_control_messages_ready (GSocket      *socket,
129                                             GIOCondition  condition,
130                                             gpointer      user_data)
131 {
132   ReadWithControlData *data = user_data;
133   GError *error;
134   gssize result;
135   GInputVector vector;
136
137   error = NULL;
138   vector.buffer = data->buffer;
139   vector.size = data->count;
140   result = g_socket_receive_message (data->socket,
141                                      NULL, /* address */
142                                      &vector,
143                                      1,
144                                      data->messages,
145                                      data->num_messages,
146                                      NULL,
147                                      data->cancellable,
148                                      &error);
149   if (result >= 0)
150     {
151       g_simple_async_result_set_op_res_gssize (data->simple, result);
152     }
153   else
154     {
155       g_assert (error != NULL);
156       g_simple_async_result_take_error (data->simple, error);
157     }
158
159   if (data->from_mainloop)
160     g_simple_async_result_complete (data->simple);
161   else
162     g_simple_async_result_complete_in_idle (data->simple);
163
164   return FALSE;
165 }
166
167 static void
168 _g_socket_read_with_control_messages (GSocket                 *socket,
169                                       void                    *buffer,
170                                       gsize                    count,
171                                       GSocketControlMessage ***messages,
172                                       gint                    *num_messages,
173                                       gint                     io_priority,
174                                       GCancellable            *cancellable,
175                                       GAsyncReadyCallback      callback,
176                                       gpointer                 user_data)
177 {
178   ReadWithControlData *data;
179
180   data = g_new0 (ReadWithControlData, 1);
181   data->socket = g_object_ref (socket);
182   data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
183   data->buffer = buffer;
184   data->count = count;
185   data->messages = messages;
186   data->num_messages = num_messages;
187
188   data->simple = g_simple_async_result_new (G_OBJECT (socket),
189                                             callback,
190                                             user_data,
191                                             _g_socket_read_with_control_messages);
192
193   if (!g_socket_condition_check (socket, G_IO_IN))
194     {
195       GSource *source;
196       data->from_mainloop = TRUE;
197       source = g_socket_create_source (data->socket,
198                                        G_IO_IN | G_IO_HUP | G_IO_ERR,
199                                        cancellable);
200       g_source_set_callback (source,
201                              (GSourceFunc) _g_socket_read_with_control_messages_ready,
202                              data,
203                              (GDestroyNotify) read_with_control_data_free);
204       g_source_attach (source, g_main_context_get_thread_default ());
205       g_source_unref (source);
206     }
207   else
208     {
209       _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
210       read_with_control_data_free (data);
211     }
212 }
213
214 static gssize
215 _g_socket_read_with_control_messages_finish (GSocket       *socket,
216                                              GAsyncResult  *result,
217                                              GError       **error)
218 {
219   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
220
221   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
222   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
223
224   if (g_simple_async_result_propagate_error (simple, error))
225       return -1;
226   else
227     return g_simple_async_result_get_op_res_gssize (simple);
228 }
229
230 /* ---------------------------------------------------------------------------------------------------- */
231
232 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
233
234 static GPtrArray *ensured_classes = NULL;
235
236 static void
237 ensure_type (GType gtype)
238 {
239   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
240 }
241
242 static void
243 released_required_types (void)
244 {
245   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
246   g_ptr_array_unref (ensured_classes);
247   ensured_classes = NULL;
248 }
249
250 static void
251 ensure_required_types (void)
252 {
253   g_assert (ensured_classes == NULL);
254   ensured_classes = g_ptr_array_new ();
255   ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
256   ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
257 }
258 /* ---------------------------------------------------------------------------------------------------- */
259
/* Lock taken by _g_dbus_shared_thread_ref() around its whole body */
G_LOCK_DEFINE_STATIC (shared_thread_lock);

/* The single worker thread shared by all users, and the main context
 * and main loop it runs.
 */
typedef struct
{
  gint num_users;          /* incremented by every _g_dbus_shared_thread_ref() call */
  GThread *thread;         /* thread running gdbus_shared_thread_func() */
  GMainContext *context;   /* context that sources for the shared thread are attached to */
  GMainLoop *loop;         /* loop over @context, run by the shared thread */
} SharedThreadData;

/* NULL until the first _g_dbus_shared_thread_ref() call creates it */
static SharedThreadData *shared_thread_data = NULL;
271
272 static gpointer
273 gdbus_shared_thread_func (gpointer data)
274 {
275   g_main_context_push_thread_default (shared_thread_data->context);
276   g_main_loop_run (shared_thread_data->loop);
277   g_main_context_pop_thread_default (shared_thread_data->context);
278   return NULL;
279 }
280
281 typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
282
283 typedef struct
284 {
285   GDBusSharedThreadFunc func;
286   gpointer              user_data;
287   gboolean              done;
288 } CallerData;
289
290 static gboolean
291 invoke_caller (gpointer user_data)
292 {
293   CallerData *data = user_data;
294   data->func (data->user_data);
295   data->done = TRUE;
296   return FALSE;
297 }
298
299 /* ---------------------------------------------------------------------------------------------------- */
300
301 static void
302 _g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
303                            gpointer              user_data)
304 {
305   GError *error;
306   GSource *idle_source;
307   CallerData *data;
308   gboolean release_types;
309
310   G_LOCK (shared_thread_lock);
311
312   release_types = FALSE;
313
314   if (shared_thread_data != NULL)
315     {
316       shared_thread_data->num_users += 1;
317       goto have_thread;
318     }
319
320   shared_thread_data = g_new0 (SharedThreadData, 1);
321   shared_thread_data->num_users = 1;
322
323   /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
324   ensure_required_types ();
325   release_types = TRUE;
326
327   error = NULL;
328   shared_thread_data->context = g_main_context_new ();
329   shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
330   shared_thread_data->thread = g_thread_create (gdbus_shared_thread_func,
331                                                 NULL,
332                                                 TRUE,
333                                                 &error);
334   g_assert_no_error (error);
335
336  have_thread:
337
338   data = g_new0 (CallerData, 1);
339   data->func = func;
340   data->user_data = user_data;
341   data->done = FALSE;
342
343   idle_source = g_idle_source_new ();
344   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
345   g_source_set_callback (idle_source,
346                          invoke_caller,
347                          data,
348                          NULL);
349   g_source_attach (idle_source, shared_thread_data->context);
350   g_source_unref (idle_source);
351
352   /* wait for the user code to run.. hmm.. probably use a condition variable instead */
353   while (!data->done)
354     g_thread_yield ();
355
356   if (release_types)
357     released_required_types ();
358
359   g_free (data);
360
361   G_UNLOCK (shared_thread_lock);
362 }
363
/* Counterpart of _g_dbus_shared_thread_ref(). Currently a no-op: the
 * teardown logic below is compiled out with #if 0, so nothing is
 * decremented or destroyed here.
 */
static void
_g_dbus_shared_thread_unref (void)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  G_LOCK (shared_thread_lock);
  g_assert (shared_thread_data != NULL);
  shared_thread_data->num_users -= 1;
  if (shared_thread_data->num_users == 0)
    {
      g_main_loop_quit (shared_thread_data->loop);
      //g_thread_join (shared_thread_data->thread);
      g_main_loop_unref (shared_thread_data->loop);
      g_main_context_unref (shared_thread_data->context);
      g_free (shared_thread_data);
      shared_thread_data = NULL;
      G_UNLOCK (shared_thread_lock);
    }
  else
    {
      G_UNLOCK (shared_thread_lock);
    }
#endif
}
388
389 /* ---------------------------------------------------------------------------------------------------- */
390
/* Per-connection worker state: the stream being serviced, the
 * callbacks used to report events, and the bookkeeping for the read
 * and write sides.
 */
struct GDBusWorker
{
  /* reference count, modified with atomic operations; the worker is freed when it drops to zero */
  volatile gint                       ref_count;

  /* once set, no callbacks are emitted and read completions are ignored */
  gboolean                            stopped;

  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
   * only affects messages received from the other peer (since GDBusServer is the
   * only user) - we might want it to affect messages sent to the other peer too?
   */
  gboolean                            frozen;
  /* messages held back while frozen; flushed by unfreeze_in_idle_cb() */
  GQueue                             *received_messages_while_frozen;

  GIOStream                          *stream;
  /* passed to g_dbus_message_new_from_blob() when decoding received messages */
  GDBusCapabilityFlags                capabilities;
  GCancellable                       *cancellable;
  GDBusWorkerMessageReceivedCallback  message_received_callback;
  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
  GDBusWorkerDisconnectedCallback     disconnected_callback;
  /* passed as the last argument to the three callbacks above */
  gpointer                            user_data;

  GThread                            *thread;

  /* if not NULL, stream is GSocketConnection */
  GSocket *socket;

  /* used for reading */
  GMutex                             *read_lock;
  gchar                              *read_buffer;                /* grown on demand */
  gsize                               read_buffer_allocated_size;
  gsize                               read_buffer_cur_size;       /* bytes of the current message read so far */
  gsize                               read_buffer_bytes_wanted;   /* 0 means: start reading a new message */
  GUnixFDList                        *read_fd_list;               /* fds received for the message being read, or NULL */
  GSocketControlMessage             **read_ancillary_messages;    /* filled in by the socket read */
  gint                                read_num_ancillary_messages;

  /* used for writing */
  GMutex                             *write_lock;
  GQueue                             *write_queue;                /* of MessageToWriteData */
  gint                                num_writes_pending;
  guint64                             write_num_messages_written;
  GList                              *write_pending_flushes;      /* must be empty when the worker is freed */
  gboolean                            flush_pending;
};
435
436 /* ---------------------------------------------------------------------------------------------------- */
437
/* Bookkeeping for a flush request.
 * NOTE(review): the code using this struct is outside this part of the
 * file; presumably one instance is queued on write_pending_flushes per
 * waiter - confirm against the flush implementation.
 */
typedef struct
{
  GMutex *mutex;
  GCond *cond;
  guint64 number_to_wait_for;
  GError *error;
} FlushData;
445
/* Forward declarations: the struct and these helpers are defined
 * further down but are needed by the worker functions that follow.
 */
struct _MessageToWriteData ;
typedef struct _MessageToWriteData MessageToWriteData;

static void message_to_write_data_free (MessageToWriteData *data);

static void read_message_print_transport_debug (gssize bytes_read,
                                                GDBusWorker *worker);

static void write_message_print_transport_debug (gssize bytes_written,
                                                 MessageToWriteData *data);
456
457 /* ---------------------------------------------------------------------------------------------------- */
458
459 static GDBusWorker *
460 _g_dbus_worker_ref (GDBusWorker *worker)
461 {
462   g_atomic_int_inc (&worker->ref_count);
463   return worker;
464 }
465
466 static void
467 _g_dbus_worker_unref (GDBusWorker *worker)
468 {
469   if (g_atomic_int_dec_and_test (&worker->ref_count))
470     {
471       g_assert (worker->write_pending_flushes == NULL);
472
473       _g_dbus_shared_thread_unref ();
474
475       g_object_unref (worker->stream);
476
477       g_mutex_free (worker->read_lock);
478       g_object_unref (worker->cancellable);
479       if (worker->read_fd_list != NULL)
480         g_object_unref (worker->read_fd_list);
481
482       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
483       g_queue_free (worker->received_messages_while_frozen);
484
485       g_mutex_free (worker->write_lock);
486       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
487       g_queue_free (worker->write_queue);
488
489       g_free (worker->read_buffer);
490
491       g_free (worker);
492     }
493 }
494
495 static void
496 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
497                                   gboolean      remote_peer_vanished,
498                                   GError       *error)
499 {
500   if (!worker->stopped)
501     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
502 }
503
504 static void
505 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
506                                       GDBusMessage *message)
507 {
508   if (!worker->stopped)
509     worker->message_received_callback (worker, message, worker->user_data);
510 }
511
512 static GDBusMessage *
513 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
514                                               GDBusMessage *message)
515 {
516   GDBusMessage *ret;
517   if (!worker->stopped)
518     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
519   else
520     ret = message;
521   return ret;
522 }
523
524 /* can only be called from private thread with read-lock held - takes ownership of @message */
525 static void
526 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
527                                                   GDBusMessage *message)
528 {
529   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
530     {
531       /* queue up */
532       g_queue_push_tail (worker->received_messages_while_frozen, message);
533     }
534   else
535     {
536       /* not frozen, nor anything in queue */
537       _g_dbus_worker_emit_message_received (worker, message);
538       g_object_unref (message);
539     }
540 }
541
542 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
543 static gboolean
544 unfreeze_in_idle_cb (gpointer user_data)
545 {
546   GDBusWorker *worker = user_data;
547   GDBusMessage *message;
548
549   g_mutex_lock (worker->read_lock);
550   if (worker->frozen)
551     {
552       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
553         {
554           _g_dbus_worker_emit_message_received (worker, message);
555           g_object_unref (message);
556         }
557       worker->frozen = FALSE;
558     }
559   else
560     {
561       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
562     }
563   g_mutex_unlock (worker->read_lock);
564   return FALSE;
565 }
566
567 /* can be called from any thread */
568 void
569 _g_dbus_worker_unfreeze (GDBusWorker *worker)
570 {
571   GSource *idle_source;
572   idle_source = g_idle_source_new ();
573   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
574   g_source_set_callback (idle_source,
575                          unfreeze_in_idle_cb,
576                          _g_dbus_worker_ref (worker),
577                          (GDestroyNotify) _g_dbus_worker_unref);
578   g_source_attach (idle_source, shared_thread_data->context);
579   g_source_unref (idle_source);
580 }
581
582 /* ---------------------------------------------------------------------------------------------------- */
583
/* defined below; needed here because the read callback re-arms the read */
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);

/* Completion callback for the asynchronous read started by
 * _g_dbus_worker_do_read_unlocked(); @user_data is the GDBusWorker.
 *
 * Under the read-lock it finishes the read, consumes any ancillary
 * (control) messages that arrived with it, and then either
 *  - asks for the remaining bytes if the request was only partly satisfied,
 *  - works out the full message length once the 16-byte header is in, or
 *  - decodes the complete message and queues/delivers it, then starts
 *    reading the next one.
 * On any failure the disconnected callback is emitted and no further
 * read is scheduled. The worker reference taken when the read was
 * started is always dropped before returning.
 *
 * called in private thread shared by all GDBusConnection instances (without read-lock held)
 */
static void
_g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                           GAsyncResult  *res,
                           gpointer       user_data)
{
  GDBusWorker *worker = user_data;
  GError *error;
  gssize bytes_read;

  g_mutex_lock (worker->read_lock);

  /* If already stopped, don't even process the reply */
  if (worker->stopped)
    goto out;

  /* finish with the same API that was used to start the read */
  error = NULL;
  if (worker->socket == NULL)
    bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                             res,
                                             &error);
  else
    bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
                                                              res,
                                                              &error);
  /* consume the control messages first; each one is unreffed and the array freed here */
  if (worker->read_num_ancillary_messages > 0)
    {
      gint n;
      for (n = 0; n < worker->read_num_ancillary_messages; n++)
        {
          GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);

          /* empty first branch so every real case can be an 'else if' inside the #ifdef */
          if (FALSE)
            {
            }
#ifdef G_OS_UNIX
          else if (G_IS_UNIX_FD_MESSAGE (control_message))
            {
              GUnixFDMessage *fd_message;
              gint *fds;
              gint num_fds;

              /* collect the received fds in read_fd_list until the message is complete */
              fd_message = G_UNIX_FD_MESSAGE (control_message);
              fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
              if (worker->read_fd_list == NULL)
                {
                  worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
                }
              else
                {
                  /* this inner n shadows the outer loop counter */
                  gint n;
                  for (n = 0; n < num_fds; n++)
                    {
                      /* TODO: really want a append_steal() */
                      g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
                      /* append does not take over fds[n] (hence the TODO), so close our copy */
                      close (fds[n]);
                    }
                }
              g_free (fds);
            }
          else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
            {
              /* do nothing */
            }
#endif
          else
            {
              /* unknown message type: treat as fatal unless the read already failed */
              if (error == NULL)
                {
                  g_set_error (&error,
                               G_IO_ERROR,
                               G_IO_ERROR_FAILED,
                               "Unexpected ancillary message of type %s received from peer",
                               g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
                  _g_dbus_worker_emit_disconnected (worker, TRUE, error);
                  g_error_free (error);
                  /* release this message and all the ones not looked at yet */
                  g_object_unref (control_message);
                  n++;
                  while (n < worker->read_num_ancillary_messages)
                    g_object_unref (worker->read_ancillary_messages[n++]);
                  g_free (worker->read_ancillary_messages);
                  goto out;
                }
            }
          g_object_unref (control_message);
        }
      g_free (worker->read_ancillary_messages);
    }

  /* the read itself failed */
  if (bytes_read == -1)
    {
      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
      g_error_free (error);
      goto out;
    }

#if 0
  g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
           (gint) bytes_read,
           g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
           g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
           g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
                                     G_IO_IN | G_IO_OUT | G_IO_HUP),
           worker->stream,
           worker);
#endif

  /* TODO: hmm, hmm... */
  /* a zero-byte read is reported as a disconnect */
  if (bytes_read == 0)
    {
      g_set_error (&error,
                   G_IO_ERROR,
                   G_IO_ERROR_FAILED,
                   "Underlying GIOStream returned 0 bytes on an async read");
      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
      g_error_free (error);
      goto out;
    }

  read_message_print_transport_debug (bytes_read, worker);

  worker->read_buffer_cur_size += bytes_read;
  if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
    {
      /* OK, got what we asked for! */
      if (worker->read_buffer_bytes_wanted == 16)
        {
          gssize message_len;
          /* OK, got the header - determine how many more bytes are needed */
          error = NULL;
          message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
                                                     16,
                                                     &error);
          if (message_len == -1)
            {
              g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
              g_error_free (error);
              goto out;
            }

          /* keep the header in the buffer and read up to the full message length */
          worker->read_buffer_bytes_wanted = message_len;
          _g_dbus_worker_do_read_unlocked (worker);
        }
      else
        {
          GDBusMessage *message;
          error = NULL;

          /* TODO: use connection->priv->auth to decode the message */

          message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
                                                  worker->read_buffer_cur_size,
                                                  worker->capabilities,
                                                  &error);
          if (message == NULL)
            {
              gchar *s;
              s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
              g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
                         "The error is: %s\n"
                         "The payload is as follows:\n"
                         "%s\n",
                         worker->read_buffer_cur_size,
                         error->message,
                         s);
              g_free (s);
              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
              g_error_free (error);
              goto out;
            }

#ifdef G_OS_UNIX
          /* hand the fds collected for this message over to it and drop our reference */
          if (worker->read_fd_list != NULL)
            {
              g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
              g_object_unref (worker->read_fd_list);
              worker->read_fd_list = NULL;
            }
#endif

          if (G_UNLIKELY (_g_dbus_debug_message ()))
            {
              gchar *s;
              _g_dbus_debug_print_lock ();
              g_print ("========================================================================\n"
                       "GDBus-debug:Message:\n"
                       "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
                       worker->read_buffer_cur_size);
              s = g_dbus_message_print (message, 2);
              g_print ("%s", s);
              g_free (s);
              if (G_UNLIKELY (_g_dbus_debug_payload ()))
                {
                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
                  g_print ("%s\n", s);
                  g_free (s);
                }
              _g_dbus_debug_print_unlock ();
            }

          /* yay, got a message, go deliver it */
          _g_dbus_worker_queue_or_deliver_received_message (worker, message);

          /* start reading another message! */
          worker->read_buffer_bytes_wanted = 0;
          worker->read_buffer_cur_size = 0;
          _g_dbus_worker_do_read_unlocked (worker);
        }
    }
  else
    {
      /* didn't get all the bytes we requested - so repeat the request... */
      _g_dbus_worker_do_read_unlocked (worker);
    }

 out:
  g_mutex_unlock (worker->read_lock);

  /* gives up the reference acquired when calling g_input_stream_read_async() */
  _g_dbus_worker_unref (worker);
}
808
809 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
810 static void
811 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
812 {
813   /* if bytes_wanted is zero, it means start reading a message */
814   if (worker->read_buffer_bytes_wanted == 0)
815     {
816       worker->read_buffer_cur_size = 0;
817       worker->read_buffer_bytes_wanted = 16;
818     }
819
820   /* ensure we have a (big enough) buffer */
821   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
822     {
823       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
824       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
825       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
826     }
827
828   if (worker->socket == NULL)
829     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
830                                worker->read_buffer + worker->read_buffer_cur_size,
831                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
832                                G_PRIORITY_DEFAULT,
833                                worker->cancellable,
834                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
835                                _g_dbus_worker_ref (worker));
836   else
837     {
838       worker->read_ancillary_messages = NULL;
839       worker->read_num_ancillary_messages = 0;
840       _g_socket_read_with_control_messages (worker->socket,
841                                             worker->read_buffer + worker->read_buffer_cur_size,
842                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
843                                             &worker->read_ancillary_messages,
844                                             &worker->read_num_ancillary_messages,
845                                             G_PRIORITY_DEFAULT,
846                                             worker->cancellable,
847                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
848                                             _g_dbus_worker_ref (worker));
849     }
850 }
851
852 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
853 static void
854 _g_dbus_worker_do_read (GDBusWorker *worker)
855 {
856   g_mutex_lock (worker->read_lock);
857   _g_dbus_worker_do_read_unlocked (worker);
858   g_mutex_unlock (worker->read_lock);
859 }
860
861 /* ---------------------------------------------------------------------------------------------------- */
862
/* One outgoing message and the progress made writing it; released
 * with message_to_write_data_free().
 */
struct _MessageToWriteData
{
  GDBusWorker  *worker;    /* a reference is held; dropped on free */
  GDBusMessage *message;   /* may be NULL; unreffed on free otherwise */
  gchar        *blob;      /* bytes to write; freed with g_free() */
  gsize         blob_size; /* total number of bytes in @blob */

  gsize               total_written;  /* bytes written so far; done when it reaches @blob_size */
  GSimpleAsyncResult *simple;         /* completed when the write finishes or fails */

};
874
875 static void
876 message_to_write_data_free (MessageToWriteData *data)
877 {
878   _g_dbus_worker_unref (data->worker);
879   if (data->message)
880     g_object_unref (data->message);
881   g_free (data->blob);
882   g_free (data);
883 }
884
885 /* ---------------------------------------------------------------------------------------------------- */
886
887 static void write_message_continue_writing (MessageToWriteData *data);
888
889 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
890 static void
891 write_message_async_cb (GObject      *source_object,
892                         GAsyncResult *res,
893                         gpointer      user_data)
894 {
895   MessageToWriteData *data = user_data;
896   GSimpleAsyncResult *simple;
897   gssize bytes_written;
898   GError *error;
899
900   /* Note: we can't access data->simple after calling g_async_result_complete () because the
901    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
902    */
903   simple = data->simple;
904
905   error = NULL;
906   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
907                                                 res,
908                                                 &error);
909   if (bytes_written == -1)
910     {
911       g_simple_async_result_take_error (simple, error);
912       g_simple_async_result_complete (simple);
913       g_object_unref (simple);
914       goto out;
915     }
916   g_assert (bytes_written > 0); /* zero is never returned */
917
918   write_message_print_transport_debug (bytes_written, data);
919
920   data->total_written += bytes_written;
921   g_assert (data->total_written <= data->blob_size);
922   if (data->total_written == data->blob_size)
923     {
924       g_simple_async_result_complete (simple);
925       g_object_unref (simple);
926       goto out;
927     }
928
929   write_message_continue_writing (data);
930
931  out:
932   ;
933 }
934
935 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
936 static gboolean
937 on_socket_ready (GSocket      *socket,
938                  GIOCondition  condition,
939                  gpointer      user_data)
940 {
941   MessageToWriteData *data = user_data;
942   write_message_continue_writing (data);
943   return FALSE; /* remove source */
944 }
945
946 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
947 static void
948 write_message_continue_writing (MessageToWriteData *data)
949 {
950   GOutputStream *ostream;
951   GSimpleAsyncResult *simple;
952 #ifdef G_OS_UNIX
953   GUnixFDList *fd_list;
954 #endif
955
956   /* Note: we can't access data->simple after calling g_async_result_complete () because the
957    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
958    */
959   simple = data->simple;
960
961   ostream = g_io_stream_get_output_stream (data->worker->stream);
962 #ifdef G_OS_UNIX
963   fd_list = g_dbus_message_get_unix_fd_list (data->message);
964 #endif
965
966   g_assert (!g_output_stream_has_pending (ostream));
967   g_assert_cmpint (data->total_written, <, data->blob_size);
968
969   if (FALSE)
970     {
971     }
972 #ifdef G_OS_UNIX
973   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
974     {
975       GOutputVector vector;
976       GSocketControlMessage *control_message;
977       gssize bytes_written;
978       GError *error;
979
980       vector.buffer = data->blob;
981       vector.size = data->blob_size;
982
983       control_message = NULL;
984       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
985         {
986           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
987             {
988               g_simple_async_result_set_error (simple,
989                                                G_IO_ERROR,
990                                                G_IO_ERROR_FAILED,
991                                                "Tried sending a file descriptor but remote peer does not support this capability");
992               g_simple_async_result_complete (simple);
993               g_object_unref (simple);
994               goto out;
995             }
996           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
997         }
998
999       error = NULL;
1000       bytes_written = g_socket_send_message (data->worker->socket,
1001                                              NULL, /* address */
1002                                              &vector,
1003                                              1,
1004                                              control_message != NULL ? &control_message : NULL,
1005                                              control_message != NULL ? 1 : 0,
1006                                              G_SOCKET_MSG_NONE,
1007                                              data->worker->cancellable,
1008                                              &error);
1009       if (control_message != NULL)
1010         g_object_unref (control_message);
1011
1012       if (bytes_written == -1)
1013         {
1014           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1015           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1016             {
1017               GSource *source;
1018               source = g_socket_create_source (data->worker->socket,
1019                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1020                                                data->worker->cancellable);
1021               g_source_set_callback (source,
1022                                      (GSourceFunc) on_socket_ready,
1023                                      data,
1024                                      NULL); /* GDestroyNotify */
1025               g_source_attach (source, g_main_context_get_thread_default ());
1026               g_source_unref (source);
1027               g_error_free (error);
1028               goto out;
1029             }
1030           g_simple_async_result_take_error (simple, error);
1031           g_simple_async_result_complete (simple);
1032           g_object_unref (simple);
1033           goto out;
1034         }
1035       g_assert (bytes_written > 0); /* zero is never returned */
1036
1037       write_message_print_transport_debug (bytes_written, data);
1038
1039       data->total_written += bytes_written;
1040       g_assert (data->total_written <= data->blob_size);
1041       if (data->total_written == data->blob_size)
1042         {
1043           g_simple_async_result_complete (simple);
1044           g_object_unref (simple);
1045           goto out;
1046         }
1047
1048       write_message_continue_writing (data);
1049     }
1050 #endif
1051   else
1052     {
1053 #ifdef G_OS_UNIX
1054       if (fd_list != NULL)
1055         {
1056           g_simple_async_result_set_error (simple,
1057                                            G_IO_ERROR,
1058                                            G_IO_ERROR_FAILED,
1059                                            "Tried sending a file descriptor on unsupported stream of type %s",
1060                                            g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1061           g_simple_async_result_complete (simple);
1062           g_object_unref (simple);
1063           goto out;
1064         }
1065 #endif
1066
1067       g_output_stream_write_async (ostream,
1068                                    (const gchar *) data->blob + data->total_written,
1069                                    data->blob_size - data->total_written,
1070                                    G_PRIORITY_DEFAULT,
1071                                    data->worker->cancellable,
1072                                    write_message_async_cb,
1073                                    data);
1074     }
1075  out:
1076   ;
1077 }
1078
1079 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1080 static void
1081 write_message_async (GDBusWorker         *worker,
1082                      MessageToWriteData  *data,
1083                      GAsyncReadyCallback  callback,
1084                      gpointer             user_data)
1085 {
1086   data->simple = g_simple_async_result_new (NULL,
1087                                             callback,
1088                                             user_data,
1089                                             write_message_async);
1090   data->total_written = 0;
1091   write_message_continue_writing (data);
1092 }
1093
1094 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1095 static gboolean
1096 write_message_finish (GAsyncResult   *res,
1097                       GError        **error)
1098 {
1099   g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
1100   if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
1101     return FALSE;
1102   else
1103     return TRUE;
1104 }
1105 /* ---------------------------------------------------------------------------------------------------- */
1106
1107 static void maybe_write_next_message (GDBusWorker *worker);
1108
1109 typedef struct
1110 {
1111   GDBusWorker *worker;
1112   GList *flushers;
1113 } FlushAsyncData;
1114
1115 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1116 static void
1117 ostream_flush_cb (GObject      *source_object,
1118                   GAsyncResult *res,
1119                   gpointer      user_data)
1120 {
1121   FlushAsyncData *data = user_data;
1122   GError *error;
1123   GList *l;
1124
1125   error = NULL;
1126   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1127                                 res,
1128                                 &error);
1129
1130   if (error == NULL)
1131     {
1132       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1133         {
1134           _g_dbus_debug_print_lock ();
1135           g_print ("========================================================================\n"
1136                    "GDBus-debug:Transport:\n"
1137                    "  ---- FLUSHED stream of type %s\n",
1138                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1139           _g_dbus_debug_print_unlock ();
1140         }
1141     }
1142
1143   g_assert (data->flushers != NULL);
1144   for (l = data->flushers; l != NULL; l = l->next)
1145     {
1146       FlushData *f = l->data;
1147
1148       f->error = error != NULL ? g_error_copy (error) : NULL;
1149
1150       g_mutex_lock (f->mutex);
1151       g_cond_signal (f->cond);
1152       g_mutex_unlock (f->mutex);
1153     }
1154   g_list_free (data->flushers);
1155
1156   if (error != NULL)
1157     g_error_free (error);
1158
1159   /* Make sure we tell folks that we don't have additional
1160      flushes pending */
1161   g_mutex_lock (data->worker->write_lock);
1162   data->worker->flush_pending = FALSE;
1163   g_mutex_unlock (data->worker->write_lock);
1164
1165   /* OK, cool, finally kick off the next write */
1166   maybe_write_next_message (data->worker);
1167
1168   _g_dbus_worker_unref (data->worker);
1169   g_free (data);
1170 }
1171
1172 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1173 static void
1174 message_written (GDBusWorker *worker,
1175                  MessageToWriteData *message_data)
1176 {
1177   GList *l;
1178   GList *ll;
1179   GList *flushers;
1180
1181   /* first log the fact that we wrote a message */
1182   if (G_UNLIKELY (_g_dbus_debug_message ()))
1183     {
1184       gchar *s;
1185       _g_dbus_debug_print_lock ();
1186       g_print ("========================================================================\n"
1187                "GDBus-debug:Message:\n"
1188                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1189                message_data->blob_size);
1190       s = g_dbus_message_print (message_data->message, 2);
1191       g_print ("%s", s);
1192       g_free (s);
1193       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1194         {
1195           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1196           g_print ("%s\n", s);
1197           g_free (s);
1198         }
1199       _g_dbus_debug_print_unlock ();
1200     }
1201
1202   /* then first wake up pending flushes and, if needed, flush the stream */
1203   flushers = NULL;
1204   g_mutex_lock (worker->write_lock);
1205   worker->write_num_messages_written += 1;
1206   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1207     {
1208       FlushData *f = l->data;
1209       ll = l->next;
1210
1211       if (f->number_to_wait_for == worker->write_num_messages_written)
1212         {
1213           flushers = g_list_append (flushers, f);
1214           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1215         }
1216     }
1217   if (flushers != NULL)
1218     {
1219       worker->flush_pending = TRUE;
1220     }
1221   g_mutex_unlock (worker->write_lock);
1222
1223   if (flushers != NULL)
1224     {
1225       FlushAsyncData *data;
1226       data = g_new0 (FlushAsyncData, 1);
1227       data->worker = _g_dbus_worker_ref (worker);
1228       data->flushers = flushers;
1229       /* flush the stream before writing the next message */
1230       g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
1231                                    G_PRIORITY_DEFAULT,
1232                                    worker->cancellable,
1233                                    ostream_flush_cb,
1234                                    data);
1235     }
1236   else
1237     {
1238       /* kick off the next write! */
1239       maybe_write_next_message (worker);
1240     }
1241 }
1242
1243 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1244 static void
1245 write_message_cb (GObject       *source_object,
1246                   GAsyncResult  *res,
1247                   gpointer       user_data)
1248 {
1249   MessageToWriteData *data = user_data;
1250   GError *error;
1251
1252   g_mutex_lock (data->worker->write_lock);
1253   data->worker->num_writes_pending -= 1;
1254   g_mutex_unlock (data->worker->write_lock);
1255
1256   error = NULL;
1257   if (!write_message_finish (res, &error))
1258     {
1259       /* TODO: handle */
1260       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1261       g_error_free (error);
1262     }
1263
1264   /* this function will also kick of the next write (it might need to
1265    * flush so writing the next message might happen much later
1266    * e.g. async)
1267    */
1268   message_written (data->worker, data);
1269
1270   message_to_write_data_free (data);
1271 }
1272
1273 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1274 static void
1275 maybe_write_next_message (GDBusWorker *worker)
1276 {
1277   MessageToWriteData *data;
1278
1279  write_next:
1280
1281   g_mutex_lock (worker->write_lock);
1282   data = g_queue_pop_head (worker->write_queue);
1283   if (data != NULL)
1284     worker->num_writes_pending += 1;
1285   g_mutex_unlock (worker->write_lock);
1286
1287   /* Note that write_lock is only used for protecting the @write_queue
1288    * and @num_writes_pending fields of the GDBusWorker struct ... which we
1289    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1290    *
1291    * Therefore, it's fine to drop it here when calling back into user
1292    * code and then writing the message out onto the GIOStream since this
1293    * function only runs on the worker thread.
1294    */
1295   if (data != NULL)
1296     {
1297       GDBusMessage *old_message;
1298       guchar *new_blob;
1299       gsize new_blob_size;
1300       GError *error;
1301
1302       old_message = data->message;
1303       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1304       if (data->message == old_message)
1305         {
1306           /* filters had no effect - do nothing */
1307         }
1308       else if (data->message == NULL)
1309         {
1310           /* filters dropped message */
1311           g_mutex_lock (worker->write_lock);
1312           worker->num_writes_pending -= 1;
1313           g_mutex_unlock (worker->write_lock);
1314           message_to_write_data_free (data);
1315           goto write_next;
1316         }
1317       else
1318         {
1319           /* filters altered the message -> reencode */
1320           error = NULL;
1321           new_blob = g_dbus_message_to_blob (data->message,
1322                                              &new_blob_size,
1323                                              worker->capabilities,
1324                                              &error);
1325           if (new_blob == NULL)
1326             {
1327               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1328                * the old message instead
1329                */
1330               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1331                          g_dbus_message_get_serial (data->message),
1332                          error->message);
1333               g_error_free (error);
1334             }
1335           else
1336             {
1337               g_free (data->blob);
1338               data->blob = (gchar *) new_blob;
1339               data->blob_size = new_blob_size;
1340             }
1341         }
1342
1343       write_message_async (worker,
1344                            data,
1345                            write_message_cb,
1346                            data);
1347     }
1348 }
1349
1350 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1351 static gboolean
1352 write_message_in_idle_cb (gpointer user_data)
1353 {
1354   GDBusWorker *worker = user_data;
1355   if (worker->num_writes_pending == 0 && !worker->flush_pending)
1356     maybe_write_next_message (worker);
1357   return FALSE;
1358 }
1359
1360 /* ---------------------------------------------------------------------------------------------------- */
1361
1362 /* can be called from any thread - steals blob */
1363 void
1364 _g_dbus_worker_send_message (GDBusWorker    *worker,
1365                              GDBusMessage   *message,
1366                              gchar          *blob,
1367                              gsize           blob_len)
1368 {
1369   MessageToWriteData *data;
1370
1371   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1372   g_return_if_fail (blob != NULL);
1373   g_return_if_fail (blob_len > 16);
1374
1375   data = g_new0 (MessageToWriteData, 1);
1376   data->worker = _g_dbus_worker_ref (worker);
1377   data->message = g_object_ref (message);
1378   data->blob = blob; /* steal! */
1379   data->blob_size = blob_len;
1380
1381   g_mutex_lock (worker->write_lock);
1382   g_queue_push_tail (worker->write_queue, data);
1383   if (worker->num_writes_pending == 0)
1384     {
1385       GSource *idle_source;
1386       idle_source = g_idle_source_new ();
1387       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1388       g_source_set_callback (idle_source,
1389                              write_message_in_idle_cb,
1390                              _g_dbus_worker_ref (worker),
1391                              (GDestroyNotify) _g_dbus_worker_unref);
1392       g_source_attach (idle_source, shared_thread_data->context);
1393       g_source_unref (idle_source);
1394     }
1395   g_mutex_unlock (worker->write_lock);
1396 }
1397
1398 /* ---------------------------------------------------------------------------------------------------- */
1399
1400 static void
1401 _g_dbus_worker_thread_begin_func (gpointer user_data)
1402 {
1403   GDBusWorker *worker = user_data;
1404
1405   worker->thread = g_thread_self ();
1406
1407   /* begin reading */
1408   _g_dbus_worker_do_read (worker);
1409 }
1410
1411 GDBusWorker *
1412 _g_dbus_worker_new (GIOStream                              *stream,
1413                     GDBusCapabilityFlags                    capabilities,
1414                     gboolean                                initially_frozen,
1415                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1416                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1417                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1418                     gpointer                                user_data)
1419 {
1420   GDBusWorker *worker;
1421
1422   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1423   g_return_val_if_fail (message_received_callback != NULL, NULL);
1424   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1425   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1426
1427   worker = g_new0 (GDBusWorker, 1);
1428   worker->ref_count = 1;
1429
1430   worker->read_lock = g_mutex_new ();
1431   worker->message_received_callback = message_received_callback;
1432   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1433   worker->disconnected_callback = disconnected_callback;
1434   worker->user_data = user_data;
1435   worker->stream = g_object_ref (stream);
1436   worker->capabilities = capabilities;
1437   worker->cancellable = g_cancellable_new ();
1438   worker->flush_pending = FALSE;
1439
1440   worker->frozen = initially_frozen;
1441   worker->received_messages_while_frozen = g_queue_new ();
1442
1443   worker->write_lock = g_mutex_new ();
1444   worker->write_queue = g_queue_new ();
1445
1446   if (G_IS_SOCKET_CONNECTION (worker->stream))
1447     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1448
1449   _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
1450
1451   return worker;
1452 }
1453
1454 /* ---------------------------------------------------------------------------------------------------- */
1455
1456 /* This can be called from any thread - frees worker. Note that
1457  * callbacks might still happen if called from another thread than the
1458  * worker - use your own synchronization primitive in the callbacks.
1459  */
1460 void
1461 _g_dbus_worker_stop (GDBusWorker *worker)
1462 {
1463   worker->stopped = TRUE;
1464   g_cancellable_cancel (worker->cancellable);
1465   _g_dbus_worker_unref (worker);
1466 }
1467
1468 /* ---------------------------------------------------------------------------------------------------- */
1469
1470 /* can be called from any thread (except the worker thread) - blocks
1471  * calling thread until all queued outgoing messages are written and
1472  * the transport has been flushed
1473  */
1474 gboolean
1475 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1476                            GCancellable   *cancellable,
1477                            GError        **error)
1478 {
1479   gboolean ret;
1480   FlushData *data;
1481
1482   data = NULL;
1483   ret = TRUE;
1484
1485   /* if the queue is empty, there's nothing to wait for */
1486   g_mutex_lock (worker->write_lock);
1487   if (g_queue_get_length (worker->write_queue) > 0)
1488     {
1489       data = g_new0 (FlushData, 1);
1490       data->mutex = g_mutex_new ();
1491       data->cond = g_cond_new ();
1492       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
1493       g_mutex_lock (data->mutex);
1494       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
1495     }
1496   g_mutex_unlock (worker->write_lock);
1497
1498   if (data != NULL)
1499     {
1500       g_cond_wait (data->cond, data->mutex);
1501       g_mutex_unlock (data->mutex);
1502
1503       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1504       g_cond_free (data->cond);
1505       g_mutex_free (data->mutex);
1506       if (data->error != NULL)
1507         {
1508           ret = FALSE;
1509           g_propagate_error (error, data->error);
1510         }
1511       g_free (data);
1512     }
1513
1514   return ret;
1515 }
1516
1517 /* ---------------------------------------------------------------------------------------------------- */
1518
1519 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1520 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1521 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1522 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1523 #define G_DBUS_DEBUG_CALL           (1<<4)
1524 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1525 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1526 #define G_DBUS_DEBUG_RETURN         (1<<7)
1527 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1528 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1529
1530 static gint _gdbus_debug_flags = 0;
1531
1532 gboolean
1533 _g_dbus_debug_authentication (void)
1534 {
1535   _g_dbus_initialize ();
1536   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1537 }
1538
1539 gboolean
1540 _g_dbus_debug_transport (void)
1541 {
1542   _g_dbus_initialize ();
1543   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1544 }
1545
1546 gboolean
1547 _g_dbus_debug_message (void)
1548 {
1549   _g_dbus_initialize ();
1550   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1551 }
1552
1553 gboolean
1554 _g_dbus_debug_payload (void)
1555 {
1556   _g_dbus_initialize ();
1557   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1558 }
1559
1560 gboolean
1561 _g_dbus_debug_call (void)
1562 {
1563   _g_dbus_initialize ();
1564   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1565 }
1566
1567 gboolean
1568 _g_dbus_debug_signal (void)
1569 {
1570   _g_dbus_initialize ();
1571   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1572 }
1573
1574 gboolean
1575 _g_dbus_debug_incoming (void)
1576 {
1577   _g_dbus_initialize ();
1578   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1579 }
1580
1581 gboolean
1582 _g_dbus_debug_return (void)
1583 {
1584   _g_dbus_initialize ();
1585   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1586 }
1587
1588 gboolean
1589 _g_dbus_debug_emission (void)
1590 {
1591   _g_dbus_initialize ();
1592   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1593 }
1594
1595 gboolean
1596 _g_dbus_debug_address (void)
1597 {
1598   _g_dbus_initialize ();
1599   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1600 }
1601
1602 G_LOCK_DEFINE_STATIC (print_lock);
1603
1604 void
1605 _g_dbus_debug_print_lock (void)
1606 {
1607   G_LOCK (print_lock);
1608 }
1609
1610 void
1611 _g_dbus_debug_print_unlock (void)
1612 {
1613   G_UNLOCK (print_lock);
1614 }
1615
1616 /*
1617  * _g_dbus_initialize:
1618  *
1619  * Does various one-time init things such as
1620  *
1621  *  - registering the G_DBUS_ERROR error domain
1622  *  - parses the G_DBUS_DEBUG environment variable
1623  */
1624 void
1625 _g_dbus_initialize (void)
1626 {
1627   static volatile gsize initialized = 0;
1628
1629   if (g_once_init_enter (&initialized))
1630     {
1631       volatile GQuark g_dbus_error_domain;
1632       const gchar *debug;
1633
1634       g_dbus_error_domain = G_DBUS_ERROR;
1635
1636       debug = g_getenv ("G_DBUS_DEBUG");
1637       if (debug != NULL)
1638         {
1639           const GDebugKey keys[] = {
1640             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1641             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1642             { "message",        G_DBUS_DEBUG_MESSAGE        },
1643             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1644             { "call",           G_DBUS_DEBUG_CALL           },
1645             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1646             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1647             { "return",         G_DBUS_DEBUG_RETURN         },
1648             { "emission",       G_DBUS_DEBUG_EMISSION       },
1649             { "address",        G_DBUS_DEBUG_ADDRESS        }
1650           };
1651
1652           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1653           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1654             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1655         }
1656
1657       g_once_init_leave (&initialized, 1);
1658     }
1659 }
1660
1661 /* ---------------------------------------------------------------------------------------------------- */
1662
1663 GVariantType *
1664 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1665 {
1666   const GVariantType *arg_types[256];
1667   guint n;
1668
1669   if (args)
1670     for (n = 0; args[n] != NULL; n++)
1671       {
1672         /* DBus places a hard limit of 255 on signature length.
1673          * therefore number of args must be less than 256.
1674          */
1675         g_assert (n < 256);
1676
1677         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1678
1679         if G_UNLIKELY (arg_types[n] == NULL)
1680           return NULL;
1681       }
1682   else
1683     n = 0;
1684
1685   return g_variant_type_new_tuple (arg_types, n);
1686 }
1687
1688 /* ---------------------------------------------------------------------------------------------------- */
1689
1690 #ifdef G_OS_WIN32
1691
1692 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1693
1694 gchar *
1695 _g_dbus_win32_get_user_sid (void)
1696 {
1697   HANDLE h;
1698   TOKEN_USER *user;
1699   DWORD token_information_len;
1700   PSID psid;
1701   gchar *sid;
1702   gchar *ret;
1703
1704   ret = NULL;
1705   user = NULL;
1706   h = INVALID_HANDLE_VALUE;
1707
1708   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1709     {
1710       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1711       goto out;
1712     }
1713
1714   /* Get length of buffer */
1715   token_information_len = 0;
1716   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1717     {
1718       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1719         {
1720           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1721           goto out;
1722         }
1723     }
1724   user = g_malloc (token_information_len);
1725   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1726     {
1727       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1728       goto out;
1729     }
1730
1731   psid = user->User.Sid;
1732   if (!IsValidSid (psid))
1733     {
1734       g_warning ("Invalid SID");
1735       goto out;
1736     }
1737
1738   if (!ConvertSidToStringSidA (psid, &sid))
1739     {
1740       g_warning ("Invalid SID");
1741       goto out;
1742     }
1743
1744   ret = g_strdup (sid);
1745   LocalFree (sid);
1746
1747 out:
1748   g_free (user);
1749   if (h != INVALID_HANDLE_VALUE)
1750     CloseHandle (h);
1751   return ret;
1752 }
1753 #endif
1754
1755 /* ---------------------------------------------------------------------------------------------------- */
1756
1757 gchar *
1758 _g_dbus_get_machine_id (GError **error)
1759 {
1760   gchar *ret;
1761   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1762   ret = NULL;
1763   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1764                             &ret,
1765                             NULL,
1766                             error))
1767     {
1768       g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
1769     }
1770   else
1771     {
1772       /* TODO: validate value */
1773       g_strstrip (ret);
1774     }
1775   return ret;
1776 }
1777
1778 /* ---------------------------------------------------------------------------------------------------- */
1779
1780 gchar *
1781 _g_dbus_enum_to_string (GType enum_type, gint value)
1782 {
1783   gchar *ret;
1784   GEnumClass *klass;
1785   GEnumValue *enum_value;
1786
1787   klass = g_type_class_ref (enum_type);
1788   enum_value = g_enum_get_value (klass, value);
1789   if (enum_value != NULL)
1790     ret = g_strdup (enum_value->value_nick);
1791   else
1792     ret = g_strdup_printf ("unknown (value %d)", value);
1793   g_type_class_unref (klass);
1794   return ret;
1795 }
1796
1797 /* ---------------------------------------------------------------------------------------------------- */
1798
1799 static void
1800 write_message_print_transport_debug (gssize bytes_written,
1801                                      MessageToWriteData *data)
1802 {
1803   if (G_LIKELY (!_g_dbus_debug_transport ()))
1804     goto out;
1805
1806   _g_dbus_debug_print_lock ();
1807   g_print ("========================================================================\n"
1808            "GDBus-debug:Transport:\n"
1809            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1810            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
1811            bytes_written,
1812            g_dbus_message_get_serial (data->message),
1813            data->blob_size,
1814            data->total_written,
1815            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1816   _g_dbus_debug_print_unlock ();
1817  out:
1818   ;
1819 }
1820
1821 /* ---------------------------------------------------------------------------------------------------- */
1822
1823 static void
1824 read_message_print_transport_debug (gssize bytes_read,
1825                                     GDBusWorker *worker)
1826 {
1827   gsize size;
1828   gint32 serial;
1829   gint32 message_length;
1830
1831   if (G_LIKELY (!_g_dbus_debug_transport ()))
1832     goto out;
1833
1834   size = bytes_read + worker->read_buffer_cur_size;
1835   serial = 0;
1836   message_length = 0;
1837   if (size >= 16)
1838     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
1839   if (size >= 1)
1840     {
1841       switch (worker->read_buffer[0])
1842         {
1843         case 'l':
1844           if (size >= 12)
1845             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
1846           break;
1847         case 'B':
1848           if (size >= 12)
1849             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
1850           break;
1851         default:
1852           /* an error will be set elsewhere if this happens */
1853           goto out;
1854         }
1855     }
1856
1857     _g_dbus_debug_print_lock ();
1858   g_print ("========================================================================\n"
1859            "GDBus-debug:Transport:\n"
1860            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
1861            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
1862            bytes_read,
1863            serial,
1864            message_length,
1865            worker->read_buffer_cur_size,
1866            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
1867   _g_dbus_debug_print_unlock ();
1868  out:
1869   ;
1870 }