#include "config.hpp"
#include "ipc/exception.hpp"
-#include "ipc/internals/utils.hpp"
#include "ipc/internals/acceptor.hpp"
#include "logger/logger.hpp"
#include <poll.h>
#include <cerrno>
#include <cstring>
-#include <chrono>
#include <vector>
namespace vasum {
namespace ipc {
Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback)
- : mNewConnectionCallback(newConnectionCallback),
+ : mIsRunning(false),
+ mNewConnectionCallback(newConnectionCallback),
mSocket(Socket::createSocket(socketPath))
{
LOGT("Creating Acceptor for socket " << socketPath);
fds[1].fd = mSocket.getFD();
fds[1].events = POLLIN;
- // Main loop
- bool isRunning = true;
- while (isRunning) {
+ mIsRunning = true;
+ while (mIsRunning) {
LOGT("Waiting for new connections...");
int ret = ::poll(fds.data(), fds.size(), -1 /*blocking call*/);
// Check for incoming connections
if (fds[1].revents & POLLIN) {
fds[1].revents = 0;
- std::shared_ptr<Socket> tmpSocket = mSocket.accept();
- mNewConnectionCallback(tmpSocket);
+ handleConnection();
}
// Check for incoming events
if (fds[0].revents & POLLIN) {
fds[0].revents = 0;
-
- if (mEventQueue.receive() == Event::FINISH) {
- LOGD("Event FINISH");
- isRunning = false;
- break;
- }
+ handleEvent();
}
}
LOGT("Exiting run");
}
+void Acceptor::handleConnection()
+{
+ std::shared_ptr<Socket> tmpSocket = mSocket.accept();
+ mNewConnectionCallback(tmpSocket);
+}
+
+void Acceptor::handleEvent()
+{
+ if (mEventQueue.receive() == Event::FINISH) {
+ LOGD("Event FINISH");
+ mIsRunning = false;
+ }
+}
+
+FileDescriptor Acceptor::getEventFD()
+{
+ return mEventQueue.getFD();
+}
+
+FileDescriptor Acceptor::getConnectionFD()
+{
+ return mSocket.getFD();
+}
+
} // namespace ipc
} // namespace vasum
#include "ipc/internals/socket.hpp"
#include "ipc/internals/event-queue.hpp"
+#include "ipc/types.hpp"
#include <string>
#include <thread>
*/
void stop();
+ /**
+ * Handle one incoming connection.
+ * Used with external polling
+ */
+ void handleConnection();
+
+ /**
+ * Handle one event from the internal event's queue
+ * Used with external polling
+ */
+ void handleEvent();
+
+ /**
+ * @return file descriptor of internal event's queue
+ */
+ FileDescriptor getEventFD();
+
+ /**
+ * @return file descriptor for the connection socket
+ */
+ FileDescriptor getConnectionFD();
+
private:
enum class Event : int {
FINISH // Shutdown request
};
+ bool mIsRunning;
+
NewConnectionCallback mNewConnectionCallback;
Socket mSocket;
mRemovedPeerCallback = removedPeerCallback;
}
+FileDescriptor Processor::getEventFD()
+{
+ return mEventQueue.getFD();
+}
+
void Processor::removeMethod(const MethodID methodID)
{
LOGT("Removing method " << methodID);
peerFD = socketPtr->getFD();
SocketInfo socketInfo(peerFD, std::move(socketPtr));
mNewSockets.push(std::move(socketInfo));
+ mEventQueue.send(Event::ADD_PEER);
}
LOGI("New peer added. Id: " << peerFD);
- mEventQueue.send(Event::ADD_PEER);
return peerFD;
}
Lock lock(mSocketsMutex);
RemovePeerRequest request(peerFD, conditionPtr);
mPeersToDelete.push(std::move(request));
+ mEventQueue.send(Event::REMOVE_PEER);
}
- mEventQueue.send(Event::REMOVE_PEER);
auto isPeerDeleted = [&peerFD, this]()->bool {
Lock lock(mSocketsMutex);
void Processor::resetPolling()
{
+ if (!isStarted()) {
+ return;
+ }
+
LOGI("Resetting polling");
// Setup polling on eventfd and sockets
Lock lock(mSocketsMutex);
}
// Check for incoming events
- if (handleEvent()) {
- // mFDs changed
- continue;
+ if (mFDs[0].revents & POLLIN) {
+ mFDs[0].revents &= ~(POLLIN);
+ if (handleEvent()) {
+ // mFDs changed
+ continue;
+ }
}
+
}
cleanCommunication();
}
-
bool Processor::handleLostConnections()
{
std::vector<FileDescriptor> peersToRemove;
-
{
Lock lock(mSocketsMutex);
for (unsigned int i = 1; i < mFDs.size(); ++i) {
return !peersToRemove.empty();
}
+bool Processor::handleLostConnection(const FileDescriptor peerFD)
+{
+ removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
+ return true;
+}
+
bool Processor::handleInputs()
{
- std::vector<std::shared_ptr<Socket>> socketsWithInput;
+ std::vector<FileDescriptor> peersWithInput;
{
Lock lock(mSocketsMutex);
for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLIN) {
mFDs[i].revents &= ~(POLLIN);
- socketsWithInput.push_back(mSockets[mFDs[i].fd]);
+ peersWithInput.push_back(mFDs[i].fd);
}
}
}
bool pollChanged = false;
// Handle input outside the critical section
- for (const auto& socketPtr : socketsWithInput) {
- pollChanged = pollChanged || handleInput(*socketPtr);
+ for (const FileDescriptor peerFD : peersWithInput) {
+ pollChanged = pollChanged || handleInput(peerFD);
}
return pollChanged;
}
-bool Processor::handleInput(const Socket& socket)
+bool Processor::handleInput(const FileDescriptor peerFD)
{
LOGT("Handle incoming data");
+
+ std::shared_ptr<Socket> socketPtr;
+ try {
+ socketPtr = mSockets.at(peerFD);
+ } catch (const std::out_of_range&) {
+ LOGE("No such peer: " << peerFD);
+ return false;
+ }
+
MethodID methodID;
MessageID messageID;
{
- Socket::Guard guard = socket.getGuard();
- socket.read(&methodID, sizeof(methodID));
- socket.read(&messageID, sizeof(messageID));
+ Socket::Guard guard = socketPtr->getGuard();
+ try {
+ socketPtr->read(&methodID, sizeof(methodID));
+ socketPtr->read(&messageID, sizeof(messageID));
+
+ } catch (const IPCException& e) {
+ LOGE("Error during reading the socket");
+ removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
+ return true;
+ }
if (methodID == RETURN_METHOD_ID) {
- return onReturnValue(socket, messageID);
+ return onReturnValue(*socketPtr, messageID);
} else {
Lock lock(mCallsMutex);
// Method
std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
lock.unlock();
- return onRemoteCall(socket, methodID, messageID, methodCallbacks);
+ return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
} else if (mSignalsCallbacks.count(methodID)) {
// Signal
std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
lock.unlock();
- return onRemoteSignal(socket, methodID, messageID, signalCallbacks);
+ return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
} else {
// Nothing
lock.unlock();
LOGW("No method or signal callback for methodID: " << methodID);
- removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
+ removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
return true;
}
}
bool Processor::handleEvent()
{
- if (!(mFDs[0].revents & POLLIN)) {
- // No event to serve
- return false;
- }
-
- mFDs[0].revents &= ~(POLLIN);
-
switch (mEventQueue.receive()) {
case Event::FINISH: {
LOGD("Event FINISH");
// Broadcast the new signal to peers
LOGW("Sending handled signals");
- std::list<FileDescriptor> peersIDs;
+ std::list<FileDescriptor> peersFDs;
{
Lock lock(mSocketsMutex);
for (const auto kv : mSockets) {
- peersIDs.push_back(kv.first);
+ peersFDs.push_back(kv.first);
}
}
}
auto data = std::make_shared<RegisterSignalsMessage>(ids);
- for (const FileDescriptor peerFD : peersIDs) {
+ for (const FileDescriptor peerFD : peersFDs) {
callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
peerFD,
data,
* - new way to generate UIDs
* - callbacks for serialization/parsing
* - store Sockets in a vector, maybe SocketStore?
+* - fix valgrind tests
*
*
*/
void signal(const MethodID methodID,
const std::shared_ptr<SentDataType>& data);
+ /**
+ * Removes one peer.
+ * Handler used in external polling.
+ *
+ * @param peerFD file description identifying the peer
+ * @return should the polling structure be rebuild
+ */
+ bool handleLostConnection(const FileDescriptor peerFD);
+
+ /**
+ * Handles input from one peer.
+ * Handler used in external polling.
+ *
+ * @param peerFD file description identifying the peer
+ * @return should the polling structure be rebuild
+ */
+ bool handleInput(const FileDescriptor peerFD);
+
+ /**
+ * Handle one event from the internal event's queue
+ *
+ * @return should the polling structure be rebuild
+ */
+ bool handleEvent();
+
+ /**
+ * @return file descriptor for the internal event's queue
+ */
+ FileDescriptor getEventFD();
private:
typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
void run();
- bool handleEvent();
bool onCall();
bool onNewPeer();
bool onRemovePeer();
bool handleLostConnections();
bool handleInputs();
- bool handleInput(const Socket& socket);
+
bool onReturnValue(const Socket& socket,
const MessageID messageID);
bool onRemoteCall(const Socket& socket,
mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
}
- if (isStarted()) {
- // Broadcast the new signal to peers
- std::vector<MethodID> ids {methodID};
- auto data = std::make_shared<RegisterSignalsMessage>(ids);
+ std::vector<MethodID> ids {methodID};
+ auto data = std::make_shared<RegisterSignalsMessage>(ids);
- std::list<FileDescriptor> peersIDs;
- {
- Lock lock(mSocketsMutex);
- for (const auto kv : mSockets) {
- peersIDs.push_back(kv.first);
- }
+ std::list<FileDescriptor> peersFDs;
+ {
+ Lock lock(mSocketsMutex);
+ for (const auto kv : mSockets) {
+ peersFDs.push_back(kv.first);
}
+ }
- for (const FileDescriptor peerFD : peersIDs) {
- callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
- peerFD,
- data,
- DEFAULT_METHOD_TIMEOUT);
- }
+ for (const FileDescriptor peerFD : peersFDs) {
+ callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
+ peerFD,
+ data,
+ DEFAULT_METHOD_TIMEOUT);
}
+
}
template<typename SentDataType, typename ReceivedDataType>
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process)
{
- if (!isStarted()) {
- LOGE("The Processor thread is not started. Can't send any data.");
- throw IPCException("The Processor thread is not started. Can't send any data.");
- }
-
return callInternal<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
}
void Processor::signal(const MethodID methodID,
const std::shared_ptr<SentDataType>& data)
{
- if (!isStarted()) {
- LOGE("The Processor thread is not started. Can't send any data.");
- throw IPCException("The Processor thread is not started. Can't send any data.");
- }
-
- std::list<FileDescriptor> peersIDs;
+ std::list<FileDescriptor> peersFDs;
{
Lock lock(mSocketsMutex);
- peersIDs = mSignalsPeers[methodID];
+ peersFDs = mSignalsPeers[methodID];
}
- for (const FileDescriptor peerFD : peersIDs) {
+ for (const FileDescriptor peerFD : peersFDs) {
Lock lock(mCallsMutex);
mCalls.push<SentDataType>(methodID, peerFD, data);
mEventQueue.send(Event::CALL);
// Neglected errors
LOGD("Retrying write");
} else {
- LOGE("Error during reading: " + std::string(strerror(errno)));
- throw IPCException("Error during reading: " + std::string(strerror(errno)));
+ LOGE("Error during writing: " + std::string(strerror(errno)));
+ throw IPCException("Error during writing: " + std::string(strerror(errno)));
}
if (nTotal >= size) {
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Class for creating a dedicated GSource
+ */
+
+
+#include "config.hpp"
+
+#include "ipc/ipc-gsource.hpp"
+
+#if GLIB_CHECK_VERSION(2,36,0)
+
+#include "logger/logger.hpp"
+#include <algorithm>
+
+namespace vasum {
+namespace ipc {
+
+namespace {
+
+
+GIOCondition conditions = static_cast<GIOCondition>(G_IO_IN |
+ G_IO_ERR |
+ G_IO_HUP);
+}
+
+
+IPCGSource::IPCGSource(const std::vector<FileDescriptor> fds,
+ const HandlerCallback& handlerCallback)
+ : mHandlerCallback(handlerCallback)
+{
+ LOGD("Constructing IPCGSource");
+ for (const FileDescriptor fd : fds) {
+ addFD(fd);
+ }
+}
+
+IPCGSource::~IPCGSource()
+{
+ LOGD("Destroying IPCGSource");
+ g_source_destroy(&mGSource);
+
+}
+
+IPCGSource* IPCGSource::create(const std::vector<FileDescriptor>& fds,
+ const HandlerCallback& handlerCallback)
+{
+ LOGD("Creating IPCGSource");
+
+ static GSourceFuncs funcs = { &IPCGSource::prepare,
+ &IPCGSource::check,
+ &IPCGSource::dispatch,
+ &IPCGSource::finalize,
+ nullptr,
+ nullptr
+ };
+
+ // New GSource
+ GSource* gSource = g_source_new(&funcs, sizeof(IPCGSource));
+ g_source_set_priority(gSource, G_PRIORITY_HIGH);
+
+ // Fill additional data
+ IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
+ return new(source) IPCGSource(fds, handlerCallback);
+}
+
+
+void IPCGSource::addFD(const FileDescriptor fd)
+{
+ if (!&mGSource) {
+ // In case it's called as a callback but the IPCGSource is destroyed
+ return;
+ }
+
+ LOGD("Adding fd to glib");
+ gpointer tag = g_source_add_unix_fd(&mGSource,
+ fd,
+ conditions);
+ FDInfo fdInfo(tag, fd);
+ mFDInfos.push_back(std::move(fdInfo));
+}
+
+void IPCGSource::removeFD(const FileDescriptor fd)
+{
+ if (!&mGSource) {
+ // In case it's called as a callback but the IPCGSource is destroyed
+ return;
+ }
+
+ LOGD("Removing fd from glib");
+ auto it = std::find(mFDInfos.begin(), mFDInfos.end(), fd);
+ if (it == mFDInfos.end()) {
+ LOGE("No such fd");
+ return;
+ }
+ g_source_remove_unix_fd(&mGSource, it->tag);
+ mFDInfos.erase(it);
+}
+
+guint IPCGSource::attach(GMainContext* context)
+{
+ LOGD("Attaching to GMainContext");
+ return g_source_attach(&mGSource, context);
+}
+
+gboolean IPCGSource::prepare(GSource* gSource, gint* timeout)
+{
+ if (!gSource) {
+ return FALSE;
+ }
+
+ *timeout = -1;
+
+ // TODO: Implement hasEvents() method in Client and Service and use it here as a callback:
+ // return source->hasEvents();
+ return FALSE;
+}
+
+gboolean IPCGSource::check(GSource* gSource)
+{
+ if (!gSource) {
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean IPCGSource::dispatch(GSource* gSource,
+ GSourceFunc /*callback*/,
+ gpointer /*userData*/)
+{
+ IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
+
+ for (const FDInfo fdInfo : source->mFDInfos) {
+ GIOCondition cond = g_source_query_unix_fd(gSource, fdInfo.tag);
+ if (conditions & cond) {
+ source->mHandlerCallback(fdInfo.fd, cond);
+ }
+ }
+
+ return TRUE; // Don't remove the GSource from the GMainContext
+}
+
+void IPCGSource::finalize(GSource* gSource)
+{
+ if (gSource) {
+ IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
+ source->~IPCGSource();
+ }
+}
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // GLIB_CHECK_VERSION
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Class for creating a dedicated GSource
+ */
+
+#ifndef COMMON_IPC_IPC_GSOURCE_HPP
+#define COMMON_IPC_IPC_GSOURCE_HPP
+
+#include <glib.h>
+#if GLIB_CHECK_VERSION(2,36,0)
+
+#include "ipc/service.hpp"
+#include "ipc/types.hpp"
+#include "utils/callback-wrapper.hpp"
+#include <memory>
+
+
+namespace vasum {
+namespace ipc {
+
+/**
+ * Class for connecting to the glib's loop.
+ * Creates a dedicated GSource.
+ *
+ * It's supposed to be constructed ONLY with the static create method
+ * and destructed in a glib callback.
+ */
+struct IPCGSource {
+public:
+ typedef std::function<void(FileDescriptor fd, short pollEvent)> HandlerCallback;
+
+ IPCGSource() = delete;
+ IPCGSource(const IPCGSource&) = delete;
+ IPCGSource& operator=(const IPCGSource&) = delete;
+
+ /**
+ * New file descriptor to listen on.
+ *
+ * @param peerFD file descriptor
+ */
+ void addFD(const FileDescriptor peerFD);
+
+ /**
+ * Removes the file descriptor from the GSource
+ *
+ * @param peerFD file descriptor
+ */
+ void removeFD(const FileDescriptor peerFD);
+
+ /**
+ * Attach to the glib's GMainContext
+ *
+ * @param context where to connect
+ * @return result of the g_source_attach call
+ */
+ guint attach(GMainContext* context = nullptr);
+
+ /**
+ * Creates the IPCGSource class in the memory allocated by glib.
+ * Calls IPCGSource's constructor
+ *
+ * @param fds initial set of file descriptors
+ * @param handlerCallback event handling callback
+ *
+ * @return pointer to the IPCGSource
+ */
+ static IPCGSource* create(const std::vector<FileDescriptor>& fds,
+ const HandlerCallback& handlerCallback);
+
+private:
+
+ /**
+ * GSourceFuncs' callback
+ */
+ static gboolean prepare(GSource* source, gint* timeout);
+
+ /**
+ * GSourceFuncs' callback
+ */
+ static gboolean check(GSource* source);
+
+ /**
+ * GSourceFuncs' callback
+ */
+ static gboolean dispatch(GSource* source,
+ GSourceFunc callbacks,
+ gpointer userData);
+
+ /**
+ * GSourceFuncs' callback
+ */
+ static void finalize(GSource* source);
+
+
+
+ // Called only from IPCGSource::create
+ IPCGSource(const std::vector<FileDescriptor> fds,
+ const HandlerCallback& handlerCallback);
+
+ // Called only from IPCGSource::finalize
+ ~IPCGSource();
+
+ struct FDInfo {
+ FDInfo(gpointer tag, FileDescriptor fd)
+ : tag(tag), fd(fd) {}
+
+ bool operator==(const gpointer t)
+ {
+ return t == tag;
+ }
+
+ bool operator==(const FileDescriptor f)
+ {
+ return f == fd;
+ }
+
+ gpointer tag;
+ FileDescriptor fd;
+ };
+
+ GSource mGSource;
+ HandlerCallback mHandlerCallback;
+ std::vector<FDInfo> mFDInfos;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // GLIB_CHECK_VERSION
+
+#endif // COMMON_IPC_IPC_GSOURCE_HPP
LOGD("Stopped");
}
+std::vector<FileDescriptor> Service::getFDs()
+{
+ std::vector<FileDescriptor> fds;
+ fds.push_back(mAcceptor.getEventFD());
+ fds.push_back(mAcceptor.getConnectionFD());
+ fds.push_back(mProcessor.getEventFD());
+
+ return fds;
+}
+
+void Service::handle(const FileDescriptor fd, const short pollEvent)
+{
+ if (fd == mProcessor.getEventFD() && pollEvent & POLLIN) {
+ mProcessor.handleEvent();
+ return;
+
+ } else if (fd == mAcceptor.getConnectionFD() && pollEvent & POLLIN) {
+ mAcceptor.handleConnection();
+ return;
+
+ } else if (fd == mAcceptor.getEventFD() && pollEvent & POLLIN) {
+ mAcceptor.handleEvent();
+ return;
+
+ } else if (pollEvent & POLLIN) {
+ mProcessor.handleInput(fd);
+ return;
+
+ } else if (pollEvent & POLLHUP) {
+ mProcessor.handleLostConnection(fd);
+ return;
+ }
+}
+
+
void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
{
mProcessor.setNewPeerCallback(newPeerCallback);
void stop();
/**
+ * Used with an external polling loop
+ *
+ * @return vector of internal file descriptors
+ */
+ std::vector<FileDescriptor> getFDs();
+
+ /**
+ * Used with an external polling loop.
+ * Handles one event from the file descriptor.
+ *
+ * @param fd file descriptor
+ * @param pollEvent event on the fd. Defined in poll.h
+ *
+ */
+ void handle(const FileDescriptor fd, const short pollEvent);
+
+ /**
* Set the callback called for each new connection to a peer
*
* @param newPeerCallback the callback
* @brief Types definitions and helper functions
*/
+#include "config.hpp"
+
#include "ipc/types.hpp"
#include "logger/logger.hpp"
#include "ipc/service.hpp"
#include "ipc/client.hpp"
+#include "ipc/ipc-gsource.hpp"
#include "ipc/types.hpp"
+#include "utils/glib-loop.hpp"
#include "config/fields.hpp"
#include "logger/logger.hpp"
using namespace vasum;
using namespace vasum::ipc;
+using namespace vasum::utils;
+using namespace std::placeholders;
namespace fs = boost::filesystem;
namespace {
return data;
}
-FileDescriptor connect(Service& s, Client& c)
+FileDescriptor connect(Service& s, Client& c, bool serviceUsesGlib = false)
{
// Connects the Client to the Service and returns Clients FileDescriptor
-
std::mutex mutex;
std::condition_variable cv;
FileDescriptor peerFD = 0;
- auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFileDescriptor) {
+ auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFD) {
std::unique_lock<std::mutex> lock(mutex);
- peerFD = newFileDescriptor;
- cv.notify_one();
+ peerFD = newFD;
+ cv.notify_all();
};
- s.setNewPeerCallback(newPeerCallback);
- if (!s.isStarted()) {
- s.start();
+ if (!serviceUsesGlib) {
+ s.setNewPeerCallback(newPeerCallback);
+
+ if (!s.isStarted()) {
+ s.start();
+ }
+ } else {
+#if GLIB_CHECK_VERSION(2,36,0)
+
+ IPCGSource* serviceGSourcePtr = IPCGSource::create(s.getFDs(), std::bind(&Service::handle, &s, _1, _2));
+
+ auto agregateCallback = [&newPeerCallback, &serviceGSourcePtr](const FileDescriptor newFD) {
+ serviceGSourcePtr->addFD(newFD);
+ newPeerCallback(newFD);
+ };
+
+ s.setNewPeerCallback(agregateCallback);
+ s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, serviceGSourcePtr, _1));
+
+ serviceGSourcePtr->attach();
+#endif // GLIB_CHECK_VERSION
+
}
c.start();
std::unique_lock<std::mutex> lock(mutex);
- BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerFD]() {
+ BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() {
return peerFD != 0;
}));
void testEcho(Client& c, const MethodID methodID)
{
std::shared_ptr<SendData> sentData(new SendData(34));
- std::shared_ptr<SendData> recvData = c.callSync<SendData, SendData>(methodID, sentData);
+ std::shared_ptr<SendData> recvData = c.callSync<SendData, SendData>(methodID, sentData, 1000);
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
}
+#if GLIB_CHECK_VERSION(2,36,0)
+
+BOOST_AUTO_TEST_CASE(ServiceGSource)
+{
+ ScopedGlibLoop loop;
+
+ std::atomic_bool isSignalCalled(false);
+ auto signalHandler = [&isSignalCalled](const FileDescriptor, std::shared_ptr<SendData>&) {
+ isSignalCalled = true;
+ };
+
+ Service s(socketPath);
+ s.addMethodHandler<SendData, SendData>(1, echoCallback);
+
+ Client c(socketPath);
+ s.addSignalHandler<SendData>(2, signalHandler);
+ connect(s, c, true);
+
+ testEcho(c, 1);
+
+ auto data = std::make_shared<SendData>(1);
+ c.signal<SendData>(2, data);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for
+ BOOST_CHECK(isSignalCalled);
+}
+
+#endif // GLIB_CHECK_VERSION
+
// BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
// {
// unsigned oldLimit = ipc::getMaxFDNumber();