From 7ea2c7bdeffd0ebc8514840af180574b835b6784 Mon Sep 17 00:00:00 2001 From: Jan Olszak Date: Mon, 17 Nov 2014 12:58:32 +0100 Subject: [PATCH] IPC: NONBLOCK sockets [Bug/Feature] All writes and reads have timeout Timeout in callSync removes the peer [Cause] N/A [Solution] N/A [Verification] Build, install, run tests Change-Id: I86213b04e435a48bc56ae6f995a0c364b712a4d0 --- common/ipc/internals/event-queue.hpp | 12 ++++ common/ipc/internals/eventfd.cpp | 2 +- common/ipc/internals/processor.cpp | 120 +++++++++++++++++++++++++++------- common/ipc/internals/processor.hpp | 92 +++++++++++++++++++------- common/ipc/internals/socket.cpp | 12 +++- common/ipc/internals/utils.cpp | 123 ++++++++++++++++++++++++++--------- common/ipc/internals/utils.hpp | 6 +- common/ipc/types.cpp | 4 ++ common/ipc/types.hpp | 2 + tests/unit_tests/ipc/ut-ipc.cpp | 60 +++++++++++++++++ 10 files changed, 349 insertions(+), 84 deletions(-) diff --git a/common/ipc/internals/event-queue.hpp b/common/ipc/internals/event-queue.hpp index 82cb2ff..b50f0c4 100644 --- a/common/ipc/internals/event-queue.hpp +++ b/common/ipc/internals/event-queue.hpp @@ -71,6 +71,11 @@ public: */ MessageType receive(); + /** + * @return is the queue empty + */ + bool isEmpty(); + private: typedef std::lock_guard Lock; @@ -106,6 +111,13 @@ MessageType EventQueue::receive() return mess; } +template +bool EventQueue::isEmpty() +{ + Lock lock(mCommunicationMutex); + return mMessages.empty(); +} + } // namespace ipc } // namespace security_containers diff --git a/common/ipc/internals/eventfd.cpp b/common/ipc/internals/eventfd.cpp index c8a17b6..37cf7dd 100644 --- a/common/ipc/internals/eventfd.cpp +++ b/common/ipc/internals/eventfd.cpp @@ -39,7 +39,7 @@ namespace ipc { EventFD::EventFD() { - mFD = ::eventfd(0, EFD_SEMAPHORE); + mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK); if (mFD == -1) { LOGE("Error in eventfd: " << std::string(strerror(errno))); throw IPCException("Error in eventfd: " + std::string(strerror(errno))); diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index 5634880..9677991 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -48,9 +48,6 @@ namespace ipc { LOGE("Callback threw an error: " << e.what()); \ } - - - const Processor::MethodID Processor::RETURN_METHOD_ID = std::numeric_limits::max(); Processor::Processor(const PeerCallback& newPeerCallback, @@ -118,9 +115,31 @@ Processor::PeerID Processor::addPeer(const std::shared_ptr& socketPtr) return peerID; } -void Processor::removePeer(const PeerID peerID, Status status) +void Processor::removePeer(const PeerID peerID) { - LOGW("Removing naughty peer. ID: " << peerID); + std::shared_ptr conditionPtr(new std::condition_variable()); + + { + Lock lock(mSocketsMutex); + RemovePeerRequest request(peerID, conditionPtr); + mPeersToDelete.push(std::move(request)); + } + + mEventQueue.send(Event::DELETE_PEER); + + auto isPeerDeleted = [&peerID, this] { + Lock lock(mSocketsMutex); + return mSockets.count(peerID) == 0; + }; + + std::mutex mutex; + std::unique_lock lock(mutex); + conditionPtr->wait(lock, isPeerDeleted); +} + +void Processor::removePeerInternal(const PeerID peerID, Status status) +{ + LOGW("Removing peer. ID: " << peerID); { Lock lock(mSocketsMutex); mSockets.erase(peerID); @@ -141,9 +160,49 @@ void Processor::removePeer(const PeerID peerID, Status status) } } + if (mRemovedPeerCallback) { + // Notify about the deletion + mRemovedPeerCallback(peerID); + } + resetPolling(); } +void Processor::cleanCommunication() +{ + while (!mEventQueue.isEmpty()) { + switch (mEventQueue.receive()) { + case Event::FINISH: { + LOGD("Event FINISH after FINISH"); + break; + } + case Event::CALL: { + LOGD("Event CALL after FINISH"); + Call call = getCall(); + IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data)); + break; + } + + case Event::NEW_PEER: { + LOGD("Event NEW_PEER after FINISH"); + break; + } + + case Event::DELETE_PEER: { + LOGD("Event DELETE_PEER after FINISH"); + RemovePeerRequest request; + { + Lock lock(mSocketsMutex); + request = std::move(mPeersToDelete.front()); + mPeersToDelete.pop(); + } + request.conditionPtr->notify_all(); + break; + } + } + } +} + void Processor::resetPolling() { LOGI("Resetting polling"); @@ -198,6 +257,8 @@ void Processor::run() continue; } } + + cleanCommunication(); } @@ -215,11 +276,10 @@ bool Processor::handleLostConnections() peersToRemove.push_back(socketIt->first); } } - } for (const PeerID peerID : peersToRemove) { - removePeer(peerID, Status::PEER_DISCONNECTED); + removePeerInternal(peerID, Status::PEER_DISCONNECTED); } return !peersToRemove.empty(); @@ -280,7 +340,7 @@ bool Processor::onReturnValue(const PeerID peerID, mReturnCallbacks.erase(messageID); } catch (const std::out_of_range&) { LOGW("No return callback for messageID: " << messageID); - removePeer(peerID, Status::NAUGHTY_PEER); + removePeerInternal(peerID, Status::NAUGHTY_PEER); return true; } @@ -291,7 +351,7 @@ bool Processor::onReturnValue(const PeerID peerID, } catch (const std::exception& e) { LOGE("Exception during parsing: " << e.what()); IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data)); - removePeer(peerID, Status::PARSING_ERROR); + removePeerInternal(peerID, Status::PARSING_ERROR); return true; } @@ -314,7 +374,7 @@ bool Processor::onRemoteCall(const PeerID peerID, methodCallbacks = mMethodsCallbacks.at(methodID); } catch (const std::out_of_range&) { LOGW("No method callback for methodID: " << methodID); - removePeer(peerID, Status::NAUGHTY_PEER); + removePeerInternal(peerID, Status::NAUGHTY_PEER); return true; } @@ -324,7 +384,7 @@ bool Processor::onRemoteCall(const PeerID peerID, data = methodCallbacks->parse(socket.getFD()); } catch (const std::exception& e) { LOGE("Exception during parsing: " << e.what()); - removePeer(peerID, Status::PARSING_ERROR); + removePeerInternal(peerID, Status::PARSING_ERROR); return true; } @@ -334,7 +394,7 @@ bool Processor::onRemoteCall(const PeerID peerID, returnData = methodCallbacks->method(data); } catch (const std::exception& e) { LOGE("Exception in method handler: " << e.what()); - removePeer(peerID, Status::NAUGHTY_PEER); + removePeerInternal(peerID, Status::NAUGHTY_PEER); return true; } @@ -347,7 +407,7 @@ bool Processor::onRemoteCall(const PeerID peerID, methodCallbacks->serialize(socket.getFD(), returnData); } catch (const std::exception& e) { LOGE("Exception during serialization: " << e.what()); - removePeer(peerID, Status::SERIALIZATION_ERROR); + removePeerInternal(peerID, Status::SERIALIZATION_ERROR); return true; } @@ -402,6 +462,20 @@ bool Processor::handleEvent() } return true; } + + case Event::DELETE_PEER: { + LOGD("Event DELETE_PEER"); + RemovePeerRequest request; + { + Lock lock(mSocketsMutex); + request = std::move(mPeersToDelete.front()); + mPeersToDelete.pop(); + } + + removePeerInternal(request.peerID, Status::REMOVED_PEER); + request.conditionPtr->notify_all(); + return true; + } } return false; @@ -447,39 +521,37 @@ bool Processor::handleCall() return false; } - MessageID messageID = getNextMessageID(); - { // Set what to do with the return message Lock lock(mReturnCallbacksMutex); - if (mReturnCallbacks.count(messageID) != 0) { - LOGE("There already was a return callback for messageID: " << messageID); + if (mReturnCallbacks.count(call.messageID) != 0) { + LOGE("There already was a return callback for messageID: " << call.messageID); } // move insertion - mReturnCallbacks[messageID] = std::move(ReturnCallbacks(call.peerID, - std::move(call.parse), - std::move(call.process))); + mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerID, + std::move(call.parse), + std::move(call.process))); } try { // Send the call with the socket Socket::Guard guard = socketPtr->getGuard(); socketPtr->write(&call.methodID, sizeof(call.methodID)); - socketPtr->write(&messageID, sizeof(messageID)); + socketPtr->write(&call.messageID, sizeof(call.messageID)); call.serialize(socketPtr->getFD(), call.data); } catch (const std::exception& e) { LOGE("Error during sending a message: " << e.what()); // Inform about the error - IGNORE_EXCEPTIONS(mReturnCallbacks[messageID].process(Status::SERIALIZATION_ERROR, call.data)); + IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data)); { Lock lock(mReturnCallbacksMutex); - mReturnCallbacks.erase(messageID); + mReturnCallbacks.erase(call.messageID); } - removePeer(call.peerID, Status::SERIALIZATION_ERROR); + removePeerInternal(call.peerID, Status::SERIALIZATION_ERROR); return true; } diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index 3df3f9c..e34bdb6 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -65,7 +65,6 @@ const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500; * - Rest: The data written in a callback. One type per method.ReturnCallbacks * * TODO: -* - error codes passed to async callbacks * - remove ReturnCallbacks on peer disconnect * - on sync timeout erase the return callback * - don't throw timeout if the message is already processed @@ -73,12 +72,15 @@ const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500; * - removePeer API function * - error handling - special message type * - some mutexes may not be needed +* - make addPeer synchronous like removePeer */ class Processor { public: typedef std::function PeerCallback; typedef unsigned int PeerID; typedef unsigned int MethodID; + typedef unsigned int MessageID; + /** * Method ID. Used to indicate a message with the return value. @@ -122,6 +124,13 @@ public: PeerID addPeer(const std::shared_ptr& socketPtr); /** + * Request removing peer and wait + * + * @param peerID id of the peer + */ + void removePeer(const PeerID peerID); + + /** * Saves the callbacks connected to the method id. * When a message with the given method id is received, * the data will be passed to the serialization callback through file descriptor. @@ -171,17 +180,16 @@ public: * @tparam ReceivedDataType data type to receive */ template - void callAsync(const MethodID methodID, - const PeerID peerID, - const std::shared_ptr& data, - const typename ResultHandler::type& process); + MessageID callAsync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + const typename ResultHandler::type& process); private: typedef std::function& data)> SerializeCallback; typedef std::function(int fd)> ParseCallback; typedef std::lock_guard Lock; - typedef unsigned int MessageID; struct Call { Call(const Call& other) = delete; @@ -195,6 +203,7 @@ private: SerializeCallback serialize; ParseCallback parse; ResultHandler::type process; + MessageID messageID; }; struct MethodHandlers { @@ -238,10 +247,26 @@ private: std::shared_ptr socketPtr; }; + struct RemovePeerRequest { + RemovePeerRequest(const RemovePeerRequest& other) = delete; + RemovePeerRequest& operator=(const RemovePeerRequest&) = delete; + RemovePeerRequest() = default; + RemovePeerRequest(RemovePeerRequest&&) = default; + RemovePeerRequest& operator=(RemovePeerRequest &&) = default; + + RemovePeerRequest(const PeerID peerID, + const std::shared_ptr& conditionPtr) + : peerID(peerID), conditionPtr(conditionPtr) {} + + PeerID peerID; + std::shared_ptr conditionPtr; + }; + enum class Event : int { FINISH, // Shutdown request CALL, // New method call in the queue - NEW_PEER // New peer in the queue + NEW_PEER, // New peer in the queue + DELETE_PEER // Delete peer }; EventQueue mEventQueue; @@ -258,6 +283,7 @@ private: std::mutex mSocketsMutex; std::unordered_map > mSockets; std::queue mNewSockets; + std::queue mPeersToDelete; // Mutex for modifying the map with return callbacks std::mutex mReturnCallbacksMutex; @@ -292,8 +318,8 @@ private: MessageID getNextMessageID(); PeerID getNextPeerID(); Call getCall(); - void removePeer(const PeerID peerID, Status status); - + void removePeerInternal(const PeerID peerID, Status status); + void cleanCommunication(); }; template @@ -336,10 +362,10 @@ void Processor::addMethodHandler(const MethodID methodID, } template -void Processor::callAsync(const MethodID methodID, - const PeerID peerID, - const std::shared_ptr& data, - const typename ResultHandler::type& process) +Processor::MessageID Processor::callAsync(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + const typename ResultHandler::type& process) { static_assert(config::isVisitable::value, "Use the libConfig library"); @@ -357,6 +383,7 @@ void Processor::callAsync(const MethodID methodID, call.peerID = peerID; call.methodID = methodID; call.data = data; + call.messageID = getNextMessageID(); call.parse = [](const int fd)->std::shared_ptr { std::shared_ptr data(new ReceivedDataType()); @@ -379,6 +406,8 @@ void Processor::callAsync(const MethodID methodID, } mEventQueue.send(Event::CALL); + + return call.messageID; } @@ -400,30 +429,43 @@ std::shared_ptr Processor::callSync(const MethodID methodID, std::shared_ptr result; - std::mutex mtx; - std::unique_lock lck(mtx); + std::mutex mutex; std::condition_variable cv; Status returnStatus = ipc::Status::UNDEFINED; - auto process = [&result, &cv, &returnStatus](Status status, std::shared_ptr returnedData) { + auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr returnedData) { + std::unique_lock lock(mutex); returnStatus = status; result = returnedData; - cv.notify_one(); + cv.notify_all(); }; - callAsync(methodID, - peerID, - data, - process); + MessageID messageID = callAsync(methodID, + peerID, + data, + process); auto isResultInitialized = [&returnStatus]() { return returnStatus != ipc::Status::UNDEFINED; }; - if (!cv.wait_for(lck, std::chrono::milliseconds(timeoutMS), isResultInitialized)) { - LOGE("Function call timeout; methodID: " << methodID); - throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID)); + std::unique_lock lock(mutex); + if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) { + bool isTimeout = false; + { + Lock lock(mReturnCallbacksMutex); + if (1 == mReturnCallbacks.erase(messageID)) { + isTimeout = true; + } + } + if (isTimeout) { + removePeer(peerID); + LOGE("Function call timeout; methodID: " << methodID); + throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID)); + } else { + // Timeout started during the return value processing, so wait for it to finish + cv.wait(lock, isResultInitialized); + } } throwOnError(returnStatus); diff --git a/common/ipc/internals/socket.cpp b/common/ipc/internals/socket.cpp index 002b9cf..4ac977a 100644 --- a/common/ipc/internals/socket.cpp +++ b/common/ipc/internals/socket.cpp @@ -102,8 +102,8 @@ int Socket::getSystemdSocket(const std::string& path) { int n = ::sd_listen_fds(-1 /*Block further calls to sd_listen_fds*/); if (n < 0) { - LOGE("sd_listen_fds fails with errno: " + n); - throw IPCException("sd_listen_fds fails with errno: " + n); + LOGE("sd_listen_fds fails with errno: " << n); + throw IPCException("sd_listen_fds fails with errno: " + std::to_string(n)); } for (int fd = SD_LISTEN_FDS_START; @@ -193,6 +193,14 @@ Socket Socket::connectSocket(const std::string& path) throw IPCException("Error in connect: " + std::string(strerror(errno))); } + // Nonblock socket + int flags = fcntl(fd, F_GETFL, 0); + if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK)) { + ::close(fd); + LOGE("Error in fcntl: " + std::string(strerror(errno))); + throw IPCException("Error in fcntl: " + std::string(strerror(errno))); + } + return Socket(fd); } diff --git a/common/ipc/internals/utils.cpp b/common/ipc/internals/utils.cpp index e98b60d..bb11c80 100644 --- a/common/ipc/internals/utils.cpp +++ b/common/ipc/internals/utils.cpp @@ -30,16 +30,63 @@ #include #include +#include #include - +#include #include #include namespace fs = boost::filesystem; +namespace chr = std::chrono; namespace security_containers { namespace ipc { +namespace { + +void waitForEvent(int fd, + short event, + const chr::high_resolution_clock::time_point deadline) +{ + // Wait for the rest of the data + struct pollfd fds[1]; + fds[0].fd = fd; + fds[0].events = event | POLLHUP; + + for (;;) { + chr::milliseconds timeoutMS = chr::duration_cast(deadline - chr::high_resolution_clock::now()); + if (timeoutMS.count() < 0) { + LOGE("Timeout in read"); + throw IPCException("Timeout in read"); + } + + int ret = ::poll(fds, 1 /*fds size*/, timeoutMS.count()); + + if (ret == -1) { + if (errno == EINTR) { + continue; + } + LOGE("Error in poll: " + std::string(strerror(errno))); + throw IPCException("Error in poll: " + std::string(strerror(errno))); + } + + if (ret == 0) { + LOGE("Timeout in read"); + throw IPCException("Timeout in read"); + } + + if (fds[0].revents & POLLHUP) { + LOGE("Peer disconnected"); + throw IPCException("Peer disconnected"); + } + + // Here Comes the Sun + break; + } +} + +} // namespace + void close(int fd) { if (fd < 0) { @@ -59,46 +106,62 @@ void close(int fd) } } -void write(int fd, const void* bufferPtr, const size_t size) +void write(int fd, const void* bufferPtr, const size_t size, int timeoutMS) { + chr::high_resolution_clock::time_point deadline = chr::high_resolution_clock::now() + + chr::milliseconds(timeoutMS); + size_t nTotal = 0; - int n; + for (;;) { + int n = ::write(fd, + reinterpret_cast(bufferPtr) + nTotal, + size - nTotal); + if (n > 0) { + nTotal += n; + } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + // Neglected errors + LOGD("Retrying write"); + } else { + LOGE("Error during reading: " + std::string(strerror(errno))); + throw IPCException("Error during reading: " + std::string(strerror(errno))); + } - do { - n = ::write(fd, - reinterpret_cast(bufferPtr) + nTotal, - size - nTotal); - if (n < 0) { - if (errno == EINTR) { - LOGD("Write interrupted by a signal, retrying"); - continue; - } - LOGE("Error during writing: " + std::string(strerror(errno))); - throw IPCException("Error during witting: " + std::string(strerror(errno))); + if (nTotal >= size) { + // All data is written, break loop + break; + } else { + waitForEvent(fd, POLLOUT, deadline); } - nTotal += n; - } while (nTotal < size); + } } -void read(int fd, void* bufferPtr, const size_t size) +void read(int fd, void* bufferPtr, const size_t size, int timeoutMS) { - size_t nTotal = 0; - int n; + chr::high_resolution_clock::time_point deadline = chr::high_resolution_clock::now() + + chr::milliseconds(timeoutMS); - do { - n = ::read(fd, - reinterpret_cast(bufferPtr) + nTotal, - size - nTotal); - if (n < 0) { - if (errno == EINTR) { - LOGD("Read interrupted by a signal, retrying"); - continue; - } + size_t nTotal = 0; + for (;;) { + int n = ::read(fd, + reinterpret_cast(bufferPtr) + nTotal, + size - nTotal); + if (n > 0) { + nTotal += n; + } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + // Neglected errors + LOGD("Retrying read"); + } else { LOGE("Error during reading: " + std::string(strerror(errno))); throw IPCException("Error during reading: " + std::string(strerror(errno))); } - nTotal += n; - } while (nTotal < size); + + if (nTotal >= size) { + // All data is read, break loop + break; + } else { + waitForEvent(fd, POLLIN, deadline); + } + } } unsigned int getMaxFDNumber() diff --git a/common/ipc/internals/utils.hpp b/common/ipc/internals/utils.hpp index 0b1815d..a9a79a0 100644 --- a/common/ipc/internals/utils.hpp +++ b/common/ipc/internals/utils.hpp @@ -42,8 +42,9 @@ void close(int fd); * @param fd file descriptor * @param bufferPtr pointer to the data buffer * @param size size of data to write + * @param timeoutMS timeout in milliseconds */ -void write(int fd, const void* bufferPtr, const size_t size); +void write(int fd, const void* bufferPtr, const size_t size, int timeoutMS = 500); /** * Read from a file descriptor, throw on error. @@ -51,8 +52,9 @@ void write(int fd, const void* bufferPtr, const size_t size); * @param fd file descriptor * @param bufferPtr pointer to the data buffer * @param size size of the data to read + * @param timeoutMS timeout in milliseconds */ -void read(int fd, void* bufferPtr, const size_t size); +void read(int fd, void* bufferPtr, const size_t size, int timeoutMS = 500); /** * @return the max number of file descriptors for this process. diff --git a/common/ipc/types.cpp b/common/ipc/types.cpp index e0ffc5b..bce862c 100644 --- a/common/ipc/types.cpp +++ b/common/ipc/types.cpp @@ -37,6 +37,8 @@ std::string toString(const Status status) case Status::SERIALIZATION_ERROR: return "Exception during writing/serializing data to the socket"; case Status::PEER_DISCONNECTED: return "No such peer. Might got disconnected."; case Status::NAUGHTY_PEER: return "Peer performed a forbidden action."; + case Status::REMOVED_PEER: return "Removing peer"; + case Status::CLOSING: return "Closing IPC"; case Status::UNDEFINED: return "Undefined state"; default: return "Unknown status"; } @@ -56,6 +58,8 @@ void throwOnError(const Status status) case Status::SERIALIZATION_ERROR: throw IPCSerializationException(message); case Status::PEER_DISCONNECTED: throw IPCPeerDisconnectedException(message); case Status::NAUGHTY_PEER: throw IPCNaughtyPeerException(message); + case Status::REMOVED_PEER: throw IPCException(message); + case Status::CLOSING: throw IPCException(message); case Status::UNDEFINED: throw IPCException(message); default: return throw IPCException(message); } diff --git a/common/ipc/types.hpp b/common/ipc/types.hpp index c07e504..1bfaa4e 100644 --- a/common/ipc/types.hpp +++ b/common/ipc/types.hpp @@ -40,6 +40,8 @@ enum class Status : int { SERIALIZATION_ERROR, PEER_DISCONNECTED, NAUGHTY_PEER, + REMOVED_PEER, + CLOSING, UNDEFINED }; diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp index ea70b45..5d284bd 100644 --- a/tests/unit_tests/ipc/ut-ipc.cpp +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -71,6 +71,27 @@ struct SendData { ) }; +struct LongSendData { + LongSendData(int i = 0, int waitTime = 1000): mSendData(i), mWaitTime(waitTime), intVal(i) {} + + template + void accept(Visitor visitor) + { + std::this_thread::sleep_for(std::chrono::milliseconds(mWaitTime)); + mSendData.accept(visitor); + } + template + void accept(Visitor visitor) const + { + std::this_thread::sleep_for(std::chrono::milliseconds(mWaitTime)); + mSendData.accept(visitor); + } + + SendData mSendData; + int mWaitTime; + int intVal; +}; + struct EmptyData { CONFIG_REGISTER_EMPTY }; @@ -441,6 +462,45 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerErrorTest) } +BOOST_AUTO_TEST_CASE(ReadTimeoutTest) +{ + Service s(socketPath); + auto longEchoCallback = [](std::shared_ptr& data) { + return std::shared_ptr(new LongSendData(data->intVal)); + }; + s.addMethodHandler(1, longEchoCallback); + s.start(); + + Client c(socketPath); + c.start(); + + // Test timeout on read + std::shared_ptr sentData(new SendData(334)); + BOOST_CHECK_THROW((c.callSync(1, sentData, 100)), IPCException); +} + + +BOOST_AUTO_TEST_CASE(WriteTimeoutTest) +{ + Service s(socketPath); + s.addMethodHandler(1, echoCallback); + s.start(); + + Client c(socketPath); + c.start(); + + // Test echo with a minimal timeout + std::shared_ptr sentDataA(new LongSendData(34, 10 /*ms*/)); + std::shared_ptr recvData = c.callSync(1, sentDataA, 100); + BOOST_CHECK_EQUAL(recvData->intVal, sentDataA->intVal); + + // Test timeout on write + std::shared_ptr sentDataB(new LongSendData(34, 1000 /*ms*/)); + BOOST_CHECK_THROW((c.callSync(1, sentDataB, 100)), IPCTimeoutException); +} + + + // BOOST_AUTO_TEST_CASE(ConnectionLimitTest) // { // unsigned oldLimit = ipc::getMaxFDNumber(); -- 2.7.4