From b25dcc4c14d972123e109517585ff36989d05d1a Mon Sep 17 00:00:00 2001 From: Jan Olszak Date: Mon, 3 Nov 2014 08:52:00 +0200 Subject: [PATCH] IPC via UX sockets [Bug/Feature] IPC for communication between the library and daemon [Cause] N/A [Solution] N/A [Verification] Build, install, run tests Change-Id: I9880c7b4f3104b93f38d0e6ad86762fb17013d28 --- common/ipc/client.cpp | 80 +++++++ common/ipc/client.hpp | 156 +++++++++++++ common/ipc/exception.hpp | 45 ++++ common/ipc/internals/acceptor.cpp | 131 +++++++++++ common/ipc/internals/acceptor.hpp | 87 +++++++ common/ipc/internals/event-queue.hpp | 112 +++++++++ common/ipc/internals/eventfd.cpp | 77 +++++++ common/ipc/internals/eventfd.hpp | 62 +++++ common/ipc/internals/processor.cpp | 434 +++++++++++++++++++++++++++++++++++ common/ipc/internals/processor.hpp | 418 +++++++++++++++++++++++++++++++++ common/ipc/internals/socket.cpp | 200 ++++++++++++++++ common/ipc/internals/socket.hpp | 116 ++++++++++ common/ipc/internals/utils.cpp | 134 +++++++++++ common/ipc/internals/utils.hpp | 77 +++++++ common/ipc/service.cpp | 86 +++++++ common/ipc/service.hpp | 168 ++++++++++++++ common/ipc/types.hpp | 48 ++++ packaging/security-containers.spec | 1 + server/CMakeLists.txt | 2 +- tests/unit_tests/CMakeLists.txt | 3 +- tests/unit_tests/ipc/ut-ipc.cpp | 391 +++++++++++++++++++++++++++++++ 21 files changed, 2826 insertions(+), 2 deletions(-) create mode 100644 common/ipc/client.cpp create mode 100644 common/ipc/client.hpp create mode 100644 common/ipc/exception.hpp create mode 100644 common/ipc/internals/acceptor.cpp create mode 100644 common/ipc/internals/acceptor.hpp create mode 100644 common/ipc/internals/event-queue.hpp create mode 100644 common/ipc/internals/eventfd.cpp create mode 100644 common/ipc/internals/eventfd.hpp create mode 100644 common/ipc/internals/processor.cpp create mode 100644 common/ipc/internals/processor.hpp create mode 100644 common/ipc/internals/socket.cpp create mode 100644 common/ipc/internals/socket.hpp create mode 100644 common/ipc/internals/utils.cpp create mode 100644 common/ipc/internals/utils.hpp create mode 100644 common/ipc/service.cpp create mode 100644 common/ipc/service.hpp create mode 100644 common/ipc/types.hpp create mode 100644 tests/unit_tests/ipc/ut-ipc.cpp diff --git a/common/ipc/client.cpp b/common/ipc/client.cpp new file mode 100644 index 0000000..c1651f3 --- /dev/null +++ b/common/ipc/client.cpp @@ -0,0 +1,80 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Handling client connections + */ + +#include "config.hpp" + +#include "ipc/client.hpp" +#include "ipc/internals/socket.hpp" +#include "ipc/exception.hpp" + +namespace security_containers { +namespace ipc { + +Client::Client(const std::string& socketPath) + : mSocketPath(socketPath) +{ + LOGD("Creating client"); +} + +Client::~Client() +{ + LOGD("Destroying client..."); + try { + stop(); + } catch (IPCException& e) { + LOGE("Error in Client's destructor: " << e.what()); + } + LOGD("Destroyed client"); +} + +void Client::start() +{ + LOGD("Starting client..."); + + // Initialize the connection with the server + LOGD("Connecting to " + mSocketPath); + auto socketPtr = std::make_shared(Socket::connectSocket(mSocketPath)); + mServiceID = mProcessor.addPeer(socketPtr); + + // Start listening + mProcessor.start(); + + LOGD("Started client"); +} + +void Client::stop() +{ + LOGD("Stopping client..."); + mProcessor.stop(); + LOGD("Stopped"); +} + +void Client::removeMethod(const MethodID methodID) +{ + LOGD("Removing method id: " << methodID); + mProcessor.removeMethod(methodID); +} + +} // namespace ipc +} // namespace security_containers diff --git a/common/ipc/client.hpp b/common/ipc/client.hpp new file mode 100644 index 0000000..7429b24 --- /dev/null +++ b/common/ipc/client.hpp @@ -0,0 +1,156 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Handling client connections + */ + +#ifndef COMMON_IPC_CLIENT_HPP +#define COMMON_IPC_CLIENT_HPP + +#include "ipc/internals/processor.hpp" +#include "ipc/types.hpp" +#include "logger/logger.hpp" + +#include + +namespace security_containers { +namespace ipc { + +/** + * This class wraps communication via UX sockets for client applications. + * It uses serialization mechanism from libConfig. + * + * There is one additional thread: + * - PROCESSOR is responsible for the communication and calling the callbacks + * + * For message format @see ipc::Processor + */ +class Client { +public: + typedef Processor::MethodID MethodID; + + /** + * @param serverPath path to the server's socket + */ + Client(const std::string& serverPath); + ~Client(); + + Client(const Client&) = delete; + Client& operator=(const Client&) = delete; + + /** + * Starts the worker thread + */ + void start(); + + /** + * Stops all worker thread + */ + void stop(); + + /** + * Saves the callback connected to the method id. + * When a message with the given method id is received + * the data will be parsed and passed to this callback. + * + * @param methodID API dependent id of the method + * @param methodCallback method handling implementation + */ + template + void addMethodHandler(const MethodID methodID, + const typename MethodHandler::type& method); + + /** + * Removes the callback + * + * @param methodID API dependent id of the method + */ + void removeMethod(const MethodID methodID); + + /** + * Synchronous method call. + * + * @param methodID API dependent id of the method + * @param data data to send + * @param timeoutMS how long to wait for the return value before throw + * @return result data + */ + template + std::shared_ptr callSync(const MethodID methodID, + const std::shared_ptr& data, + unsigned int timeoutMS = 500); + + /** + * Asynchronous method call. The return callback will be called on + * return data arrival. It will be run in the PROCESSOR thread. + * + * + * @param methodID API dependent id of the method + * @param sendCallback callback for data serialization + * @param resultCallback callback for result serialization and handling + */ + template + void callAsync(const MethodID methodID, + const std::shared_ptr& data, + const typename ResultHandler::type& resultCallback); + +private: + Processor::PeerID mServiceID; + Processor mProcessor; + std::string mSocketPath; +}; + +template +void Client::addMethodHandler(const MethodID methodID, + const typename MethodHandler::type& method) +{ + LOGD("Adding method with id " << methodID); + mProcessor.addMethodHandler(methodID, method); + LOGD("Added method with id " << methodID); +} + +template +std::shared_ptr Client::callSync(const MethodID methodID, + const std::shared_ptr& data, + unsigned int timeoutMS) +{ + LOGD("Sync calling method: " << methodID); + return mProcessor.callSync(methodID, mServiceID, data, timeoutMS); +} + +template +void Client::callAsync(const MethodID methodID, + const std::shared_ptr& data, + const typename ResultHandler::type& resultCallback) +{ + LOGD("Async calling method: " << methodID); + mProcessor.callAsync(methodID, + mServiceID, + data, + resultCallback); + LOGD("Async called method: " << methodID); +} + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_CLIENT_HPP diff --git a/common/ipc/exception.hpp b/common/ipc/exception.hpp new file mode 100644 index 0000000..67d9c86 --- /dev/null +++ b/common/ipc/exception.hpp @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Jan Olszak + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Exceptions for the IPC + */ + + +#ifndef COMMON_IPC_EXCEPTION_HPP +#define COMMON_IPC_EXCEPTION_HPP + +#include "base-exception.hpp" + +namespace security_containers { + + +/** + * Base class for exceptions in IPC + */ +struct IPCException: public SecurityContainersException { + IPCException(const std::string& error) : SecurityContainersException(error) {} +}; + + +} + + +#endif // COMMON_IPC_EXCEPTION_HPP diff --git a/common/ipc/internals/acceptor.cpp b/common/ipc/internals/acceptor.cpp new file mode 100644 index 0000000..9738546 --- /dev/null +++ b/common/ipc/internals/acceptor.cpp @@ -0,0 +1,131 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Class for accepting new connections + */ + +#include "config.hpp" + +#include "ipc/exception.hpp" +#include "ipc/internals/utils.hpp" +#include "ipc/internals/acceptor.hpp" +#include "logger/logger.hpp" + +#include +#include +#include +#include +#include + +namespace security_containers { +namespace ipc { + +Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback) + : mNewConnectionCallback(newConnectionCallback), + mSocket(Socket::createSocket(socketPath)) +{ + LOGT("Creating Acceptor for socket " << socketPath); +} + +Acceptor::~Acceptor() +{ + LOGT("Destroying Acceptor"); + try { + stop(); + } catch (IPCException& e) { + LOGE("Error in destructor: " << e.what()); + } + LOGT("Destroyed Acceptor"); +} + +void Acceptor::start() +{ + LOGT("Starting Acceptor"); + if (!mThread.joinable()) { + mThread = std::thread(&Acceptor::run, this); + } + LOGT("Started Acceptor"); +} + +void Acceptor::stop() +{ + LOGT("Stopping Acceptor"); + if (mThread.joinable()) { + LOGT("Event::FINISH -> Acceptor"); + mEventQueue.send(Event::FINISH); + LOGT("Waiting for Acceptor to finish"); + mThread.join(); + } + LOGT("Stopped Acceptor"); +} + +void Acceptor::run() +{ + // Setup polling structure + std::vector fds(2); + + fds[0].fd = mEventQueue.getFD(); + fds[0].events = POLLIN; + + fds[1].fd = mSocket.getFD(); + fds[1].events = POLLIN; + + // Main loop + bool isRunning = true; + while (isRunning) { + LOGT("Waiting for new connections..."); + + int ret = ::poll(fds.data(), fds.size(), -1 /*blocking call*/); + + LOGT("...Incoming connection!"); + + if (ret == -1 || ret == 0) { + if (errno == EINTR) { + continue; + } + LOGE("Error in poll: " << std::string(strerror(errno))); + throw IPCException("Error in poll: " + std::string(strerror(errno))); + break; + } + + // Check for incoming connections + if (fds[1].revents & POLLIN) { + fds[1].revents = 0; + std::shared_ptr tmpSocket = mSocket.accept(); + mNewConnectionCallback(tmpSocket); + } + + // Check for incoming events + if (fds[0].revents & POLLIN) { + fds[0].revents = 0; + + if (mEventQueue.receive() == Event::FINISH) { + LOGD("Event FINISH"); + isRunning = false; + break; + } + } + } + LOGT("Exiting run"); +} + +} // namespace ipc +} // namespace security_containers diff --git a/common/ipc/internals/acceptor.hpp b/common/ipc/internals/acceptor.hpp new file mode 100644 index 0000000..b863400 --- /dev/null +++ b/common/ipc/internals/acceptor.hpp @@ -0,0 +1,87 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Class for accepting new connections + */ + +#ifndef COMMON_IPC_INTERNALS_ACCEPTOR_HPP +#define COMMON_IPC_INTERNALS_ACCEPTOR_HPP + +#include "config.hpp" + +#include "ipc/internals/socket.hpp" +#include "ipc/internals/event-queue.hpp" + +#include +#include + +namespace security_containers { +namespace ipc { + +/** + * Accepts new connections and passes the new socket to a callback. + */ +class Acceptor { +public: + + typedef std::function& socketPtr)> NewConnectionCallback; + + /** + * Class for accepting new connections. + * + * @param socketPath path to the socket + * @param newConnectionCallback called on new connections + */ + Acceptor(const std::string& socketPath, + const NewConnectionCallback& newConnectionCallback); + ~Acceptor(); + + Acceptor(const Acceptor& acceptor) = delete; + Acceptor& operator=(const Acceptor&) = delete; + + /** + * Starts the thread accepting the new connections. + */ + void start(); + + /** + * Stops the accepting thread. + */ + void stop(); + +private: + enum class Event : int { + FINISH // Shutdown request + }; + + NewConnectionCallback mNewConnectionCallback; + Socket mSocket; + + EventQueue mEventQueue; + std::thread mThread; + + void run(); +}; + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_INTERNALS_ACCEPTOR_HPP diff --git a/common/ipc/internals/event-queue.hpp b/common/ipc/internals/event-queue.hpp new file mode 100644 index 0000000..82cb2ff --- /dev/null +++ b/common/ipc/internals/event-queue.hpp @@ -0,0 +1,112 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Class for passing events using eventfd mechanism + */ + +#ifndef COMMON_IPC_INTERNALS_EVENT_QUEUE_HPP +#define COMMON_IPC_INTERNALS_EVENT_QUEUE_HPP + +#include "ipc/exception.hpp" +#include "ipc/internals/eventfd.hpp" +#include "logger/logger.hpp" + +#include +#include +#include + +namespace security_containers { +namespace ipc { + + +/** + * This class implements a simple FIFO queue of events. + * One can listen for the event with select/poll/epoll. + * + * @tparam MessageType type to pass as event's value + */ +template +class EventQueue { +public: + EventQueue() = default; + ~EventQueue() = default; + EventQueue(const EventQueue& eventQueue) = delete; + EventQueue& operator=(const EventQueue&) = delete; + + /** + * @return reference to the event's file descriptor + */ + int getFD() const; + + /** + * Send an event of a given value + * + * @param value size of the buffer + */ + void send(const MessageType& mess); + + /** + * Receives the signal. + * Blocks if there is no event. + * + * @return event's value + */ + MessageType receive(); + +private: + typedef std::lock_guard Lock; + + std::mutex mCommunicationMutex; + std::queue mMessages; + + EventFD mEventFD; +}; + +template +int EventQueue::getFD() const +{ + return mEventFD.getFD(); +} + +template +void EventQueue::send(const MessageType& mess) +{ + Lock lock(mCommunicationMutex); + LOGT("Sending event"); + mMessages.push(mess); + mEventFD.send(); +} + +template +MessageType EventQueue::receive() +{ + Lock lock(mCommunicationMutex); + mEventFD.receive(); + LOGT("Received event"); + MessageType mess = mMessages.front(); + mMessages.pop(); + return mess; +} + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_INTERNALS_EVENT_QUEUE_HPP diff --git a/common/ipc/internals/eventfd.cpp b/common/ipc/internals/eventfd.cpp new file mode 100644 index 0000000..c8a17b6 --- /dev/null +++ b/common/ipc/internals/eventfd.cpp @@ -0,0 +1,77 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Linux socket wrapper + */ + +#include "config.hpp" + +#include "ipc/internals/eventfd.hpp" +#include "ipc/internals/utils.hpp" +#include "ipc/exception.hpp" +#include "logger/logger.hpp" + +#include +#include +#include +#include + +namespace security_containers { +namespace ipc { + +EventFD::EventFD() +{ + mFD = ::eventfd(0, EFD_SEMAPHORE); + if (mFD == -1) { + LOGE("Error in eventfd: " << std::string(strerror(errno))); + throw IPCException("Error in eventfd: " + std::string(strerror(errno))); + } +} + +EventFD::~EventFD() +{ + try { + ipc::close(mFD); + } catch (IPCException& e) { + LOGE("Error in Event's destructor: " << e.what()); + } +} + +int EventFD::getFD() const +{ + return mFD; +} + +void EventFD::send() +{ + const std::uint64_t toSend = 1; + ipc::write(mFD, &toSend, sizeof(toSend)); +} + +void EventFD::receive() +{ + std::uint64_t readBuffer; + ipc::read(mFD, &readBuffer, sizeof(readBuffer)); +} + + +} // namespace ipc +} // namespace security_containers diff --git a/common/ipc/internals/eventfd.hpp b/common/ipc/internals/eventfd.hpp new file mode 100644 index 0000000..9de6f17 --- /dev/null +++ b/common/ipc/internals/eventfd.hpp @@ -0,0 +1,62 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Eventfd wrapper + */ + +#ifndef COMMON_IPC_INTERNALS_EVENTFD_HPP +#define COMMON_IPC_INTERNALS_EVENTFD_HPP + +namespace security_containers { +namespace ipc { + +class EventFD { +public: + + EventFD(); + ~EventFD(); + EventFD(const EventFD& eventfd) = delete; + EventFD& operator=(const EventFD&) = delete; + + /** + * @return event's file descriptor. + */ + int getFD() const; + + /** + * Send an event of a given value + */ + void send(); + + /** + * Receives the signal. + * Blocks if there is no event. + */ + void receive(); + +private: + int mFD; +}; + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_INTERNALS_EVENTFD_HPP diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp new file mode 100644 index 0000000..0465574 --- /dev/null +++ b/common/ipc/internals/processor.cpp @@ -0,0 +1,434 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Data and event processing thread + */ + +#include "config.hpp" + +#include "ipc/exception.hpp" +#include "ipc/internals/processor.hpp" +#include "ipc/internals/utils.hpp" + +#include +#include +#include +#include + +#include +#include + + +namespace security_containers { +namespace ipc { + +const Processor::MethodID Processor::RETURN_METHOD_ID = std::numeric_limits::max(); + +Processor::Processor(const PeerCallback& newPeerCallback, + const PeerCallback& removedPeerCallback, + const unsigned int maxNumberOfPeers) + : mNewPeerCallback(newPeerCallback), + mRemovedPeerCallback(removedPeerCallback), + mMaxNumberOfPeers(maxNumberOfPeers), + mMessageIDCounter(0), + mPeerIDCounter(0) +{ + LOGT("Creating Processor"); +} + +Processor::~Processor() +{ + LOGT("Destroying Processor"); + try { + stop(); + } catch (IPCException& e) { + LOGE("Error in Processor's destructor: " << e.what()); + } + LOGT("Destroyed Processor"); +} + +void Processor::start() +{ + LOGT("Starting Processor"); + if (!mThread.joinable()) { + mThread = std::thread(&Processor::run, this); + } + LOGT("Started Processor"); +} + +void Processor::stop() +{ + LOGT("Stopping Processor"); + if (mThread.joinable()) { + mEventQueue.send(Event::FINISH); + mThread.join(); + } + LOGT("Stopped Processor"); +} + +void Processor::removeMethod(const MethodID methodID) +{ + LOGT("Removing method " << methodID); + Lock lock(mCallsMutex); + mMethodsCallbacks.erase(methodID); +} + +Processor::PeerID Processor::addPeer(const std::shared_ptr& socketPtr) +{ + LOGT("Adding socket"); + PeerID peerID; + { + Lock lock(mSocketsMutex); + peerID = getNextPeerID(); + SocketInfo socketInfo; + socketInfo.peerID = peerID; + socketInfo.socketPtr = std::move(socketPtr); + mNewSockets.push(std::move(socketInfo)); + } + LOGI("New peer added. Id: " << peerID); + mEventQueue.send(Event::NEW_PEER); + + return peerID; +} + +void Processor::removePeer(PeerID peerID) +{ + LOGW("Removing naughty peer. ID: " << peerID); + { + Lock lock(mSocketsMutex); + mSockets.erase(peerID); + } + resetPolling(); +} + +void Processor::resetPolling() +{ + LOGI("Resetting polling"); + // Setup polling on eventfd and sockets + Lock lock(mSocketsMutex); + mFDs.resize(mSockets.size() + 1); + + mFDs[0].fd = mEventQueue.getFD(); + mFDs[0].events = POLLIN; + + auto socketIt = mSockets.begin(); + for (unsigned int i = 1; i < mFDs.size(); ++i) { + mFDs[i].fd = socketIt->second->getFD(); + mFDs[i].events = POLLIN | POLLHUP; // Listen for input events + ++socketIt; + // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too? + } +} + +void Processor::run() +{ + resetPolling(); + + mIsRunning = true; + while (mIsRunning) { + LOGT("Waiting for communication..."); + int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/); + LOGT("... incoming communication!"); + if (ret == -1 || ret == 0) { + if (errno == EINTR) { + continue; + } + LOGE("Error in poll: " << std::string(strerror(errno))); + throw IPCException("Error in poll: " + std::string(strerror(errno))); + } + + // Check for lost connections: + if (handleLostConnections()) { + // mFDs changed + continue; + } + + // Check for incoming data. + if (handleInputs()) { + // mFDs changed + continue; + } + + // Check for incoming events + if (handleEvent()) { + // mFDs changed + continue; + } + } +} + +bool Processor::handleLostConnections() +{ + std::list peersToRemove; + + { + Lock lock(mSocketsMutex); + auto socketIt = mSockets.begin(); + for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) { + if (mFDs[i].revents & POLLHUP) { + LOGI("Lost connection to peer: " << socketIt->first); + mFDs[i].revents &= ~(POLLHUP); + peersToRemove.push_back(socketIt->first); + } + } + + for (const auto peerID : peersToRemove) { + LOGT("Removing peer. ID: " << peerID); + mSockets.erase(peerID); + } + } + + if (!peersToRemove.empty()) { + resetPolling(); + } + + return !peersToRemove.empty(); +} + +bool Processor::handleInputs() +{ + std::list> > peersWithInput; + { + Lock lock(mSocketsMutex); + auto socketIt = mSockets.begin(); + for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) { + if (mFDs[i].revents & POLLIN) { + mFDs[i].revents &= ~(POLLIN); + peersWithInput.push_back(*socketIt); + } + } + } + + bool pollChanged = false; + // Handle input outside the critical section + for (const auto& peer : peersWithInput) { + pollChanged = pollChanged || handleInput(peer.first, *peer.second); + } + return pollChanged; +} + +bool Processor::handleInput(const PeerID peerID, const Socket& socket) +{ + LOGT("Handle incoming data"); + MethodID methodID; + MessageID messageID; + { + LOGI("Locking"); + Socket::Guard guard = socket.getGuard(); + socket.read(&methodID, sizeof(methodID)); + socket.read(&messageID, sizeof(messageID)); + LOGI("Locked"); + + if (methodID == RETURN_METHOD_ID) { + LOGI("Return value for messageID: " << messageID); + ReturnCallbacks returnCallbacks; + try { + Lock lock(mReturnCallbacksMutex); + LOGT("Getting the return callback"); + returnCallbacks = std::move(mReturnCallbacks.at(messageID)); + mReturnCallbacks.erase(messageID); + } catch (const std::out_of_range&) { + LOGW("No return callback for messageID: " << messageID); + return false; + } + + std::shared_ptr data; + try { + LOGT("Parsing incoming return data"); + data = returnCallbacks.parse(socket.getFD()); + } catch (const IPCException&) { + removePeer(peerID); + return true; + } + + guard.unlock(); + + LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID); + returnCallbacks.process(data); + + } else { + LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID); + std::shared_ptr methodCallbacks; + try { + Lock lock(mCallsMutex); + methodCallbacks = mMethodsCallbacks.at(methodID); + } catch (const std::out_of_range&) { + LOGW("No method callback for methodID: " << methodID); + removePeer(peerID); + return true; + } + + std::shared_ptr data; + try { + LOGT("Parsing incoming data"); + data = methodCallbacks->parse(socket.getFD()); + } catch (const IPCException&) { + removePeer(peerID); + return true; + } + + guard.unlock(); + + LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID); + std::shared_ptr returnData = methodCallbacks->method(data); + + LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID); + try { + // Send the call with the socket + Socket::Guard guard = socket.getGuard(); + socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID)); + socket.write(&messageID, sizeof(messageID)); + methodCallbacks->serialize(socket.getFD(), returnData); + } catch (const IPCException&) { + removePeer(peerID); + return true; + } + } + } + + return false; +} + +bool Processor::handleEvent() +{ + if (!(mFDs[0].revents & POLLIN)) { + // No event to serve + return false; + } + + mFDs[0].revents &= ~(POLLIN); + + switch (mEventQueue.receive()) { + case Event::FINISH: { + LOGD("Event FINISH"); + mIsRunning = false; + return false; + } + + case Event::CALL: { + LOGD("Event CALL"); + handleCall(); + return false; + } + + case Event::NEW_PEER: { + LOGD("Event NEW_PEER"); + SocketInfo socketInfo; + { + Lock lock(mSocketsMutex); + + socketInfo = std::move(mNewSockets.front()); + mNewSockets.pop(); + + if (mSockets.size() > mMaxNumberOfPeers) { + LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerID); + + } + if (mSockets.count(socketInfo.peerID) != 0) { + LOGE("There already was a socket for peerID: " << socketInfo.peerID); + } + mSockets[socketInfo.peerID] = socketInfo.socketPtr; + } + resetPolling(); + + if (mNewPeerCallback) { + // Notify about the new user. + mNewPeerCallback(socketInfo.peerID); + } + return true; + } + } + + return false; +} + +Processor::MessageID Processor::getNextMessageID() +{ + // TODO: This method of generating UIDs is buggy. To be changed. + return ++mMessageIDCounter; +} + +Processor::PeerID Processor::getNextPeerID() +{ + // TODO: This method of generating UIDs is buggy. To be changed. + return ++mPeerIDCounter; +} + +Processor::Call Processor::getCall() +{ + Lock lock(mCallsMutex); + if (mCalls.empty()) { + LOGE("Calls queue empty"); + throw IPCException("Calls queue empty"); + } + Call call = std::move(mCalls.front()); + mCalls.pop(); + return call; +} + +void Processor::handleCall() +{ + LOGT("Handle call from another thread"); + Call call = getCall(); + + ReturnCallbacks returnCallbacks; + returnCallbacks.parse = call.parse; + returnCallbacks.process = call.process; + + std::shared_ptr socketPtr; + try { + // Get the addressee's socket + Lock lock(mSocketsMutex); + socketPtr = mSockets.at(call.peerID); + } catch (const std::out_of_range&) { + LOGE("Peer disconnected. No socket with a peerID: " << call.peerID); + return; + } + + MessageID messageID = getNextMessageID(); + + { + // Set what to do with the return message + Lock lock(mReturnCallbacksMutex); + if (mReturnCallbacks.count(messageID) != 0) { + LOGE("There already was a return callback for messageID: " << messageID); + } + mReturnCallbacks[messageID] = std::move(returnCallbacks); + } + + try { + // Send the call with the socket + Socket::Guard guard = socketPtr->getGuard(); + socketPtr->write(&call.methodID, sizeof(call.methodID)); + socketPtr->write(&messageID, sizeof(messageID)); + call.serialize(socketPtr->getFD(), call.data); + } catch (const std::exception& e) { + LOGE("Error during sending a message: " << e.what()); + { + Lock lock(mReturnCallbacksMutex); + mReturnCallbacks.erase(messageID); + } + // TODO: User should get the error code. + } +} + +} // namespace ipc +} // namespace security_containers diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp new file mode 100644 index 0000000..cb14a67 --- /dev/null +++ b/common/ipc/internals/processor.hpp @@ -0,0 +1,418 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Data and event processing thread + */ + +#ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP +#define COMMON_IPC_INTERNALS_PROCESSOR_HPP + +#include "ipc/internals/socket.hpp" +#include "ipc/internals/event-queue.hpp" +#include "ipc/exception.hpp" +#include "ipc/types.hpp" +#include "config/manager.hpp" +#include "config/is-visitable.hpp" +#include "logger/logger.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace security_containers { +namespace ipc { +namespace { +const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500; +} +/** +* This class wraps communication via UX sockets +* +* It's intended to be used both in Client and Service classes. +* It uses a serialization mechanism from libConfig. +* Library user will only have to pass the types that each call will send and receive +* +* Message format: +* - MethodID - probably casted enum. +* MethodID == std::numeric_limits::max() is reserved for return messages +* - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages. +* - Rest: The data written in a callback. One type per method.ReturnCallbacks +* +* TODO: +* - error codes passed to async callbacks +* - remove ReturnCallbacks on peer disconnect +* - on sync timeout erase the return callback +* - don't throw timeout if the message is already processed +* - naming convention or methods that just commissions the PROCESS thread to do something +* - removePeer API function +*/ +class Processor { +public: + typedef std::function PeerCallback; + typedef unsigned int PeerID; + typedef unsigned int MethodID; + + /** + * Method ID. Used to indicate a message with the return value. + */ + static const MethodID RETURN_METHOD_ID; + /** + * Constructs the Processor, but doesn't start it. + * The object is ready to add methods. + * + * @param newPeerCallback called when a new peer arrives + * @param removedPeerCallback called when the Processor stops listening for this peer + */ + Processor(const PeerCallback& newPeerCallback = nullptr, + const PeerCallback& removedPeerCallback = nullptr, + const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS); + ~Processor(); + + Processor(const Processor&) = delete; + Processor(Processor&&) = delete; + Processor& operator=(const Processor&) = delete; + + /** + * Start the processing thread. + * Quits immediately after starting the thread. + */ + void start(); + + /** + * Stops the processing thread. + * No incoming data will be handled after. + */ + void stop(); + + /** + * From now on socket is owned by the Processor object. + * Calls the newPeerCallback. + * + * @param socketPtr pointer to the new socket + * @return peerID of the new socket + */ + PeerID addPeer(const std::shared_ptr& socketPtr); + + /** + * Saves the callbacks connected to the method id. + * When a message with the given method id is received, + * the data will be passed to the serialization callback through file descriptor. + * + * Then the process callback will be called with the parsed data. + * + * @param methodID API dependent id of the method + * @param process data processing callback + * @tparam SentDataType data type to send + * @tparam ReceivedDataType data type to receive + */ + template + void addMethodHandler(const MethodID methodID, + const typename MethodHandler::type& process); + + /** + * Removes the callback + * + * @param methodID API dependent id of the method + */ + void removeMethod(const MethodID methodID); + + /** + * Synchronous method call. + * + * @param methodID API dependent id of the method + * @param peerID id of the peer + * @param data data to sent + * @param timeoutMS how long to wait for the return value before throw + * @tparam SentDataType data type to send + * @tparam ReceivedDataType data type to receive + */ + template + std::shared_ptr callSync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + unsigned int timeoutMS = 500); + + /** + * Asynchronous method call + * + * @param methodID API dependent id of the method + * @param peerID id of the peer + * @param data data to sent + * @param process callback processing the return data + * @tparam SentDataType data type to send + * @tparam ReceivedDataType data type to receive + */ + template + void callAsync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + const typename ResultHandler::type& process); + + +private: + typedef std::function& data)> SerializeCallback; + typedef std::function(int fd)> ParseCallback; + typedef std::lock_guard Lock; + typedef unsigned int MessageID; + + struct Call { + Call(const Call& other) = delete; + Call& operator=(const Call&) = delete; + Call() = default; + Call(Call&&) = default; + + PeerID peerID; + MethodID methodID; + std::shared_ptr data; + SerializeCallback serialize; + ParseCallback parse; + ResultHandler::type process; + }; + + struct MethodHandlers { + MethodHandlers(const MethodHandlers& other) = delete; + MethodHandlers& operator=(const MethodHandlers&) = delete; + MethodHandlers() = default; + MethodHandlers(MethodHandlers&&) = default; + MethodHandlers& operator=(MethodHandlers &&) = default; + + SerializeCallback serialize; + ParseCallback parse; + MethodHandler::type method; + }; + + struct ReturnCallbacks { + ReturnCallbacks(const ReturnCallbacks& other) = delete; + ReturnCallbacks& operator=(const ReturnCallbacks&) = delete; + ReturnCallbacks() = default; + ReturnCallbacks(ReturnCallbacks&&) = default; + ReturnCallbacks& operator=(ReturnCallbacks &&) = default; + + ParseCallback parse; + ResultHandler::type process; + }; + + struct SocketInfo { + SocketInfo(const SocketInfo& other) = delete; + SocketInfo& operator=(const SocketInfo&) = delete; + SocketInfo() = default; + SocketInfo(SocketInfo&&) = default; + SocketInfo& operator=(SocketInfo &&) = default; + + std::shared_ptr socketPtr; + PeerID peerID; + }; + + enum class Event : int { + FINISH, // Shutdown request + CALL, // New method call in the queue + NEW_PEER // New peer in the queue + }; + EventQueue mEventQueue; + + + bool mIsRunning; + + // Mutex for the Calls queue and the map of methods. + std::mutex mCallsMutex; + std::queue mCalls; + std::unordered_map> mMethodsCallbacks; + + // Mutex for changing mSockets map. + // Shouldn't be locked on any read/write, that could block. Just copy the ptr. + std::mutex mSocketsMutex; + std::unordered_map > mSockets; + std::queue mNewSockets; + + // Mutex for modifying the map with return callbacks + std::mutex mReturnCallbacksMutex; + std::unordered_map mReturnCallbacks; + + + PeerCallback mNewPeerCallback; + PeerCallback mRemovedPeerCallback; + + unsigned int mMaxNumberOfPeers; + + std::thread mThread; + std::vector mFDs; + + std::atomic mMessageIDCounter; + std::atomic mPeerIDCounter; + + void run(); + bool handleEvent(); + void handleCall(); + bool handleLostConnections(); + bool handleInputs(); + bool handleInput(const PeerID peerID, const Socket& socket); + void resetPolling(); + MessageID getNextMessageID(); + PeerID getNextPeerID(); + Call getCall(); + void removePeer(PeerID peerID); + +}; + +template +void Processor::addMethodHandler(const MethodID methodID, + const typename MethodHandler::type& method) +{ + static_assert(config::isVisitable::value, + "Use the libConfig library"); + static_assert(config::isVisitable::value, + "Use the libConfig library"); + + if (methodID == RETURN_METHOD_ID) { + LOGE("Forbidden methodID: " << methodID); + throw IPCException("Forbidden methodID: " + std::to_string(methodID)); + } + + using namespace std::placeholders; + + MethodHandlers methodCall; + + methodCall.parse = [](const int fd)->std::shared_ptr { + std::shared_ptr data(new ReceivedDataType()); + config::loadFromFD(fd, *data); + return data; + }; + + methodCall.serialize = [](const int fd, std::shared_ptr& data)->void { + config::saveToFD(fd, *std::static_pointer_cast(data)); + }; + + methodCall.method = [method](std::shared_ptr& data)->std::shared_ptr { + std::shared_ptr tmpData = std::static_pointer_cast(data); + return method(tmpData); + }; + + { + Lock lock(mCallsMutex); + mMethodsCallbacks[methodID] = std::make_shared(std::move(methodCall)); + } +} + +template +void Processor::callAsync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + const typename ResultHandler::type& process) +{ + static_assert(config::isVisitable::value, + "Use the libConfig library"); + static_assert(config::isVisitable::value, + "Use the libConfig library"); + + if (!mThread.joinable()) { + LOGE("The Processor thread is not started. Can't send any data."); + throw IPCException("The Processor thread is not started. Can't send any data."); + } + + using namespace std::placeholders; + + Call call; + call.peerID = peerID; + call.methodID = methodID; + call.data = data; + + call.parse = [](const int fd)->std::shared_ptr { + std::shared_ptr data(new ReceivedDataType()); + config::loadFromFD(fd, *data); + return data; + }; + + call.serialize = [](const int fd, std::shared_ptr& data)->void { + config::saveToFD(fd, *std::static_pointer_cast(data)); + }; + + call.process = [process](std::shared_ptr& data)->void { + std::shared_ptr tmpData = std::static_pointer_cast(data); + return process(tmpData); + }; + + { + Lock lock(mCallsMutex); + mCalls.push(std::move(call)); + } + + mEventQueue.send(Event::CALL); +} + + +template +std::shared_ptr Processor::callSync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + unsigned int timeoutMS) +{ + static_assert(config::isVisitable::value, + "Use the libConfig library"); + static_assert(config::isVisitable::value, + "Use the libConfig library"); + + if (!mThread.joinable()) { + LOGE("The Processor thread is not started. Can't send any data."); + throw IPCException("The Processor thread is not started. Can't send any data."); + } + + std::shared_ptr result; + + std::mutex mtx; + std::unique_lock lck(mtx); + std::condition_variable cv; + + auto process = [&result, &cv](std::shared_ptr returnedData) { + result = returnedData; + cv.notify_one(); + }; + + callAsync(methodID, + peerID, + data, + process); + + auto isResultInitialized = [&result]() { + return static_cast(result); + }; + + if (!cv.wait_for(lck, std::chrono::milliseconds(timeoutMS), isResultInitialized)) { + LOGE("Function call timeout; methodID: " << methodID); + throw IPCException("Function call timeout; methodID: " + std::to_string(methodID)); + } + + return result; +} + + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP diff --git a/common/ipc/internals/socket.cpp b/common/ipc/internals/socket.cpp new file mode 100644 index 0000000..002b9cf --- /dev/null +++ b/common/ipc/internals/socket.cpp @@ -0,0 +1,200 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Linux socket wrapper + */ + +#include "config.hpp" + +#include "ipc/exception.hpp" +#include "ipc/internals/socket.hpp" +#include "ipc/internals/utils.hpp" +#include "logger/logger.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace security_containers { +namespace ipc { + +namespace { +const int MAX_QUEUE_LENGTH = 1000; +} + +Socket::Socket(int socketFD) + : mFD(socketFD) +{ +} + +Socket::Socket(Socket&& socket) + : mFD(socket.mFD) +{ + socket.mFD = -1; +} + +Socket::~Socket() +{ + try { + ipc::close(mFD); + } catch (IPCException& e) { + LOGE("Error in Socket's destructor: " << e.what()); + } +} + +Socket::Guard Socket::getGuard() const +{ + return Guard(mCommunicationMutex); +} + +int Socket::getFD() const +{ + return mFD; +} + +std::shared_ptr Socket::accept() +{ + int sockfd = ::accept(mFD, nullptr, nullptr); + if (sockfd == -1) { + LOGE("Error in accept: " << std::string(strerror(errno))); + IPCException("Error in accept: " + std::string(strerror(errno))); + } + return std::make_shared(sockfd); +} + +void Socket::write(const void* bufferPtr, const size_t size) const +{ + Guard guard(mCommunicationMutex); + ipc::write(mFD, bufferPtr, size); +} + +void Socket::read(void* bufferPtr, const size_t size) const +{ + Guard guard(mCommunicationMutex); + ipc::read(mFD, bufferPtr, size); +} + +int Socket::getSystemdSocket(const std::string& path) +{ + int n = ::sd_listen_fds(-1 /*Block further calls to sd_listen_fds*/); + if (n < 0) { + LOGE("sd_listen_fds fails with errno: " + n); + throw IPCException("sd_listen_fds fails with errno: " + n); + } + + for (int fd = SD_LISTEN_FDS_START; + fd < SD_LISTEN_FDS_START + n; + ++fd) { + if (0 < ::sd_is_socket_unix(SD_LISTEN_FDS_START, SOCK_STREAM, 1, path.c_str(), 0)) { + return fd; + } + } + LOGW("No usable sockets were passed by systemd."); + return -1; +} + +int Socket::createDomainSocket(const std::string& path) +{ + // Isn't the path too long? + if (path.size() >= sizeof(sockaddr_un::sun_path)) { + LOGE("Socket's path too long"); + throw IPCException("Socket's path too long"); + } + + int sockfd = ::socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd == -1) { + LOGE("Error in socket: " + std::string(strerror(errno))); + throw IPCException("Error in socket: " + std::string(strerror(errno))); + } + + ::sockaddr_un serverAddress; + serverAddress.sun_family = AF_UNIX; + ::strncpy(serverAddress.sun_path, path.c_str(), sizeof(sockaddr_un::sun_path)); + unlink(serverAddress.sun_path); + + // Everybody can access the socket + // TODO: Use SMACK to guard the socket + if (-1 == ::bind(sockfd, + reinterpret_cast(&serverAddress), + sizeof(struct sockaddr_un))) { + std::string message = strerror(errno); + ::close(sockfd); + LOGE("Error in bind: " << message); + IPCException("Error in bind: " + message); + } + + if (-1 == ::listen(sockfd, + MAX_QUEUE_LENGTH)) { + std::string message = strerror(errno); + ::close(sockfd); + LOGE("Error in listen: " << message); + IPCException("Error in listen: " + message); + } + + return sockfd; +} + +Socket Socket::createSocket(const std::string& path) +{ + // Initialize a socket + int fd = getSystemdSocket(path); + fd = fd != -1 ? fd : createDomainSocket(path); + + return Socket(fd); +} + +Socket Socket::connectSocket(const std::string& path) +{ + // Isn't the path too long? + if (path.size() >= sizeof(sockaddr_un::sun_path)) { + LOGE("Socket's path too long"); + throw IPCException("Socket's path too long"); + } + + int fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) { + LOGE("Error in socket: " + std::string(strerror(errno))); + throw IPCException("Error in socket: " + std::string(strerror(errno))); + } + + sockaddr_un serverAddress; + serverAddress.sun_family = AF_UNIX; + strncpy(serverAddress.sun_path, path.c_str(), sizeof(sockaddr_un::sun_path)); + + if (-1 == connect(fd, + reinterpret_cast(&serverAddress), + sizeof(struct sockaddr_un))) { + ::close(fd); + LOGE("Error in connect: " + std::string(strerror(errno))); + throw IPCException("Error in connect: " + std::string(strerror(errno))); + } + + return Socket(fd); +} + +} // namespace ipc +} // namespace security_containers diff --git a/common/ipc/internals/socket.hpp b/common/ipc/internals/socket.hpp new file mode 100644 index 0000000..b7a6d40 --- /dev/null +++ b/common/ipc/internals/socket.hpp @@ -0,0 +1,116 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Linux socket wrapper + */ + +#ifndef COMMON_IPC_INTERNALS_SOCKET_HPP +#define COMMON_IPC_INTERNALS_SOCKET_HPP + +#include +#include +#include + +namespace security_containers { +namespace ipc { + +/** + * This class wraps all operations possible to do with a socket. + * + * It has operations both for client and server application. + */ +class Socket { +public: + + typedef std::unique_lock Guard; + + /** + * Default constructor. + * If socketFD is passed then it's passed by the Socket + * + * @param socketFD socket obtained outside the class. + */ + Socket(int socketFD = -1); + Socket(Socket&& socket); + ~Socket(); + Socket& operator=(const Socket&) = delete; + + /** + * @return reference to the socket's file descriptor + */ + int getFD() const; + + /** + * Write data using the file descriptor + * + * @param bufferPtr buffer with the data + * @param size size of the buffer + */ + void write(const void* bufferPtr, const size_t size) const; + + /** + * Reads a value of the given type. + * + * @param bufferPtr buffer with the data + * @param size size of the buffer + */ + void read(void* bufferPtr, const size_t size) const; + + /** + * Accepts connection. Used by a server application. + * Blocking, called by a server. + */ + std::shared_ptr accept() ; + + /** + */ + Socket::Guard getGuard() const; + + + /** + * Prepares socket for accepting connections. + * Called by a server. + * + * @param path path to the socket + * @return created socket + */ + static Socket createSocket(const std::string& path); + + /** + * Connects to a socket. Called as a client. + * + * @param path path to the socket + * @return connected socket + */ + static Socket connectSocket(const std::string& path); + +private: + int mFD; + mutable std::recursive_mutex mCommunicationMutex; + + static int createDomainSocket(const std::string& path); + static int getSystemdSocket(const std::string& path); +}; + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_INTERNALS_SOCKET_HPP diff --git a/common/ipc/internals/utils.cpp b/common/ipc/internals/utils.cpp new file mode 100644 index 0000000..e98b60d --- /dev/null +++ b/common/ipc/internals/utils.cpp @@ -0,0 +1,134 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Utility functions + */ + +#include "config.hpp" + +#include "ipc/exception.hpp" +#include "ipc/internals/utils.hpp" +#include "logger/logger.hpp" + +#include +#include +#include + +#include +#include + +namespace fs = boost::filesystem; + +namespace security_containers { +namespace ipc { + +void close(int fd) +{ + if (fd < 0) { + return; + } + + for (;;) { + if (-1 == ::close(fd)) { + if (errno == EINTR) { + LOGD("Close interrupted by a signal, retrying"); + continue; + } + LOGE("Error in close: " << std::string(strerror(errno))); + throw IPCException("Error in close: " + std::string(strerror(errno))); + } + break; + } +} + +void write(int fd, const void* bufferPtr, const size_t size) +{ + size_t nTotal = 0; + int n; + + do { + n = ::write(fd, + reinterpret_cast(bufferPtr) + nTotal, + size - nTotal); + if (n < 0) { + if (errno == EINTR) { + LOGD("Write interrupted by a signal, retrying"); + continue; + } + LOGE("Error during writing: " + std::string(strerror(errno))); + throw IPCException("Error during witting: " + std::string(strerror(errno))); + } + nTotal += n; + } while (nTotal < size); +} + +void read(int fd, void* bufferPtr, const size_t size) +{ + size_t nTotal = 0; + int n; + + do { + n = ::read(fd, + reinterpret_cast(bufferPtr) + nTotal, + size - nTotal); + if (n < 0) { + if (errno == EINTR) { + LOGD("Read interrupted by a signal, retrying"); + continue; + } + LOGE("Error during reading: " + std::string(strerror(errno))); + throw IPCException("Error during reading: " + std::string(strerror(errno))); + } + nTotal += n; + } while (nTotal < size); +} + +unsigned int getMaxFDNumber() +{ + struct rlimit rlim; + if (-1 == getrlimit(RLIMIT_NOFILE, &rlim)) { + LOGE("Error during getrlimit: " + std::string(strerror(errno))); + throw IPCException("Error during getrlimit: " + std::string(strerror(errno))); + } + return rlim.rlim_cur; +} + +void setMaxFDNumber(unsigned int limit) +{ + struct rlimit rlim; + rlim.rlim_cur = limit; + rlim.rlim_max = limit; + if (-1 == setrlimit(RLIMIT_NOFILE, &rlim)) { + LOGE("Error during setrlimit: " + std::string(strerror(errno))); + throw IPCException("Error during setrlimit: " + std::string(strerror(errno))); + } +} + +unsigned int getFDNumber() +{ + const std::string path = "/proc/self/fd/"; + return std::distance(fs::directory_iterator(path), + fs::directory_iterator()); +} + +} // namespace ipc +} // namespace security_containers + diff --git a/common/ipc/internals/utils.hpp b/common/ipc/internals/utils.hpp new file mode 100644 index 0000000..0b1815d --- /dev/null +++ b/common/ipc/internals/utils.hpp @@ -0,0 +1,77 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Utility functions + */ + +#ifndef COMMON_IPC_INTERNALS_UTILS_HPP +#define COMMON_IPC_INTERNALS_UTILS_HPP + +#include + +namespace security_containers { +namespace ipc { + +/** + * Close the file descriptor. + * Repeat until + */ +void close(int fd); + +/** + * Write to a file descriptor, throw on error. + * + * @param fd file descriptor + * @param bufferPtr pointer to the data buffer + * @param size size of data to write + */ +void write(int fd, const void* bufferPtr, const size_t size); + +/** + * Read from a file descriptor, throw on error. + * + * @param fd file descriptor + * @param bufferPtr pointer to the data buffer + * @param size size of the data to read + */ +void read(int fd, void* bufferPtr, const size_t size); + +/** + * @return the max number of file descriptors for this process. + */ +unsigned int getMaxFDNumber(); + +/** + * Set the software and hardware limit of file descriptors for this process. + * + * @param limit limit of file descriptors + */ +void setMaxFDNumber(unsigned int limit); + +/** + * @return number of opened file descriptors by this process + */ +unsigned int getFDNumber(); + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_INTERNALS_UTILS_HPP diff --git a/common/ipc/service.cpp b/common/ipc/service.cpp new file mode 100644 index 0000000..d2590d6 --- /dev/null +++ b/common/ipc/service.cpp @@ -0,0 +1,86 @@ +// /* +// * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +// * +// * Contact: Jan Olszak +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License +// */ + +// /** +// * @file +// * @author Jan Olszak (j.olszak@samsung.com) +// * @brief Implementation of the IPC handling class +// */ + +#include "config.hpp" + +#include "ipc/service.hpp" +#include "ipc/exception.hpp" +#include "logger/logger.hpp" + +using namespace std::placeholders; + +namespace security_containers { +namespace ipc { + +Service::Service(const std::string& socketPath, + const PeerCallback& addPeerCallback, + const PeerCallback& removePeerCallback) + : mProcessor(addPeerCallback, removePeerCallback), + mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1)) + +{ + LOGD("Creating server"); +} + +Service::~Service() +{ + LOGD("Destroying server..."); + try { + stop(); + } catch (IPCException& e) { + LOGE("Error in Service's destructor: " << e.what()); + } + LOGD("Destroyed"); +} + +void Service::start() +{ + LOGD("Starting server"); + mProcessor.start(); + + // There can be an incoming connection from mAcceptor before mProcessor is listening, + // but it's OK. It will handle the connection when ready. So no need to wait for mProcessor. + mAcceptor.start(); + + LOGD("Started server"); +} + +void Service::stop() +{ + LOGD("Stopping server.."); + mAcceptor.stop(); + mProcessor.stop(); + LOGD("Stopped"); +} + +void Service::removeMethod(const MethodID methodID) +{ + LOGD("Removing method " << methodID); + mProcessor.removeMethod(methodID); + LOGD("Removed " << methodID); +} + + +} // namespace ipc +} // namespace security_containers diff --git a/common/ipc/service.hpp b/common/ipc/service.hpp new file mode 100644 index 0000000..08f7ec9 --- /dev/null +++ b/common/ipc/service.hpp @@ -0,0 +1,168 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Declaration of the IPC handling class + */ + +#ifndef COMMON_IPC_SERVICE_HPP +#define COMMON_IPC_SERVICE_HPP + +#include "ipc/internals/processor.hpp" +#include "ipc/internals/acceptor.hpp" +#include "ipc/types.hpp" +#include "logger/logger.hpp" + +#include + +namespace security_containers { +namespace ipc { + + +/** + * This class wraps communication via UX sockets. + * It uses serialization mechanism from libConfig. + * + * There are two working threads: + * - ACCEPTOR accepts incoming connections and passes them to PROCESSOR + * - PROCESSOR is responsible for the communication and calling the callbacks + * + * For message format @see ipc::Processor + */ +class Service { +public: + typedef Processor::PeerCallback PeerCallback; + typedef Processor::PeerID PeerID; + typedef Processor::MethodID MethodID; + + /** + * @param path path to the socket + */ + Service(const std::string& path, + const PeerCallback& addPeerCallback = nullptr, + const PeerCallback& removePeerCallback = nullptr); + ~Service(); + + Service(const Service&) = delete; + Service& operator=(const Service&) = delete; + + /** + * Starts the worker and acceptor threads + */ + void start(); + + /** + * Stops all working threads + */ + void stop(); + + /** + * Saves the callback connected to the method id. + * When a message with the given method id is received + * the data will be parsed and passed to this callback. + * + * @param methodID API dependent id of the method + * @param methodCallback method handling implementation + */ + template + void addMethodHandler(const MethodID methodID, + const typename MethodHandler::type& method); + + /** + * Removes the callback + * + * @param methodID API dependent id of the method + */ + void removeMethod(const MethodID methodID); + + /** + * Synchronous method call. + * + * @param methodID API dependent id of the method + * @param data data to send + * @return result data + */ + template + std::shared_ptr callSync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + unsigned int timeoutMS); + + /** + * Asynchronous method call. The return callback will be called on + * return data arrival. It will be run in the PROCESSOR thread. + * + * + * @param methodID API dependent id of the method + * @param sendCallback callback for data serialization + * @param resultCallback callback for result serialization and handling + */ + template + void callAsync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + const typename ResultHandler::type& resultCallback); + +private: + typedef std::lock_guard Lock; + Processor mProcessor; + Acceptor mAcceptor; +}; + + +template +void Service::addMethodHandler(const MethodID methodID, + const typename MethodHandler::type& method) +{ + LOGD("Adding method with id " << methodID); + mProcessor.addMethodHandler(methodID, method); + LOGD("Added method with id " << methodID); +} + +template +std::shared_ptr Service::callSync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + unsigned int timeoutMS = 500) +{ + LOGD("Sync calling method: " << methodID << " for user: " << peerID); + return mProcessor.callSync(methodID, peerID, data, timeoutMS); +} + +template +void Service::callAsync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + const typename ResultHandler::type& resultCallback) +{ + LOGD("Async calling method: " << methodID << " for user: " << peerID); + mProcessor.callAsync(methodID, + peerID, + data, + resultCallback); + LOGD("Async called method: " << methodID << "for user: " << peerID); +} + + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_SERVICE_HPP diff --git a/common/ipc/types.hpp b/common/ipc/types.hpp new file mode 100644 index 0000000..4269664 --- /dev/null +++ b/common/ipc/types.hpp @@ -0,0 +1,48 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Handler types definitions + */ + +#ifndef COMMON_IPC_HANDLERS_HPP +#define COMMON_IPC_HANDLERS_HPP + +#include +#include + +namespace security_containers { +namespace ipc { + +template +struct MethodHandler { + typedef std::function(std::shared_ptr&)> type; +}; + + +template +struct ResultHandler { + typedef std::function&)> type; +}; + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_HANDLERS_HPP diff --git a/packaging/security-containers.spec b/packaging/security-containers.spec index 00a05a6..4453648 100644 --- a/packaging/security-containers.spec +++ b/packaging/security-containers.spec @@ -27,6 +27,7 @@ BuildRequires: pkgconfig(libLogger) BuildRequires: pkgconfig(libSimpleDbus) BuildRequires: pkgconfig(glib-2.0) BuildRequires: pkgconfig(libsystemd-journal) +BuildRequires: pkgconfig(libsystemd-daemon) BuildRequires: pkgconfig(libvirt-glib-1.0) BuildRequires: pkgconfig(sqlite3) Requires: libvirt-daemon >= 1.2.4 diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 2237f52..333171a 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -30,7 +30,7 @@ ADD_EXECUTABLE(${SERVER_CODENAME} ${project_SRCS} ${common_SRCS}) ## Link libraries ############################################################## FIND_PACKAGE(Boost COMPONENTS program_options system filesystem regex) PKG_CHECK_MODULES(SERVER_DEPS REQUIRED libvirt libvirt-glib-1.0 json gio-2.0 libsystemd-journal - libcap-ng libLogger libSimpleDbus libConfig) + libcap-ng libLogger libSimpleDbus libConfig libsystemd-daemon) INCLUDE_DIRECTORIES(${COMMON_FOLDER}) INCLUDE_DIRECTORIES(SYSTEM ${SERVER_DEPS_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}) diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt index 3bb265d..caea433 100644 --- a/tests/unit_tests/CMakeLists.txt +++ b/tests/unit_tests/CMakeLists.txt @@ -36,7 +36,8 @@ ADD_EXECUTABLE(${UT_SERVER_CODENAME} ${project_SRCS} ${common_SRCS} ${server_SRC FIND_PACKAGE (Boost COMPONENTS unit_test_framework system filesystem regex) PKG_CHECK_MODULES(UT_SERVER_DEPS REQUIRED libvirt libvirt-glib-1.0 json gio-2.0 - libsystemd-journal libcap-ng libLogger libSimpleDbus libConfig) + libsystemd-journal libcap-ng libLogger libSimpleDbus libConfig + libsystemd-daemon) INCLUDE_DIRECTORIES(${COMMON_FOLDER} ${SERVER_FOLDER} ${UNIT_TESTS_FOLDER} ${CLIENT_FOLDER}) INCLUDE_DIRECTORIES(SYSTEM ${UT_SERVER_DEPS_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}) TARGET_LINK_LIBRARIES(${UT_SERVER_CODENAME} ${UT_SERVER_DEPS_LIBRARIES} ${Boost_LIBRARIES}) diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp new file mode 100644 index 0000000..48ad1e5 --- /dev/null +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -0,0 +1,391 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Jan Olszak + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Tests of the IPC + */ + +// TODO: Test connection limit +// TODO: Refactor tests - function for setting up env + + +#include "config.hpp" +#include "ut.hpp" + +#include "ipc/service.hpp" +#include "ipc/client.hpp" +#include "ipc/types.hpp" + +#include "config/fields.hpp" +#include "logger/logger.hpp" + +#include +#include +#include +#include +#include + +using namespace security_containers; +using namespace security_containers::ipc; +namespace fs = boost::filesystem; + +namespace { +struct Fixture { + std::string socketPath; + + Fixture() + : socketPath(fs::unique_path("/tmp/ipc-%%%%.socket").string()) + { + } + ~Fixture() + { + fs::remove(socketPath); + } +}; + +struct SendData { + int intVal; + SendData(int i = 0): intVal(i) {} + + CONFIG_REGISTER + ( + intVal + ) +}; + +struct EmptyData { + CONFIG_REGISTER_EMPTY +}; + +std::shared_ptr returnEmptyCallback(std::shared_ptr&) +{ + return std::shared_ptr(new EmptyData()); +} + +std::shared_ptr returnDataCallback(std::shared_ptr&) +{ + return std::shared_ptr(new SendData(1)); +} + +std::shared_ptr echoCallback(std::shared_ptr& data) +{ + return data; +} + +void testEcho(Client& c, const Client::MethodID methodID) +{ + std::shared_ptr sentData(new SendData(34)); + std::shared_ptr recvData = c.callSync(methodID, sentData); + BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); +} + +void testEcho(Service& s, const Client::MethodID methodID, const Service::PeerID peerID) +{ + std::shared_ptr sentData(new SendData(56)); + std::shared_ptr recvData = s.callSync(methodID, peerID, sentData); + BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); +} + +} // namespace + + +BOOST_FIXTURE_TEST_SUITE(IPCSuite, Fixture) + +BOOST_AUTO_TEST_CASE(ConstructorDestructorTest) +{ + Service s(socketPath); + Client c(socketPath); +} + +BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethodTest) +{ + Service s(socketPath); + + s.addMethodHandler(1, returnEmptyCallback); + s.addMethodHandler(1, returnDataCallback); + + s.start(); + + s.addMethodHandler(1, echoCallback); + s.addMethodHandler(2, returnDataCallback); + + Client c(socketPath); + c.start(); + testEcho(c, 1); + + s.removeMethod(1); + s.removeMethod(2); + + BOOST_CHECK_THROW(testEcho(c, 2), IPCException); +} + +BOOST_AUTO_TEST_CASE(ClientAddRemoveMethodTest) +{ + std::mutex mtx; + std::unique_lock lck(mtx); + std::condition_variable cv; + unsigned int peerID = 0; + auto newPeerCallback = [&cv, &peerID](unsigned int newPeerID) { + peerID = newPeerID; + cv.notify_one(); + }; + Service s(socketPath, newPeerCallback); + s.start(); + Client c(socketPath); + + c.addMethodHandler(1, returnEmptyCallback); + c.addMethodHandler(1, returnDataCallback); + + c.start(); + + c.addMethodHandler(1, echoCallback); + c.addMethodHandler(2, returnDataCallback); + + BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(1000), [&peerID]() { + return peerID != 0; + })); + + testEcho(s, 1, peerID); + + c.removeMethod(1); + c.removeMethod(2); + + BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException); +} + +BOOST_AUTO_TEST_CASE(ServiceStartStopTest) +{ + Service s(socketPath); + + s.addMethodHandler(1, returnDataCallback); + + s.start(); + s.stop(); + s.start(); + s.stop(); + + s.start(); + s.start(); +} + +BOOST_AUTO_TEST_CASE(ClientStartStopTest) +{ + Service s(socketPath); + Client c(socketPath); + c.addMethodHandler(1, returnDataCallback); + + c.start(); + c.stop(); + c.start(); + c.stop(); + + c.start(); + c.start(); + + c.stop(); + c.stop(); +} + +BOOST_AUTO_TEST_CASE(SyncClientToServiceEchoTest) +{ + Service s(socketPath); + s.addMethodHandler(1, echoCallback); + s.addMethodHandler(2, echoCallback); + + s.start(); + Client c(socketPath); + c.start(); + testEcho(c, 1); + testEcho(c, 2); +} + +BOOST_AUTO_TEST_CASE(RestartTest) +{ + Service s(socketPath); + s.addMethodHandler(1, echoCallback); + s.start(); + s.addMethodHandler(2, echoCallback); + + Client c(socketPath); + c.start(); + testEcho(c, 1); + testEcho(c, 2); + + c.stop(); + c.start(); + + testEcho(c, 1); + testEcho(c, 2); + + s.stop(); + s.start(); + + testEcho(c, 1); + testEcho(c, 2); +} + +BOOST_AUTO_TEST_CASE(SyncServiceToClientEchoTest) +{ + std::mutex mtx; + std::unique_lock lck(mtx); + std::condition_variable cv; + unsigned int peerID = 0; + auto newPeerCallback = [&cv, &peerID](unsigned int newPeerID) { + peerID = newPeerID; + cv.notify_one(); + }; + Service s(socketPath, newPeerCallback); + s.start(); + Client c(socketPath); + c.addMethodHandler(1, echoCallback); + c.start(); + + BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(1000), [&peerID]() { + return peerID != 0; + })); + + std::shared_ptr sentData(new SendData(56)); + std::shared_ptr recvData = s.callSync(1, peerID, sentData); + BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); +} + +BOOST_AUTO_TEST_CASE(AsyncClientToServiceEchoTest) +{ + // Setup Service and Client + Service s(socketPath); + s.addMethodHandler(1, echoCallback); + s.start(); + Client c(socketPath); + c.start(); + + std::mutex mtx; + std::unique_lock lck(mtx); + std::condition_variable cv; + + //Async call + std::shared_ptr sentData(new SendData(34)); + std::shared_ptr recvData; + auto dataBack = [&cv, &recvData](std::shared_ptr& data) { + recvData = data; + cv.notify_one(); + }; + c.callAsync(1, sentData, dataBack); + + // Wait for the response + BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(100), [&recvData]() { + return static_cast(recvData); + })); + + BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); +} + +BOOST_AUTO_TEST_CASE(AsyncServiceToClientEchoTest) +{ + std::mutex mtx; + std::unique_lock lck(mtx); + std::condition_variable cv; + + // Setup Service and Client + unsigned int peerID = 0; + auto newPeerCallback = [&cv, &peerID](unsigned int newPeerID) { + peerID = newPeerID; + cv.notify_one(); + }; + Service s(socketPath, newPeerCallback); + s.start(); + Client c(socketPath); + c.addMethodHandler(1, echoCallback); + c.start(); + + // Wait for the connection + BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(1000), [&peerID]() { + return peerID != 0; + })); + + // Async call + std::shared_ptr sentData(new SendData(56)); + std::shared_ptr recvData; + + auto dataBack = [&cv, &recvData](std::shared_ptr& data) { + recvData = data; + cv.notify_one(); + }; + + s.callAsync(1, peerID, sentData, dataBack); + + // Wait for the response + BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(1000), [&recvData]() { + return recvData.get() != nullptr; + })); + + BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); +} + + +BOOST_AUTO_TEST_CASE(SyncTimeoutTest) +{ + Service s(socketPath); + s.addMethodHandler(1, echoCallback); + + s.start(); + Client c(socketPath); + c.start(); + + std::shared_ptr sentData(new SendData(78)); + + BOOST_CHECK_THROW((c.callSync(1, sentData, 1)), IPCException); +} + + +// BOOST_AUTO_TEST_CASE(ConnectionLimitTest) +// { +// unsigned oldLimit = ipc::getMaxFDNumber(); +// ipc::setMaxFDNumber(50); + +// // Setup Service and many Clients +// Service s(socketPath); +// s.addMethodHandler(1, echoCallback); +// s.start(); + +// std::list clients; +// for (int i = 0; i < 100; ++i) { +// try { +// clients.push_back(Client(socketPath)); +// clients.back().start(); +// } catch (...) {} +// } + +// unsigned seed = std::chrono::system_clock::now().time_since_epoch().count(); +// std::mt19937 generator(seed); +// for (auto it = clients.begin(); it != clients.end(); ++it) { +// try { +// std::shared_ptr sentData(new SendData(generator())); +// std::shared_ptr recvData = it->callSync(1, sentData); +// BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); +// } catch (...) {} +// } + +// ipc::setMaxFDNumber(oldLimit); +// } + + + +BOOST_AUTO_TEST_SUITE_END() -- 2.7.4