#include <unistd.h>
#include <chrono>
#include <poll.h>
+#include <sys/socket.h>
namespace config {
void FDStore::write(const void* bufferPtr, const size_t size, const unsigned int timeoutMS)
{
- std::chrono::high_resolution_clock::time_point deadline = std::chrono::high_resolution_clock::now() +
- std::chrono::milliseconds(timeoutMS);
+ std::chrono::high_resolution_clock::time_point deadline =
+ std::chrono::high_resolution_clock::now() +
+ std::chrono::milliseconds(timeoutMS);
size_t nTotal = 0;
for (;;) {
- int n = ::write(mFD,
- reinterpret_cast<const char*>(bufferPtr) + nTotal,
- size - nTotal);
- if (n >= 0) {
+ ssize_t n = ::write(mFD,
+ reinterpret_cast<const char*>(bufferPtr) + nTotal,
+ size - nTotal);
+ if (n < 0) {
+ // Handle errors
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ // Neglected errors
+ } else {
+ throw ConfigException("Error during writing: " + getSystemErrorMessage());
+ }
+ } else {
nTotal += n;
if (nTotal == size) {
- // All data is written, break loop
+ // All data is read, break loop
break;
}
- } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
- // Neglected errors
- } else {
- throw ConfigException("Error during writing: " + getSystemErrorMessage());
}
waitForEvent(mFD, POLLOUT, deadline);
void FDStore::read(void* bufferPtr, const size_t size, const unsigned int timeoutMS)
{
- std::chrono::high_resolution_clock::time_point deadline = std::chrono::high_resolution_clock::now() +
- std::chrono::milliseconds(timeoutMS);
+ std::chrono::high_resolution_clock::time_point deadline =
+ std::chrono::high_resolution_clock::now() +
+ std::chrono::milliseconds(timeoutMS);
size_t nTotal = 0;
for (;;) {
- int n = ::read(mFD,
- reinterpret_cast<char*>(bufferPtr) + nTotal,
- size - nTotal);
- if (n >= 0) {
+ ssize_t n = ::read(mFD,
+ reinterpret_cast<char*>(bufferPtr) + nTotal,
+ size - nTotal);
+ if (n < 0) {
+ // Handle errors
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ // Neglected errors
+ } else {
+ throw ConfigException("Error during reading: " + getSystemErrorMessage());
+ }
+ } else {
nTotal += n;
if (nTotal == size) {
// All data is read, break loop
if (n == 0) {
throw ConfigException("Peer disconnected");
}
- } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
- // Neglected errors
+ }
+
+ waitForEvent(mFD, POLLIN, deadline);
+ }
+}
+
+
+void FDStore::sendFD(int fd, const unsigned int timeoutMS)
+{
+ std::chrono::high_resolution_clock::time_point deadline =
+ std::chrono::high_resolution_clock::now() +
+ std::chrono::milliseconds(timeoutMS);
+
+ // Space for the file descriptor
+ union {
+ struct cmsghdr cmh;
+ char control[CMSG_SPACE(sizeof(int))];
+ } controlUnion;
+
+ // Ensure at least 1 byte is transmited via the socket
+ struct iovec iov;
+ char buf = '!';
+ iov.iov_base = &buf;
+ iov.iov_len = sizeof(char);
+
+ // Fill the message to send:
+ // The socket has to be connected, so we don't need to specify the name
+ struct msghdr msgh;
+ ::memset(&msgh, 0, sizeof(msgh));
+
+ // Only iovec to transmit one element
+ msgh.msg_iov = &iov;
+ msgh.msg_iovlen = 1;
+
+ // Ancillary data buffer
+ msgh.msg_control = controlUnion.control;
+ msgh.msg_controllen = sizeof(controlUnion.control);
+
+ // Describe the data that we want to send
+ struct cmsghdr *cmhp;
+ cmhp = CMSG_FIRSTHDR(&msgh);
+ cmhp->cmsg_len = CMSG_LEN(sizeof(int));
+ cmhp->cmsg_level = SOL_SOCKET;
+ cmhp->cmsg_type = SCM_RIGHTS;
+ *(reinterpret_cast<int*>(CMSG_DATA(cmhp))) = fd;
+
+ // Send
+ for(;;) {
+ ssize_t ret = ::sendmsg(mFD, &msgh, MSG_NOSIGNAL);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ // Neglected errors, retry
+ } else {
+ throw ConfigException("Error during sendmsg: " + getSystemErrorMessage());
+ }
+ } else if (ret == 0) {
+ // Retry the sending
} else {
- throw ConfigException("Error during reading: " + getSystemErrorMessage());
+ // We send only 1 byte of data. No need to repeat
+ break;
+ }
+
+ waitForEvent(mFD, POLLOUT, deadline);
+ }
+}
+
+
+int FDStore::receiveFD(const unsigned int timeoutMS)
+{
+ std::chrono::high_resolution_clock::time_point deadline =
+ std::chrono::high_resolution_clock::now() +
+ std::chrono::milliseconds(timeoutMS);
+
+ // Space for the file descriptor
+ union {
+ struct cmsghdr cmh;
+ char control[CMSG_SPACE(sizeof(int))];
+ } controlUnion;
+
+ // Describe the data that we want to recive
+ controlUnion.cmh.cmsg_len = CMSG_LEN(sizeof(int));
+ controlUnion.cmh.cmsg_level = SOL_SOCKET;
+ controlUnion.cmh.cmsg_type = SCM_RIGHTS;
+
+ // Setup the input buffer
+ // Ensure at least 1 byte is transmited via the socket
+ char buf;
+ struct iovec iov;
+ iov.iov_base = &buf;
+ iov.iov_len = sizeof(char);
+
+ // Set the ancillary data buffer
+ // The socket has to be connected, so we don't need to specify the name
+ struct msghdr msgh;
+ ::memset(&msgh, 0, sizeof(msgh));
+
+ msgh.msg_iov = &iov;
+ msgh.msg_iovlen = 1;
+
+ msgh.msg_control = controlUnion.control;
+ msgh.msg_controllen = sizeof(controlUnion.control);
+
+ // Receive
+ for(;;) {
+ ssize_t ret = ::recvmsg(mFD, &msgh, MSG_WAITALL);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ // Neglected errors, retry
+ } else {
+ throw ConfigException("Error during recvmsg: " + getSystemErrorMessage());
+ }
+ } else if (ret == 0) {
+ throw ConfigException("Peer disconnected");
+ } else {
+ // We receive only 1 byte of data. No need to repeat
+ break;
}
waitForEvent(mFD, POLLIN, deadline);
}
+
+ struct cmsghdr *cmhp;
+ cmhp = CMSG_FIRSTHDR(&msgh);
+ if (cmhp == NULL || cmhp->cmsg_len != CMSG_LEN(sizeof(int))) {
+ throw ConfigException("Bad cmsg length");
+ } else if (cmhp->cmsg_level != SOL_SOCKET) {
+ throw ConfigException("cmsg_level != SOL_SOCKET");
+ } else if (cmhp->cmsg_type != SCM_RIGHTS) {
+ throw ConfigException("cmsg_type != SCM_RIGHTS");
+ }
+
+ return *(reinterpret_cast<int*>(CMSG_DATA(cmhp)));
}
+
} // namespace config
#include <cstddef>
+namespace {
+const unsigned int maxTimeout = 5000;
+} // namespace
+
namespace config {
class FDStore {
* @param size size of the buffer
* @param timeoutMS timeout in milliseconds
*/
- void write(const void* bufferPtr, const size_t size, const unsigned int timeoutMS = 5000);
+ void write(const void* bufferPtr, const size_t size, const unsigned int timeoutMS = maxTimeout);
/**
* Reads a value of the given type.
* @param size size of the buffer
* @param timeoutMS timeout in milliseconds
*/
- void read(void* bufferPtr, const size_t size, const unsigned int timeoutMS = 5000);
+ void read(void* bufferPtr, const size_t size, const unsigned int timeoutMS = maxTimeout);
+
+ void sendFD(int fd, const unsigned int timeoutMS = maxTimeout);
+
+ int receiveFD(const unsigned int timeoutMS = maxTimeout);
private:
int mFD;
#include <boost/preprocessor/variadic/to_list.hpp>
#include <boost/preprocessor/list/for_each.hpp>
+#include "config/types.hpp"
+
#if BOOST_PP_VARIADICS != 1
#error variadic macros not supported
#endif
#include "config/is-visitable.hpp"
#include "config/fdstore.hpp"
+#include "config/types.hpp"
#include <string>
mStore.read(&value.front(), size);
}
+ void readInternal(config::FileDescriptor& fd)
+ {
+ fd = mStore.receiveFD();
+ }
+
template<typename T, typename std::enable_if<std::is_arithmetic<T>::value, int>::type = 0>
void readInternal(T& value)
{
#include "config/is-visitable.hpp"
#include "config/fdstore.hpp"
+#include "config/types.hpp"
#include <string>
mStore.write(value.c_str(), value.size());
}
+ void writeInternal(const config::FileDescriptor& fd)
+ {
+ mStore.sendFD(fd.value);
+ }
+
template<typename T, typename std::enable_if<std::is_arithmetic<T>::value, int>::type = 0>
void writeInternal(const T& value)
{
--- /dev/null
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Jan Olszak <j.olszak@samsung.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Types declarations
+ */
+
+#ifndef COMMON_CONFIG_TYPES_HPP
+#define COMMON_CONFIG_TYPES_HPP
+
+namespace config {
+
+/**
+ * Whenever possible, this type will be serialized using Linux file descriptor passing.
+ */
+struct FileDescriptor {
+ int value;
+ FileDescriptor(int fd = -1): value(fd) {}
+ FileDescriptor& operator=(const int fd) {
+ value = fd;
+ return *this;
+ }
+};
+
+}// config
+
+#endif //COMMON_CONFIG_TYPES_HPP
\ No newline at end of file
}
}
-}
+} // namespace
Socket::Socket(int socketFD)
: mFD(socketFD)
utils::read(mFD, bufferPtr, size);
}
-int Socket::getSystemdSocket(const std::string& path)
+int Socket::getSystemdSocketInternal(const std::string& path)
{
int n = ::sd_listen_fds(-1 /*Block further calls to sd_listen_fds*/);
if (n < 0) {
return -1;
}
-int Socket::createZoneSocket(const std::string& path)
+int Socket::createSocketInternal(const std::string& path)
{
// Isn't the path too long?
if (path.size() >= sizeof(sockaddr_un::sun_path)) {
Socket Socket::createSocket(const std::string& path)
{
// Initialize a socket
- int fd = getSystemdSocket(path);
- fd = fd != -1 ? fd : createZoneSocket(path);
+ int fd = getSystemdSocketInternal(path);
+ fd = fd != -1 ? fd : createSocketInternal(path);
return Socket(fd);
}
int mFD;
mutable std::recursive_mutex mCommunicationMutex;
- static int createZoneSocket(const std::string& path);
- static int getSystemdSocket(const std::string& path);
+ static int createSocketInternal(const std::string& path);
+ static int getSystemdSocketInternal(const std::string& path);
};
} // namespace ipc
#include "config/fields.hpp"
#include "logger/logger.hpp"
+#include <boost/filesystem.hpp>
+#include <fstream>
#include <atomic>
#include <string>
#include <thread>
#include <chrono>
#include <utility>
#include <future>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
using namespace ipc;
using namespace epoll;
using namespace utils;
using namespace std::placeholders;
+namespace fs = boost::filesystem;
// Timeout for sending one message
const int TIMEOUT = 1000 /*ms*/;
const std::string TEST_DIR = "/tmp/ut-ipc";
const std::string SOCKET_PATH = TEST_DIR + "/test.socket";
+const std::string TEST_FILE = TEST_DIR + "/file.txt";
struct FixtureBase {
ScopedDir mTestPathGuard;
struct ThreadedFixture : FixtureBase {
ThreadDispatcher dispatcher;
- EventPoll& getPoll() { return dispatcher.getPoll(); }
+ EventPoll& getPoll() {
+ return dispatcher.getPoll();
+ }
};
struct GlibFixture : FixtureBase {
ScopedGlibLoop glibLoop;
GlibDispatcher dispatcher;
- EventPoll& getPoll() { return dispatcher.getPoll(); }
+ EventPoll& getPoll() {
+ return dispatcher.getPoll();
+ }
};
struct SendData {
)
};
+struct FDData {
+ config::FileDescriptor fd;
+ FDData(int fd = -1): fd(fd) {}
+
+ CONFIG_REGISTER
+ (
+ fd
+ )
+};
+
struct LongSendData {
LongSendData(int i, int waitTime): mSendData(i), mWaitTime(waitTime), intVal(i) {}
BOOST_CHECK(l.wait(TIMEOUT));
}
+MULTI_FIXTURE_TEST_CASE(FDSendReceive, F, ThreadedFixture, GlibFixture)
+{
+ const char DATA[] = "Content of the file";
+ {
+ // Fill the file
+ fs::remove(TEST_FILE);
+ std::ofstream file(TEST_FILE);
+ file << DATA;
+ file.close();
+ }
+
+ auto methodHandler = [&](const PeerID, std::shared_ptr<EmptyData>&, MethodResult::Pointer methodResult) {
+ int fd = ::open(TEST_FILE.c_str(), O_RDONLY);
+ auto returnData = std::make_shared<FDData>(fd);
+ methodResult->set(returnData);
+ };
+
+ Service s(F::getPoll(), SOCKET_PATH);
+ s.setMethodHandler<FDData, EmptyData>(1, methodHandler);
+
+ Client c(F::getPoll(), SOCKET_PATH);
+ connect(s, c);
+
+ std::shared_ptr<FDData> fdData;
+ std::shared_ptr<EmptyData> sentData(new EmptyData());
+ fdData = c.callSync<EmptyData, FDData>(1, sentData, TIMEOUT);
+
+ // Use the file descriptor
+ char buffer[sizeof(DATA)];
+ BOOST_REQUIRE(::read(fdData->fd.value, buffer, sizeof(buffer))>0);
+ BOOST_REQUIRE(strncmp(DATA, buffer, strlen(DATA))==0);
+ ::close(fdData->fd.value);
+}
+
// MULTI_FIXTURE_TEST_CASE(ConnectionLimit, F, ThreadedFixture, GlibFixture)
// {
// unsigned oldLimit = ipc::getMaxFDNumber();