#include "ipc/result.hpp"
#include "ipc/epoll/thread-dispatcher.hpp"
#include "ipc/epoll/glib-dispatcher.hpp"
+#include "utils/channel.hpp"
#include "utils/glib-loop.hpp"
#include "utils/latch.hpp"
#include "utils/value-latch.hpp"
#include <chrono>
#include <utility>
#include <future>
+#include <semaphore.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/wait.h>
#include <fcntl.h>
methodResult->set(returnData);
}
-PeerID connect(Service& s, Client& c)
+PeerID connectPeer(Service& s, Client& c)
{
// Connects the Client to the Service and returns Clients PeerID
ValueLatch<PeerID> peerIDLatch;
s.setMethodHandler<SendData, RecvData>(2, returnDataCallback);
Client c(F::getPoll(), SOCKET_PATH);
- connect(s, c);
+ connectPeer(s, c);
testEcho(c, 1);
s.removeMethod(1);
c.setMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
c.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
- PeerID peerID = connect(s, c);
+ PeerID peerID = connectPeer(s, c);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
c.setMethodHandler<SendData, RecvData>(2, returnDataCallback);
BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
}
+MULTI_FIXTURE_TEST_CASE(MethodResultGetPeerID, F, ThreadedFixture, GlibFixture)
+{
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
+
+ PeerID peerID = connectPeer(s, c);
+
+ s.setMethodHandler<SendData, RecvData>(
+ 1,
+ [&peerID](const PeerID,
+ std::shared_ptr<RecvData>&,
+ MethodResult::Pointer methodResult) {
+ methodResult->setVoid();
+ BOOST_CHECK_EQUAL(peerID, methodResult->getPeerID());
+ }
+ );
+
+ std::shared_ptr<SendData> sentData(new SendData(32));
+ std::shared_ptr<RecvData> recvData = c.callSync<SendData, RecvData>(1,
+ sentData,
+ TIMEOUT);
+}
+
MULTI_FIXTURE_TEST_CASE(ServiceStartStop, F, ThreadedFixture, GlibFixture)
{
Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(2, echoCallback);
Client c(F::getPoll(), SOCKET_PATH);
- connect(s, c);
+ connectPeer(s, c);
testEcho(c, 1);
testEcho(c, 2);
Service s(F::getPoll(), SOCKET_PATH);
Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
- PeerID peerID = connect(s, c);
+ PeerID peerID = connectPeer(s, c);
std::shared_ptr<SendData> sentData(new SendData(56));
std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(1, peerID, sentData);
Service s(F::getPoll(), SOCKET_PATH);
Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
- PeerID peerID = connect(s, c);
+ PeerID peerID = connectPeer(s, c);
// Async call
auto dataBack = [&recvDataLatch](Result<RecvData> && r) {
s.setMethodHandler<SendData, RecvData>(1, longEchoCallback);
Client c(F::getPoll(), SOCKET_PATH);
- connect(s, c);
+ connectPeer(s, c);
std::shared_ptr<SendData> sentData(new SendData(78));
BOOST_REQUIRE_THROW((c.callSync<SendData, RecvData>(1, sentData, TIMEOUT)), IPCException);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
Client c(F::getPoll(), SOCKET_PATH);
- connect(s, c);
+ connectPeer(s, c);
std::shared_ptr<ThrowOnAcceptData> throwingData(new ThrowOnAcceptData());
s.setMethodHandler<LongSendData, RecvData>(1, longEchoCallback);
Client c(F::getPoll(), SOCKET_PATH);
- connect(s, c);
+ connectPeer(s, c);
// Test timeout on read
std::shared_ptr<SendData> sentData(new SendData(334));
Service s(F::getPoll(), SOCKET_PATH);
Client c(F::getPoll(), SOCKET_PATH);
- connect(s, c);
+ connectPeer(s, c);
auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
recvDataLatchA.set(data);
c.setSignalHandler<RecvData>(1, handlerA);
c.setSignalHandler<RecvData>(2, handlerB);
- connect(s, c);
+ connectPeer(s, c);
// Wait for the information about the signals to propagate
std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT));
Service s(F::getPoll(), SOCKET_PATH);
Client c(F::getPoll(), SOCKET_PATH);
- auto clientID = connect(s, c);
+ auto clientID = connectPeer(s, c);
auto throwingMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer) {
throw IPCUserException(TEST_ERROR_CODE, TEST_ERROR_MESSAGE);
Service s(F::getPoll(), SOCKET_PATH);
Client c(F::getPoll(), SOCKET_PATH);
- auto clientID = connect(s, c);
+ auto clientID = connectPeer(s, c);
auto errorMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer methodResult) {
std::async(std::launch::async, [&, methodResult] {
Client c(F::getPoll(), SOCKET_PATH);
s.setSignalHandler<RecvData>(2, signalHandler);
- connect(s, c);
+ connectPeer(s, c);
testEcho(c, 1);
s.setMethodHandler<FDData, EmptyData>(1, methodHandler);
Client c(F::getPoll(), SOCKET_PATH);
- connect(s, c);
+ connectPeer(s, c);
std::shared_ptr<FDData> fdData;
std::shared_ptr<EmptyData> sentData(new EmptyData());
::close(fdData->fd.value);
}
-// MULTI_FIXTURE_TEST_CASE(ConnectionLimit, F, ThreadedFixture, GlibFixture)
-// {
-// unsigned oldLimit = ipc::getMaxFDNumber();
-// ipc::setMaxFDNumber(50);
-
-// // Setup Service and many Clients
-// Service s(F::getPoll(), SOCKET_PATH);
-// s.setMethodHandler<SendData, RecvData>(1, echoCallback);
-// s.start();
-
-// std::list<Client> clients;
-// for (int i = 0; i < 100; ++i) {
-// try {
-// clients.push_back(Client(F::getPoll(), SOCKET_PATH));
-// clients.back().start();
-// } catch (...) {}
-// }
-
-// unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
-// std::mt19937 generator(seed);
-// for (auto it = clients.begin(); it != clients.end(); ++it) {
-// try {
-// std::shared_ptr<SendData> sentData(new SendData(generator()));
-// std::shared_ptr<RecvData> recvData = it->callSync<SendData, RecvData>(1, sentData);
-// BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
-// } catch (...) {}
-// }
-
-// ipc::setMaxFDNumber(oldLimit);
-// }
+BOOST_AUTO_TEST_CASE(ConnectionLimit)
+{
+ const unsigned oldLimit = utils::getMaxFDNumber();
+ const unsigned newLimit = 32;
+ ScopedDir scopedDir(TEST_DIR);
+
+ Channel c;
+
+ const pid_t chpid = ::fork();
+ BOOST_CHECK_NE(chpid, -1);
+
+ if (chpid) {
+ // Setup Service
+ ThreadDispatcher td;
+ Service s(td.getPoll(), SOCKET_PATH);
+ s.setMethodHandler<SendData, RecvData>(1, echoCallback);
+ s.start();
+
+ c.setLeft();
+ try {
+ // inform the Client
+ c.write(true);
+ } catch (...) {
+ kill(chpid, 9);
+ throw;
+ }
+
+ int status;
+ BOOST_CHECK_EQUAL(::waitpid(chpid, &status, 0), chpid);
+ BOOST_CHECK_EQUAL(status, EXIT_SUCCESS);
+ } else {
+ int ret = EXIT_FAILURE;
+ utils::setMaxFDNumber(newLimit);
+
+ c.setRight();
+ try {
+ // wait for the Service
+ c.read<char>();
+ } catch(...) {
+ ::_exit(EXIT_FAILURE);
+ }
+
+ // Setup Clients
+ ThreadDispatcher td;
+ std::list<Client> clients;
+ try {
+ for (unsigned i = 0; i < 2 * newLimit; ++i) {
+ clients.emplace_back(td.getPoll(), SOCKET_PATH);
+ clients.back().start();
+ }
+ } catch (const EventFDException& e) {
+ ret = EXIT_SUCCESS;
+ } catch (const IPCSocketException& e) {
+ if (e.getCode() == EMFILE) {
+ ret = EXIT_SUCCESS;
+ }
+ }
+
+ utils::setMaxFDNumber(oldLimit);
+
+ ::_exit(ret);
+ }
+}