From 55f5ddd4e6d9aa2315fc0b62e87be09e037dc4c1 Mon Sep 17 00:00:00 2001 From: Aleksander Zdyb Date: Fri, 22 May 2015 11:15:31 +0200 Subject: [PATCH] Add feed utility Feed successively reads chunks of data from given fd. It invokes callback in case of: * data read * signal caught * EOD encountered * timeout Change-Id: I1bcdfd51e2a9d90f552587ec337ed35097cd135c --- src/Utils/Feed.cpp | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Utils/Feed.h | 65 ++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 src/Utils/Feed.cpp create mode 100644 src/Utils/Feed.h diff --git a/src/Utils/Feed.cpp b/src/Utils/Feed.cpp new file mode 100644 index 0000000..c8a667c --- /dev/null +++ b/src/Utils/Feed.cpp @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * @file src/Utils/Feed.cpp + * @author Aleksander Zdyb + * @version 1.0 + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include "Feed.h" + +namespace Utils { + +Feed::Feed(int inputFd, size_t readSize, int signalFd) : m_terminate(false), m_inputFd(inputFd), + m_readSize(readSize), m_signalFd(signalFd), + m_waitForever(true) +{ + m_maxFd = std::max(m_inputFd, m_signalFd) + 1; +} + +void Feed::start() { + m_terminate = false; + m_waitForever = true; + + while (m_terminate == false) { + const auto retval = waitForInput(); + + if (retval == InputType::Timeout) { + LOGD("Feed: timeout"); + onTimeout(); + m_waitForever = true; + continue; + } else if (retval == InputType::Error) { + // TODO: Consider invoking a callback here + LOGE("Error in select [errno=" + std::to_string(errno) + "]"); + continue; + } else if (retval == InputType::Signal) { + LOGD("Feed: signal"); + onSignal(m_signalFd); + continue; + } else if (retval == InputType::Input) { + LOGD("Feed: data"); + char dataChunk[m_readSize]; + const auto ret = TEMP_FAILURE_RETRY(read(m_inputFd, dataChunk, m_readSize)); + if (ret > 0) { + onData(dataChunk, ret); + m_waitForever = false; + } else if (ret == 0) { + onEod(); + break; + } else { + // TODO: Consider invoking a callback here + LOGE("Error reading audit input [errno=" + std::to_string(errno) + "]"); + } + } + } +} + +void Feed::stop() { + m_terminate = true; +} + +Feed::InputType Feed::waitForInput() { + fd_set readMask; + FD_ZERO(&readMask); + FD_SET(m_inputFd, &readMask); + FD_SET(m_signalFd, &readMask); + + int selectRet = -1; + if (m_waitForever) { + selectRet = TEMP_FAILURE_RETRY(select(m_maxFd, &readMask, nullptr, nullptr, nullptr)); + } else { + // TODO: Make timeout configurable + struct timeval timeout; + timeout.tv_sec = 5; + timeout.tv_usec = 0; + selectRet = TEMP_FAILURE_RETRY(select(m_maxFd, &readMask, nullptr, nullptr, &timeout)); + } + + if (selectRet > 0) { + if (FD_ISSET(m_signalFd, &readMask)) { + return InputType::Signal; + } else if (FD_ISSET(m_inputFd, &readMask)) { + return InputType::Input; + } + } else if (selectRet == 0) { + return InputType::Timeout; + } + + return InputType::Error; + +} + +} /* namespace Utils */ diff --git a/src/Utils/Feed.h b/src/Utils/Feed.h new file mode 100644 index 0000000..8ceb4c5 --- /dev/null +++ b/src/Utils/Feed.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * @file src/Utils/Feed.h + * @author Aleksander Zdyb + * @version 1.0 + */ + +#ifndef SRC_UTILS_FEED_H +#define SRC_UTILS_FEED_H + +#include + +#include + +namespace Utils { + +class Feed { +public: + Feed(int inputFd, size_t readSize, int signalFd); + virtual ~Feed() = default; + + // TODO: Consider using std::function as member, if the application stays single-threaded + boost::signals2::signal onData; + boost::signals2::signal onEod; + boost::signals2::signal onSignal; + boost::signals2::signal onTimeout; + + void start(); + void stop(); + +private: + enum class InputType : int8_t { + Error = -1, + Timeout = 0, + Input, + Signal = 127 + }; + + InputType waitForInput(); + + bool m_terminate; + int m_inputFd; + size_t m_readSize; + int m_signalFd; + bool m_waitForever; + int m_maxFd; +}; + +} /* namespace Utils */ + +#endif /* SRC_UTILS_FEED_H */ -- 2.7.4