* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Workaround for libstdc++ bug
+#ifndef _GLIBCXX_USE_NANOSLEEP
+#define _GLIBCXX_USE_NANOSLEEP
+#endif
+
#include "DBusConnection.h"
#include "DBusInputStream.h"
+#include <algorithm>
#include <sstream>
#include <cassert>
#include <future>
+#include <chrono>
+#include <thread>
namespace CommonAPI {
namespace DBus {
&DBusConnection::onLibdbusObjectPathMessageThunk
};
-void DBusConnection::dispatch(std::shared_ptr<DBusConnection>* selfReference) {
- while (!stopDispatching_ && readWriteDispatch(10) && !selfReference->unique()) {
- if(pauseDispatching_) {
+
+//std::bind used to start the dispatch thread holds one reference, and the selfReference
+//created within the thread is the second. If only those two remain, no one but the
+//dispatch thread references the connection, which therefore can be finished.
+constexpr uint32_t ownUseCount = 2;
+
+void DBusConnection::dispatch() {
+ std::shared_ptr<DBusConnection> selfReference = this->shared_from_this();
+ while (!stopDispatching_ && readWriteDispatch(10) && selfReference.use_count() > ownUseCount) {
+ if (pauseDispatching_) {
dispatchSuspendLock_.lock();
dispatchSuspendLock_.unlock();
}
}
- delete selfReference;
-
}
bool DBusConnection::readWriteDispatch(int timeoutMilliseconds) {
dispatchThread_(NULL),
dbusObjectMessageHandler_(),
watchContext_(NULL),
- connectionNameCount_() {
+ connectionNameCount_(),
+ dispatchSource_(),
+ mainLoopContext_(std::shared_ptr<MainLoopContext>(NULL)),
+ enforcerThread(NULL) {
dbus_threads_init_default();
}
dispatchThread_(NULL),
dbusObjectMessageHandler_(),
watchContext_(NULL),
- connectionNameCount_() {
+ connectionNameCount_(),
+ dispatchSource_(),
+ mainLoopContext_(std::shared_ptr<MainLoopContext>(NULL)),
+ enforcerThread(NULL) {
dbus_threads_init_default();
}
initLibdbusSignalFilterAfterConnect();
- if(startDispatchThread) {
- std::shared_ptr<DBusConnection>* ptr = new std::shared_ptr<DBusConnection>(this->shared_from_this());
- dispatchThread_ = new std::thread(&DBusConnection::dispatch, this, ptr);
+ if (startDispatchThread) {
+ dispatchThread_ = new std::thread(std::bind(&DBusConnection::dispatch, this->shared_from_this()));
}
stopDispatching_ = !startDispatchThread;
delete dbusMessageReplyAsyncHandler;
}
+
+//Would not be needed if libdbus would actually handle its timeouts for pending calls.
+void DBusConnection::enforceAsynchronousTimeouts() const {
+ enforeTimeoutMutex.lock();
+
+ while (!timeoutMap.empty()) {
+ auto minTimeoutElement = std::min_element(timeoutMap.begin(), timeoutMap.end(),
+ [] (const TimeoutMapElement& lhs, const TimeoutMapElement& rhs) {
+ return std::get<0>(lhs.second) < std::get<0>(rhs.second);
+ });
+
+ int minTimeout = std::get<0>(minTimeoutElement->second);
+
+ enforeTimeoutMutex.unlock();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(minTimeout));
+
+ enforeTimeoutMutex.lock();
+
+ for (auto it = timeoutMap.begin(); it != timeoutMap.end(); ) {
+ int& currentTimeout = std::get<0>(it->second);
+ currentTimeout -= minTimeout;
+ if (currentTimeout <= 0) {
+ DBusPendingCall* libdbusPendingCall = it->first;
+
+ if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
+ dbus_pending_call_cancel(libdbusPendingCall);
+ DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
+ DBusMessage& dbusMessageCall = std::get<2>(it->second);
+ asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
+ delete asyncHandler;
+ }
+ dbus_pending_call_unref(libdbusPendingCall);
+ it = timeoutMap.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+
+ //Must be atomic with respect to local threading
+ auto threadPtr = enforcerThread;
+ enforcerThread = NULL;
+ enforeTimeoutMutex.unlock();
+
+ threadPtr->detach();
+ delete threadPtr;
+}
+
std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
- const DBusMessage& dbusMessage,
- std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler,
- int timeoutMilliseconds) const {
+ const DBusMessage& dbusMessage,
+ std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler,
+ int timeoutMilliseconds) const {
assert(dbusMessage);
assert(isConnected());
DBusPendingCall* libdbusPendingCall;
dbus_bool_t libdbusSuccess;
- libdbusSuccess = dbus_connection_send_with_reply(
- libdbusConnection_,
- dbusMessage.libdbusMessage_,
- &libdbusPendingCall,
- timeoutMilliseconds);
+ libdbusSuccess = dbus_connection_send_with_reply(libdbusConnection_,
+ dbusMessage.libdbusMessage_,
+ &libdbusPendingCall,
+ timeoutMilliseconds);
if (!libdbusSuccess || !libdbusPendingCall) {
- dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::CONNECTION_FAILED, dbusMessage);
+ dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::CONNECTION_FAILED, dbusMessage.createMethodError(DBUS_ERROR_DISCONNECTED));
return dbusMessageReplyAsyncHandler->getFuture();
}
onLibdbusDataCleanup);
if (!libdbusSuccess) {
- dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::OUT_OF_MEMORY, dbusMessage);
+ dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::OUT_OF_MEMORY, dbusMessage);
dbus_pending_call_unref(libdbusPendingCall);
return dbusMessageReplyAsyncHandler->getFuture();
}
- return dbusMessageReplyAsyncHandler.release()->getFuture();
+ DBusMessageReplyAsyncHandler* replyAsyncHandler = dbusMessageReplyAsyncHandler.release();
+
+ const bool mainloopContextIsPresent = (bool) mainLoopContext_.lock();
+ if (!mainloopContextIsPresent && timeoutMilliseconds != DBUS_TIMEOUT_INFINITE) {
+ dbus_pending_call_ref(libdbusPendingCall);
+ std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage> toInsert {timeoutMilliseconds, replyAsyncHandler, dbusMessage};
+
+ enforeTimeoutMutex.lock();
+ timeoutMap.insert( {libdbusPendingCall, toInsert } );
+ if (!enforcerThread) {
+ enforcerThread = new std::thread(std::bind(&DBusConnection::enforceAsynchronousTimeouts, this->shared_from_this()));
+ }
+ enforeTimeoutMutex.unlock();
+ }
+
+ return replyAsyncHandler->getFuture();
}
DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& dbusMessage,
DBusError& dbusError,
int timeoutMilliseconds) const {
- auto selfReference = this->shared_from_this();
-
assert(dbusMessage);
assert(!dbusError);
assert(isConnected());
return availableServiceInstances;
}
- while (timeout.count() > 0) {
- size_t dbusServiceResolvingCount = getResolvedServiceInstances(interfaceName, availableServiceInstances);
+ size_t dbusServiceResolvingCount = getResolvedServiceInstances(interfaceName, availableServiceInstances);
- if (!dbusServiceResolvingCount) {
- break;
- }
-
- // wait for unknown and acquiring services, then restart from the beginning
- typedef std::chrono::high_resolution_clock clock;
- clock::time_point startTimePoint = clock::now();
+ if (!dbusServiceResolvingCount) {
+ return availableServiceInstances;
+ }
- size_t wakeupCount = 0;
- dbusServiceChanged_.wait_for(
- dbusServicesLock,
- timeout,
- [&] {
- wakeupCount++;
- return wakeupCount > dbusServiceResolvingCount;
- });
+ dbusServiceChanged_.wait(
+ dbusServicesLock,
+ [&] {
+ return getNumResolvingServiceInstances() == 0;
+ });
- if (wakeupCount > 1) {
- getResolvedServiceInstances(interfaceName, availableServiceInstances);
- break;
- }
+ getResolvedServiceInstances(interfaceName, availableServiceInstances);
- std::chrono::milliseconds elapsedWaitTime =
- std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() - startTimePoint);
- timeout -= elapsedWaitTime;
- }
// maybe partial list but it contains everything we know for now
return availableServiceInstances;
size_t stillResolvingCount = getResolvedServiceInstances(interfaceName, availableServiceInstances);
- if(stillResolvingCount == 0 && !dbusServices_.empty()) {
+ if (stillResolvingCount == 0 && !dbusServices_.empty()) {
callback(availableServiceInstances);
} else {
- //This is a necessary hack, because libdbus never returns from async calls if a
- //service handles it's answers the wrong way. Here an artificial timeout is
- //added to circumvent this limitation.
+ //Necessary as service discovery might need some time, but the async version of "getAvailableServiceInstances"
+ //shall return without delay.
std::thread(
[this, callback, interfaceName, domainName](std::shared_ptr<DBusServiceRegistry> selfRef) {
auto availableServiceInstances = getAvailableServiceInstances(interfaceName, domainName);
}
+size_t DBusServiceRegistry::getNumResolvingServiceInstances() {
+ size_t dbusServicesResolvingCount = 0;
+
+ // caller must hold lock
+ auto dbusServiceIterator = dbusServices_.begin();
+ while (dbusServiceIterator != dbusServices_.end()) {
+ DBusServiceState& dbusServiceState = dbusServiceIterator->second.first;
+
+ switch (dbusServiceState) {
+ case DBusServiceState::AVAILABLE:
+ dbusServicesResolvingCount++;
+ break;
+
+ case DBusServiceState::RESOLVING:
+ dbusServicesResolvingCount++;
+ break;
+
+ default:
+ break;
+ }
+
+ dbusServiceIterator++;
+ }
+
+ return dbusServicesResolvingCount;
+}
+
+
size_t DBusServiceRegistry::getResolvedServiceInstances(const std::string& dbusInterfaceName, std::vector<std::string>& availableServiceInstances) {
size_t dbusServicesResolvingCount = 0;