Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / dbus / bus.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "dbus/bus.h"
6
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h"
10 #include "base/message_loop/message_loop_proxy.h"
11 #include "base/stl_util.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/threading/thread.h"
14 #include "base/threading/thread_restrictions.h"
15 #include "base/time/time.h"
16 #include "dbus/exported_object.h"
17 #include "dbus/message.h"
18 #include "dbus/object_manager.h"
19 #include "dbus/object_path.h"
20 #include "dbus/object_proxy.h"
21 #include "dbus/scoped_dbus_error.h"
22
23 namespace dbus {
24
25 namespace {
26
27 const char kDisconnectedSignal[] = "Disconnected";
28 const char kDisconnectedMatchRule[] =
29     "type='signal', path='/org/freedesktop/DBus/Local',"
30     "interface='org.freedesktop.DBus.Local', member='Disconnected'";
31
32 // The NameOwnerChanged member in org.freedesktop.DBus
33 const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
34
35 // The match rule used to filter for changes to a given service name owner.
36 const char kServiceNameOwnerChangeMatchRule[] =
37     "type='signal',interface='org.freedesktop.DBus',"
38     "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
39     "sender='org.freedesktop.DBus',arg0='%s'";
40
41 // The class is used for watching the file descriptor used for D-Bus
42 // communication.
43 class Watch : public base::MessagePumpLibevent::Watcher {
44  public:
45   explicit Watch(DBusWatch* watch)
46       : raw_watch_(watch) {
47     dbus_watch_set_data(raw_watch_, this, NULL);
48   }
49
50   virtual ~Watch() {
51     dbus_watch_set_data(raw_watch_, NULL, NULL);
52   }
53
54   // Returns true if the underlying file descriptor is ready to be watched.
55   bool IsReadyToBeWatched() {
56     return dbus_watch_get_enabled(raw_watch_);
57   }
58
59   // Starts watching the underlying file descriptor.
60   void StartWatching() {
61     const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
62     const int flags = dbus_watch_get_flags(raw_watch_);
63
64     base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
65     if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
66       mode = base::MessageLoopForIO::WATCH_READ_WRITE;
67     else if (flags & DBUS_WATCH_READABLE)
68       mode = base::MessageLoopForIO::WATCH_READ;
69     else if (flags & DBUS_WATCH_WRITABLE)
70       mode = base::MessageLoopForIO::WATCH_WRITE;
71     else
72       NOTREACHED();
73
74     const bool persistent = true;  // Watch persistently.
75     const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
76         file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
77     CHECK(success) << "Unable to allocate memory";
78   }
79
80   // Stops watching the underlying file descriptor.
81   void StopWatching() {
82     file_descriptor_watcher_.StopWatchingFileDescriptor();
83   }
84
85  private:
86   // Implement MessagePumpLibevent::Watcher.
87   virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE {
88     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
89     CHECK(success) << "Unable to allocate memory";
90   }
91
92   // Implement MessagePumpLibevent::Watcher.
93   virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE {
94     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
95     CHECK(success) << "Unable to allocate memory";
96   }
97
98   DBusWatch* raw_watch_;
99   base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
100 };
101
102 // The class is used for monitoring the timeout used for D-Bus method
103 // calls.
104 //
105 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
106 // the object is is alive when HandleTimeout() is called. It's unlikely
107 // but it may be possible that HandleTimeout() is called after
108 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
109 // Bus::OnRemoveTimeout().
110 class Timeout : public base::RefCountedThreadSafe<Timeout> {
111  public:
112   explicit Timeout(DBusTimeout* timeout)
113       : raw_timeout_(timeout),
114         monitoring_is_active_(false),
115         is_completed(false) {
116     dbus_timeout_set_data(raw_timeout_, this, NULL);
117     AddRef();  // Balanced on Complete().
118   }
119
120   // Returns true if the timeout is ready to be monitored.
121   bool IsReadyToBeMonitored() {
122     return dbus_timeout_get_enabled(raw_timeout_);
123   }
124
125   // Starts monitoring the timeout.
126   void StartMonitoring(Bus* bus) {
127     bus->GetDBusTaskRunner()->PostDelayedTask(
128         FROM_HERE,
129         base::Bind(&Timeout::HandleTimeout, this),
130         GetInterval());
131     monitoring_is_active_ = true;
132   }
133
134   // Stops monitoring the timeout.
135   void StopMonitoring() {
136     // We cannot take back the delayed task we posted in
137     // StartMonitoring(), so we just mark the monitoring is inactive now.
138     monitoring_is_active_ = false;
139   }
140
141   // Returns the interval.
142   base::TimeDelta GetInterval() {
143     return base::TimeDelta::FromMilliseconds(
144         dbus_timeout_get_interval(raw_timeout_));
145   }
146
147   // Cleans up the raw_timeout and marks that timeout is completed.
148   // See the class comment above for why we are doing this.
149   void Complete() {
150     dbus_timeout_set_data(raw_timeout_, NULL, NULL);
151     is_completed = true;
152     Release();
153   }
154
155  private:
156   friend class base::RefCountedThreadSafe<Timeout>;
157   ~Timeout() {
158   }
159
160   // Handles the timeout.
161   void HandleTimeout() {
162     // If the timeout is marked completed, we should do nothing. This can
163     // occur if this function is called after Bus::OnRemoveTimeout().
164     if (is_completed)
165       return;
166     // Skip if monitoring is canceled.
167     if (!monitoring_is_active_)
168       return;
169
170     const bool success = dbus_timeout_handle(raw_timeout_);
171     CHECK(success) << "Unable to allocate memory";
172   }
173
174   DBusTimeout* raw_timeout_;
175   bool monitoring_is_active_;
176   bool is_completed;
177 };
178
179 }  // namespace
180
181 Bus::Options::Options()
182   : bus_type(SESSION),
183     connection_type(PRIVATE) {
184 }
185
186 Bus::Options::~Options() {
187 }
188
189 Bus::Bus(const Options& options)
190     : bus_type_(options.bus_type),
191       connection_type_(options.connection_type),
192       dbus_task_runner_(options.dbus_task_runner),
193       on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
194       connection_(NULL),
195       origin_thread_id_(base::PlatformThread::CurrentId()),
196       async_operations_set_up_(false),
197       shutdown_completed_(false),
198       num_pending_watches_(0),
199       num_pending_timeouts_(0),
200       address_(options.address),
201       on_disconnected_closure_(options.disconnected_callback) {
202   // This is safe to call multiple times.
203   dbus_threads_init_default();
204   // The origin message loop is unnecessary if the client uses synchronous
205   // functions only.
206   if (base::MessageLoop::current())
207     origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy();
208 }
209
210 Bus::~Bus() {
211   DCHECK(!connection_);
212   DCHECK(owned_service_names_.empty());
213   DCHECK(match_rules_added_.empty());
214   DCHECK(filter_functions_added_.empty());
215   DCHECK(registered_object_paths_.empty());
216   DCHECK_EQ(0, num_pending_watches_);
217   // TODO(satorux): This check fails occasionally in browser_tests for tests
218   // that run very quickly. Perhaps something does not have time to clean up.
219   // Despite the check failing, the tests seem to run fine. crosbug.com/23416
220   // DCHECK_EQ(0, num_pending_timeouts_);
221 }
222
223 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
224                                  const ObjectPath& object_path) {
225   return GetObjectProxyWithOptions(service_name, object_path,
226                                    ObjectProxy::DEFAULT_OPTIONS);
227 }
228
229 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
230                                             const ObjectPath& object_path,
231                                             int options) {
232   AssertOnOriginThread();
233
234   // Check if we already have the requested object proxy.
235   const ObjectProxyTable::key_type key(service_name + object_path.value(),
236                                        options);
237   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
238   if (iter != object_proxy_table_.end()) {
239     return iter->second.get();
240   }
241
242   scoped_refptr<ObjectProxy> object_proxy =
243       new ObjectProxy(this, service_name, object_path, options);
244   object_proxy_table_[key] = object_proxy;
245
246   return object_proxy.get();
247 }
248
249 bool Bus::RemoveObjectProxy(const std::string& service_name,
250                             const ObjectPath& object_path,
251                             const base::Closure& callback) {
252   return RemoveObjectProxyWithOptions(service_name, object_path,
253                                       ObjectProxy::DEFAULT_OPTIONS,
254                                       callback);
255 }
256
257 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
258                                        const ObjectPath& object_path,
259                                        int options,
260                                        const base::Closure& callback) {
261   AssertOnOriginThread();
262
263   // Check if we have the requested object proxy.
264   const ObjectProxyTable::key_type key(service_name + object_path.value(),
265                                        options);
266   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
267   if (iter != object_proxy_table_.end()) {
268     scoped_refptr<ObjectProxy> object_proxy = iter->second;
269     object_proxy_table_.erase(iter);
270     // Object is present. Remove it now and Detach on the DBus thread.
271     GetDBusTaskRunner()->PostTask(
272         FROM_HERE,
273         base::Bind(&Bus::RemoveObjectProxyInternal,
274                    this, object_proxy, callback));
275     return true;
276   }
277   return false;
278 }
279
280 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
281                                     const base::Closure& callback) {
282   AssertOnDBusThread();
283
284   object_proxy.get()->Detach();
285
286   GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
287 }
288
289 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
290   AssertOnOriginThread();
291
292   // Check if we already have the requested exported object.
293   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
294   if (iter != exported_object_table_.end()) {
295     return iter->second.get();
296   }
297
298   scoped_refptr<ExportedObject> exported_object =
299       new ExportedObject(this, object_path);
300   exported_object_table_[object_path] = exported_object;
301
302   return exported_object.get();
303 }
304
305 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
306   AssertOnOriginThread();
307
308   // Remove the registered object from the table first, to allow a new
309   // GetExportedObject() call to return a new object, rather than this one.
310   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
311   if (iter == exported_object_table_.end())
312     return;
313
314   scoped_refptr<ExportedObject> exported_object = iter->second;
315   exported_object_table_.erase(iter);
316
317   // Post the task to perform the final unregistration to the D-Bus thread.
318   // Since the registration also happens on the D-Bus thread in
319   // TryRegisterObjectPath(), and the task runner we post to is a
320   // SequencedTaskRunner, there is a guarantee that this will happen before any
321   // future registration call.
322   GetDBusTaskRunner()->PostTask(
323       FROM_HERE,
324       base::Bind(&Bus::UnregisterExportedObjectInternal,
325                  this, exported_object));
326 }
327
328 void Bus::UnregisterExportedObjectInternal(
329     scoped_refptr<ExportedObject> exported_object) {
330   AssertOnDBusThread();
331
332   exported_object->Unregister();
333 }
334
335 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
336                                      const ObjectPath& object_path) {
337   AssertOnOriginThread();
338
339   // Check if we already have the requested object manager.
340   const ObjectManagerTable::key_type key(service_name + object_path.value());
341   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
342   if (iter != object_manager_table_.end()) {
343     return iter->second.get();
344   }
345
346   scoped_refptr<ObjectManager> object_manager =
347       new ObjectManager(this, service_name, object_path);
348   object_manager_table_[key] = object_manager;
349
350   return object_manager.get();
351 }
352
353 bool Bus::RemoveObjectManager(const std::string& service_name,
354                               const ObjectPath& object_path,
355                               const base::Closure& callback) {
356   AssertOnOriginThread();
357   DCHECK(!callback.is_null());
358
359   const ObjectManagerTable::key_type key(service_name + object_path.value());
360   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
361   if (iter == object_manager_table_.end())
362     return false;
363
364   // ObjectManager is present. Remove it now and CleanUp on the DBus thread.
365   scoped_refptr<ObjectManager> object_manager = iter->second;
366   object_manager_table_.erase(iter);
367
368   GetDBusTaskRunner()->PostTask(
369       FROM_HERE,
370       base::Bind(&Bus::RemoveObjectManagerInternal,
371                  this, object_manager, callback));
372
373   return true;
374 }
375
376 void Bus::RemoveObjectManagerInternal(
377       scoped_refptr<dbus::ObjectManager> object_manager,
378       const base::Closure& callback) {
379   AssertOnDBusThread();
380   DCHECK(object_manager.get());
381
382   object_manager->CleanUp();
383
384   // The ObjectManager has to be deleted on the origin thread since it was
385   // created there.
386   GetOriginTaskRunner()->PostTask(
387       FROM_HERE,
388       base::Bind(&Bus::RemoveObjectManagerInternalHelper,
389                  this, object_manager, callback));
390 }
391
392 void Bus::RemoveObjectManagerInternalHelper(
393       scoped_refptr<dbus::ObjectManager> object_manager,
394       const base::Closure& callback) {
395   AssertOnOriginThread();
396   DCHECK(object_manager.get());
397
398   // Release the object manager and run the callback.
399   object_manager = NULL;
400   callback.Run();
401 }
402
403 void Bus::GetManagedObjects() {
404   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
405        iter != object_manager_table_.end(); ++iter) {
406     iter->second->GetManagedObjects();
407   }
408 }
409
410 bool Bus::Connect() {
411   // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
412   AssertOnDBusThread();
413
414   // Check if it's already initialized.
415   if (connection_)
416     return true;
417
418   ScopedDBusError error;
419   if (bus_type_ == CUSTOM_ADDRESS) {
420     if (connection_type_ == PRIVATE) {
421       connection_ = dbus_connection_open_private(address_.c_str(), error.get());
422     } else {
423       connection_ = dbus_connection_open(address_.c_str(), error.get());
424     }
425   } else {
426     const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
427     if (connection_type_ == PRIVATE) {
428       connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
429     } else {
430       connection_ = dbus_bus_get(dbus_bus_type, error.get());
431     }
432   }
433   if (!connection_) {
434     LOG(ERROR) << "Failed to connect to the bus: "
435                << (error.is_set() ? error.message() : "");
436     return false;
437   }
438
439   if (bus_type_ == CUSTOM_ADDRESS) {
440     // We should call dbus_bus_register here, otherwise unique name can not be
441     // acquired. According to dbus specification, it is responsible to call
442     // org.freedesktop.DBus.Hello method at the beging of bus connection to
443     // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
444     // called internally.
445     if (!dbus_bus_register(connection_, error.get())) {
446       LOG(ERROR) << "Failed to register the bus component: "
447                  << (error.is_set() ? error.message() : "");
448       return false;
449     }
450   }
451   // We shouldn't exit on the disconnected signal.
452   dbus_connection_set_exit_on_disconnect(connection_, false);
453
454   // Watch Disconnected signal.
455   AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
456   AddMatch(kDisconnectedMatchRule, error.get());
457
458   return true;
459 }
460
461 void Bus::ClosePrivateConnection() {
462   // dbus_connection_close is blocking call.
463   AssertOnDBusThread();
464   DCHECK_EQ(PRIVATE, connection_type_)
465       << "non-private connection should not be closed";
466   dbus_connection_close(connection_);
467 }
468
469 void Bus::ShutdownAndBlock() {
470   AssertOnDBusThread();
471
472   if (shutdown_completed_)
473     return;  // Already shutdowned, just return.
474
475   // Unregister the exported objects.
476   for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
477        iter != exported_object_table_.end(); ++iter) {
478     iter->second->Unregister();
479   }
480
481   // Release all service names.
482   for (std::set<std::string>::iterator iter = owned_service_names_.begin();
483        iter != owned_service_names_.end();) {
484     // This is a bit tricky but we should increment the iter here as
485     // ReleaseOwnership() may remove |service_name| from the set.
486     const std::string& service_name = *iter++;
487     ReleaseOwnership(service_name);
488   }
489   if (!owned_service_names_.empty()) {
490     LOG(ERROR) << "Failed to release all service names. # of services left: "
491                << owned_service_names_.size();
492   }
493
494   // Detach from the remote objects.
495   for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
496        iter != object_proxy_table_.end(); ++iter) {
497     iter->second->Detach();
498   }
499
500   // Clean up the object managers.
501   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
502        iter != object_manager_table_.end(); ++iter) {
503     iter->second->CleanUp();
504   }
505
506   // Release object proxies and exported objects here. We should do this
507   // here rather than in the destructor to avoid memory leaks due to
508   // cyclic references.
509   object_proxy_table_.clear();
510   exported_object_table_.clear();
511
512   // Private connection should be closed.
513   if (connection_) {
514     // Remove Disconnected watcher.
515     ScopedDBusError error;
516     RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
517     RemoveMatch(kDisconnectedMatchRule, error.get());
518
519     if (connection_type_ == PRIVATE)
520       ClosePrivateConnection();
521     // dbus_connection_close() won't unref.
522     dbus_connection_unref(connection_);
523   }
524
525   connection_ = NULL;
526   shutdown_completed_ = true;
527 }
528
529 void Bus::ShutdownOnDBusThreadAndBlock() {
530   AssertOnOriginThread();
531   DCHECK(dbus_task_runner_.get());
532
533   GetDBusTaskRunner()->PostTask(
534       FROM_HERE,
535       base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
536
537   // http://crbug.com/125222
538   base::ThreadRestrictions::ScopedAllowWait allow_wait;
539
540   // Wait until the shutdown is complete on the D-Bus thread.
541   // The shutdown should not hang, but set timeout just in case.
542   const int kTimeoutSecs = 3;
543   const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
544   const bool signaled = on_shutdown_.TimedWait(timeout);
545   LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
546 }
547
548 void Bus::RequestOwnership(const std::string& service_name,
549                            ServiceOwnershipOptions options,
550                            OnOwnershipCallback on_ownership_callback) {
551   AssertOnOriginThread();
552
553   GetDBusTaskRunner()->PostTask(
554       FROM_HERE,
555       base::Bind(&Bus::RequestOwnershipInternal,
556                  this, service_name, options, on_ownership_callback));
557 }
558
559 void Bus::RequestOwnershipInternal(const std::string& service_name,
560                                    ServiceOwnershipOptions options,
561                                    OnOwnershipCallback on_ownership_callback) {
562   AssertOnDBusThread();
563
564   bool success = Connect();
565   if (success)
566     success = RequestOwnershipAndBlock(service_name, options);
567
568   GetOriginTaskRunner()->PostTask(FROM_HERE,
569                                   base::Bind(on_ownership_callback,
570                                              service_name,
571                                              success));
572 }
573
574 bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
575                                    ServiceOwnershipOptions options) {
576   DCHECK(connection_);
577   // dbus_bus_request_name() is a blocking call.
578   AssertOnDBusThread();
579
580   // Check if we already own the service name.
581   if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
582     return true;
583   }
584
585   ScopedDBusError error;
586   const int result = dbus_bus_request_name(connection_,
587                                            service_name.c_str(),
588                                            options,
589                                            error.get());
590   if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
591     LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
592                << (error.is_set() ? error.message() : "");
593     return false;
594   }
595   owned_service_names_.insert(service_name);
596   return true;
597 }
598
599 bool Bus::ReleaseOwnership(const std::string& service_name) {
600   DCHECK(connection_);
601   // dbus_bus_request_name() is a blocking call.
602   AssertOnDBusThread();
603
604   // Check if we already own the service name.
605   std::set<std::string>::iterator found =
606       owned_service_names_.find(service_name);
607   if (found == owned_service_names_.end()) {
608     LOG(ERROR) << service_name << " is not owned by the bus";
609     return false;
610   }
611
612   ScopedDBusError error;
613   const int result = dbus_bus_release_name(connection_, service_name.c_str(),
614                                            error.get());
615   if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
616     owned_service_names_.erase(found);
617     return true;
618   } else {
619     LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
620                << (error.is_set() ? error.message() : "")
621                << ", result code: " << result;
622     return false;
623   }
624 }
625
626 bool Bus::SetUpAsyncOperations() {
627   DCHECK(connection_);
628   AssertOnDBusThread();
629
630   if (async_operations_set_up_)
631     return true;
632
633   // Process all the incoming data if any, so that OnDispatchStatus() will
634   // be called when the incoming data is ready.
635   ProcessAllIncomingDataIfAny();
636
637   bool success = dbus_connection_set_watch_functions(connection_,
638                                                      &Bus::OnAddWatchThunk,
639                                                      &Bus::OnRemoveWatchThunk,
640                                                      &Bus::OnToggleWatchThunk,
641                                                      this,
642                                                      NULL);
643   CHECK(success) << "Unable to allocate memory";
644
645   success = dbus_connection_set_timeout_functions(connection_,
646                                                   &Bus::OnAddTimeoutThunk,
647                                                   &Bus::OnRemoveTimeoutThunk,
648                                                   &Bus::OnToggleTimeoutThunk,
649                                                   this,
650                                                   NULL);
651   CHECK(success) << "Unable to allocate memory";
652
653   dbus_connection_set_dispatch_status_function(
654       connection_,
655       &Bus::OnDispatchStatusChangedThunk,
656       this,
657       NULL);
658
659   async_operations_set_up_ = true;
660
661   return true;
662 }
663
664 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
665                                         int timeout_ms,
666                                         DBusError* error) {
667   DCHECK(connection_);
668   AssertOnDBusThread();
669
670   return dbus_connection_send_with_reply_and_block(
671       connection_, request, timeout_ms, error);
672 }
673
674 void Bus::SendWithReply(DBusMessage* request,
675                         DBusPendingCall** pending_call,
676                         int timeout_ms) {
677   DCHECK(connection_);
678   AssertOnDBusThread();
679
680   const bool success = dbus_connection_send_with_reply(
681       connection_, request, pending_call, timeout_ms);
682   CHECK(success) << "Unable to allocate memory";
683 }
684
685 void Bus::Send(DBusMessage* request, uint32* serial) {
686   DCHECK(connection_);
687   AssertOnDBusThread();
688
689   const bool success = dbus_connection_send(connection_, request, serial);
690   CHECK(success) << "Unable to allocate memory";
691 }
692
693 bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
694                             void* user_data) {
695   DCHECK(connection_);
696   AssertOnDBusThread();
697
698   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
699       std::make_pair(filter_function, user_data);
700   if (filter_functions_added_.find(filter_data_pair) !=
701       filter_functions_added_.end()) {
702     VLOG(1) << "Filter function already exists: " << filter_function
703             << " with associated data: " << user_data;
704     return false;
705   }
706
707   const bool success = dbus_connection_add_filter(
708       connection_, filter_function, user_data, NULL);
709   CHECK(success) << "Unable to allocate memory";
710   filter_functions_added_.insert(filter_data_pair);
711   return true;
712 }
713
714 bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
715                                void* user_data) {
716   DCHECK(connection_);
717   AssertOnDBusThread();
718
719   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
720       std::make_pair(filter_function, user_data);
721   if (filter_functions_added_.find(filter_data_pair) ==
722       filter_functions_added_.end()) {
723     VLOG(1) << "Requested to remove an unknown filter function: "
724             << filter_function
725             << " with associated data: " << user_data;
726     return false;
727   }
728
729   dbus_connection_remove_filter(connection_, filter_function, user_data);
730   filter_functions_added_.erase(filter_data_pair);
731   return true;
732 }
733
734 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
735   DCHECK(connection_);
736   AssertOnDBusThread();
737
738   std::map<std::string, int>::iterator iter =
739       match_rules_added_.find(match_rule);
740   if (iter != match_rules_added_.end()) {
741     // The already existing rule's counter is incremented.
742     iter->second++;
743
744     VLOG(1) << "Match rule already exists: " << match_rule;
745     return;
746   }
747
748   dbus_bus_add_match(connection_, match_rule.c_str(), error);
749   match_rules_added_[match_rule] = 1;
750 }
751
752 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
753   DCHECK(connection_);
754   AssertOnDBusThread();
755
756   std::map<std::string, int>::iterator iter =
757       match_rules_added_.find(match_rule);
758   if (iter == match_rules_added_.end()) {
759     LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
760     return false;
761   }
762
763   // The rule's counter is decremented and the rule is deleted when reachs 0.
764   iter->second--;
765   if (iter->second == 0) {
766     dbus_bus_remove_match(connection_, match_rule.c_str(), error);
767     match_rules_added_.erase(match_rule);
768   }
769   return true;
770 }
771
772 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
773                                 const DBusObjectPathVTable* vtable,
774                                 void* user_data,
775                                 DBusError* error) {
776   DCHECK(connection_);
777   AssertOnDBusThread();
778
779   if (registered_object_paths_.find(object_path) !=
780       registered_object_paths_.end()) {
781     LOG(ERROR) << "Object path already registered: " << object_path.value();
782     return false;
783   }
784
785   const bool success = dbus_connection_try_register_object_path(
786       connection_,
787       object_path.value().c_str(),
788       vtable,
789       user_data,
790       error);
791   if (success)
792     registered_object_paths_.insert(object_path);
793   return success;
794 }
795
796 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
797   DCHECK(connection_);
798   AssertOnDBusThread();
799
800   if (registered_object_paths_.find(object_path) ==
801       registered_object_paths_.end()) {
802     LOG(ERROR) << "Requested to unregister an unknown object path: "
803                << object_path.value();
804     return;
805   }
806
807   const bool success = dbus_connection_unregister_object_path(
808       connection_,
809       object_path.value().c_str());
810   CHECK(success) << "Unable to allocate memory";
811   registered_object_paths_.erase(object_path);
812 }
813
814 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
815   AssertOnDBusThread();
816
817   ShutdownAndBlock();
818   on_shutdown_.Signal();
819 }
820
821 void Bus::ProcessAllIncomingDataIfAny() {
822   AssertOnDBusThread();
823
824   // As mentioned at the class comment in .h file, connection_ can be NULL.
825   if (!connection_)
826     return;
827
828   // It is safe and necessary to call dbus_connection_get_dispatch_status even
829   // if the connection is lost. Otherwise we will miss "Disconnected" signal.
830   // (crbug.com/174431)
831   if (dbus_connection_get_dispatch_status(connection_) ==
832       DBUS_DISPATCH_DATA_REMAINS) {
833     while (dbus_connection_dispatch(connection_) ==
834            DBUS_DISPATCH_DATA_REMAINS) {
835     }
836   }
837 }
838
839 base::TaskRunner* Bus::GetDBusTaskRunner() {
840   if (dbus_task_runner_.get())
841     return dbus_task_runner_.get();
842   else
843     return GetOriginTaskRunner();
844 }
845
846 base::TaskRunner* Bus::GetOriginTaskRunner() {
847   DCHECK(origin_task_runner_.get());
848   return origin_task_runner_.get();
849 }
850
851 bool Bus::HasDBusThread() {
852   return dbus_task_runner_.get() != NULL;
853 }
854
855 void Bus::AssertOnOriginThread() {
856   DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
857 }
858
859 void Bus::AssertOnDBusThread() {
860   base::ThreadRestrictions::AssertIOAllowed();
861
862   if (dbus_task_runner_.get()) {
863     DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
864   } else {
865     AssertOnOriginThread();
866   }
867 }
868
869 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
870                                          GetServiceOwnerOption options) {
871   AssertOnDBusThread();
872
873   MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
874   MessageWriter writer(&get_name_owner_call);
875   writer.AppendString(service_name);
876   VLOG(1) << "Method call: " << get_name_owner_call.ToString();
877
878   const ObjectPath obj_path("/org/freedesktop/DBus");
879   if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
880       !get_name_owner_call.SetPath(obj_path)) {
881     if (options == REPORT_ERRORS)
882       LOG(ERROR) << "Failed to get name owner.";
883     return "";
884   }
885
886   ScopedDBusError error;
887   DBusMessage* response_message =
888       SendWithReplyAndBlock(get_name_owner_call.raw_message(),
889                             ObjectProxy::TIMEOUT_USE_DEFAULT,
890                             error.get());
891   if (!response_message) {
892     if (options == REPORT_ERRORS) {
893       LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
894                  << error.message();
895     }
896     return "";
897   }
898
899   scoped_ptr<Response> response(Response::FromRawMessage(response_message));
900   MessageReader reader(response.get());
901
902   std::string service_owner;
903   if (!reader.PopString(&service_owner))
904     service_owner.clear();
905   return service_owner;
906 }
907
908 void Bus::GetServiceOwner(const std::string& service_name,
909                           const GetServiceOwnerCallback& callback) {
910   AssertOnOriginThread();
911
912   GetDBusTaskRunner()->PostTask(
913       FROM_HERE,
914       base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
915 }
916
917 void Bus::GetServiceOwnerInternal(const std::string& service_name,
918                                   const GetServiceOwnerCallback& callback) {
919   AssertOnDBusThread();
920
921   std::string service_owner;
922   if (Connect())
923     service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
924   GetOriginTaskRunner()->PostTask(FROM_HERE,
925                                   base::Bind(callback, service_owner));
926 }
927
928 void Bus::ListenForServiceOwnerChange(
929     const std::string& service_name,
930     const GetServiceOwnerCallback& callback) {
931   AssertOnOriginThread();
932   DCHECK(!service_name.empty());
933   DCHECK(!callback.is_null());
934
935   GetDBusTaskRunner()->PostTask(
936       FROM_HERE,
937       base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
938                  this, service_name, callback));
939 }
940
941 void Bus::ListenForServiceOwnerChangeInternal(
942     const std::string& service_name,
943     const GetServiceOwnerCallback& callback) {
944   AssertOnDBusThread();
945   DCHECK(!service_name.empty());
946   DCHECK(!callback.is_null());
947
948   if (!Connect() || !SetUpAsyncOperations())
949     return;
950
951   if (service_owner_changed_listener_map_.empty()) {
952     bool filter_added =
953         AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
954     DCHECK(filter_added);
955   }
956
957   ServiceOwnerChangedListenerMap::iterator it =
958       service_owner_changed_listener_map_.find(service_name);
959   if (it == service_owner_changed_listener_map_.end()) {
960     // Add a match rule for the new service name.
961     const std::string name_owner_changed_match_rule =
962         base::StringPrintf(kServiceNameOwnerChangeMatchRule,
963                            service_name.c_str());
964     ScopedDBusError error;
965     AddMatch(name_owner_changed_match_rule, error.get());
966     if (error.is_set()) {
967       LOG(ERROR) << "Failed to add match rule for " << service_name
968                  << ". Got " << error.name() << ": " << error.message();
969       return;
970     }
971
972     service_owner_changed_listener_map_[service_name].push_back(callback);
973     return;
974   }
975
976   // Check if the callback has already been added.
977   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
978   for (size_t i = 0; i < callbacks.size(); ++i) {
979     if (callbacks[i].Equals(callback))
980       return;
981   }
982   callbacks.push_back(callback);
983 }
984
985 void Bus::UnlistenForServiceOwnerChange(
986     const std::string& service_name,
987     const GetServiceOwnerCallback& callback) {
988   AssertOnOriginThread();
989   DCHECK(!service_name.empty());
990   DCHECK(!callback.is_null());
991
992   GetDBusTaskRunner()->PostTask(
993       FROM_HERE,
994       base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
995                  this, service_name, callback));
996 }
997
998 void Bus::UnlistenForServiceOwnerChangeInternal(
999     const std::string& service_name,
1000     const GetServiceOwnerCallback& callback) {
1001   AssertOnDBusThread();
1002   DCHECK(!service_name.empty());
1003   DCHECK(!callback.is_null());
1004
1005   ServiceOwnerChangedListenerMap::iterator it =
1006       service_owner_changed_listener_map_.find(service_name);
1007   if (it == service_owner_changed_listener_map_.end())
1008     return;
1009
1010   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
1011   for (size_t i = 0; i < callbacks.size(); ++i) {
1012     if (callbacks[i].Equals(callback)) {
1013       callbacks.erase(callbacks.begin() + i);
1014       break;  // There can be only one.
1015     }
1016   }
1017   if (!callbacks.empty())
1018     return;
1019
1020   // Last callback for |service_name| has been removed, remove match rule.
1021   const std::string name_owner_changed_match_rule =
1022       base::StringPrintf(kServiceNameOwnerChangeMatchRule,
1023                          service_name.c_str());
1024   ScopedDBusError error;
1025   RemoveMatch(name_owner_changed_match_rule, error.get());
1026   // And remove |service_owner_changed_listener_map_| entry.
1027   service_owner_changed_listener_map_.erase(it);
1028
1029   if (service_owner_changed_listener_map_.empty()) {
1030     bool filter_removed =
1031         RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
1032     DCHECK(filter_removed);
1033   }
1034 }
1035
1036 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
1037   AssertOnDBusThread();
1038
1039   // watch will be deleted when raw_watch is removed in OnRemoveWatch().
1040   Watch* watch = new Watch(raw_watch);
1041   if (watch->IsReadyToBeWatched()) {
1042     watch->StartWatching();
1043   }
1044   ++num_pending_watches_;
1045   return true;
1046 }
1047
1048 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
1049   AssertOnDBusThread();
1050
1051   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1052   delete watch;
1053   --num_pending_watches_;
1054 }
1055
1056 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
1057   AssertOnDBusThread();
1058
1059   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1060   if (watch->IsReadyToBeWatched()) {
1061     watch->StartWatching();
1062   } else {
1063     // It's safe to call this if StartWatching() wasn't called, per
1064     // message_pump_libevent.h.
1065     watch->StopWatching();
1066   }
1067 }
1068
1069 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
1070   AssertOnDBusThread();
1071
1072   // timeout will be deleted when raw_timeout is removed in
1073   // OnRemoveTimeoutThunk().
1074   Timeout* timeout = new Timeout(raw_timeout);
1075   if (timeout->IsReadyToBeMonitored()) {
1076     timeout->StartMonitoring(this);
1077   }
1078   ++num_pending_timeouts_;
1079   return true;
1080 }
1081
1082 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
1083   AssertOnDBusThread();
1084
1085   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1086   timeout->Complete();
1087   --num_pending_timeouts_;
1088 }
1089
1090 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
1091   AssertOnDBusThread();
1092
1093   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1094   if (timeout->IsReadyToBeMonitored()) {
1095     timeout->StartMonitoring(this);
1096   } else {
1097     timeout->StopMonitoring();
1098   }
1099 }
1100
1101 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
1102                                   DBusDispatchStatus status) {
1103   DCHECK_EQ(connection, connection_);
1104   AssertOnDBusThread();
1105
1106   // We cannot call ProcessAllIncomingDataIfAny() here, as calling
1107   // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
1108   // prohibited by the D-Bus library. Hence, we post a task here instead.
1109   // See comments for dbus_connection_set_dispatch_status_function().
1110   GetDBusTaskRunner()->PostTask(FROM_HERE,
1111                                 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
1112                                            this));
1113 }
1114
1115 void Bus::OnConnectionDisconnected(DBusConnection* connection) {
1116   AssertOnDBusThread();
1117
1118   if (!on_disconnected_closure_.is_null())
1119     GetOriginTaskRunner()->PostTask(FROM_HERE, on_disconnected_closure_);
1120
1121   if (!connection)
1122     return;
1123   DCHECK(!dbus_connection_get_is_connected(connection));
1124
1125   ShutdownAndBlock();
1126 }
1127
1128 void Bus::OnServiceOwnerChanged(DBusMessage* message) {
1129   DCHECK(message);
1130   AssertOnDBusThread();
1131
1132   // |message| will be unrefed on exit of the function. Increment the
1133   // reference so we can use it in Signal::FromRawMessage() below.
1134   dbus_message_ref(message);
1135   scoped_ptr<Signal> signal(Signal::FromRawMessage(message));
1136
1137   // Confirm the validity of the NameOwnerChanged signal.
1138   if (signal->GetMember() != kNameOwnerChangedSignal ||
1139       signal->GetInterface() != DBUS_INTERFACE_DBUS ||
1140       signal->GetSender() != DBUS_SERVICE_DBUS) {
1141     return;
1142   }
1143
1144   MessageReader reader(signal.get());
1145   std::string service_name;
1146   std::string old_owner;
1147   std::string new_owner;
1148   if (!reader.PopString(&service_name) ||
1149       !reader.PopString(&old_owner) ||
1150       !reader.PopString(&new_owner)) {
1151     return;
1152   }
1153
1154   ServiceOwnerChangedListenerMap::const_iterator it =
1155       service_owner_changed_listener_map_.find(service_name);
1156   if (it == service_owner_changed_listener_map_.end())
1157     return;
1158
1159   const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
1160   for (size_t i = 0; i < callbacks.size(); ++i) {
1161     GetOriginTaskRunner()->PostTask(FROM_HERE,
1162                                     base::Bind(callbacks[i], new_owner));
1163   }
1164 }
1165
1166 // static
1167 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
1168   Bus* self = static_cast<Bus*>(data);
1169   return self->OnAddWatch(raw_watch);
1170 }
1171
1172 // static
1173 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
1174   Bus* self = static_cast<Bus*>(data);
1175   self->OnRemoveWatch(raw_watch);
1176 }
1177
1178 // static
1179 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
1180   Bus* self = static_cast<Bus*>(data);
1181   self->OnToggleWatch(raw_watch);
1182 }
1183
1184 // static
1185 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1186   Bus* self = static_cast<Bus*>(data);
1187   return self->OnAddTimeout(raw_timeout);
1188 }
1189
1190 // static
1191 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1192   Bus* self = static_cast<Bus*>(data);
1193   self->OnRemoveTimeout(raw_timeout);
1194 }
1195
1196 // static
1197 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1198   Bus* self = static_cast<Bus*>(data);
1199   self->OnToggleTimeout(raw_timeout);
1200 }
1201
1202 // static
1203 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
1204                                        DBusDispatchStatus status,
1205                                        void* data) {
1206   Bus* self = static_cast<Bus*>(data);
1207   self->OnDispatchStatusChanged(connection, status);
1208 }
1209
1210 // static
1211 DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
1212     DBusConnection* connection,
1213     DBusMessage* message,
1214     void* data) {
1215   if (dbus_message_is_signal(message,
1216                              DBUS_INTERFACE_LOCAL,
1217                              kDisconnectedSignal)) {
1218     Bus* self = static_cast<Bus*>(data);
1219     self->OnConnectionDisconnected(connection);
1220     return DBUS_HANDLER_RESULT_HANDLED;
1221   }
1222   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
1223 }
1224
1225 // static
1226 DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
1227     DBusConnection* connection,
1228     DBusMessage* message,
1229     void* data) {
1230   if (dbus_message_is_signal(message,
1231                              DBUS_INTERFACE_DBUS,
1232                              kNameOwnerChangedSignal)) {
1233     Bus* self = static_cast<Bus*>(data);
1234     self->OnServiceOwnerChanged(message);
1235   }
1236   // Always return unhandled to let others, e.g. ObjectProxies, handle the same
1237   // signal.
1238   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
1239 }
1240
1241 }  // namespace dbus