From: Sangwan Kwon Date: Tue, 14 Jan 2020 04:16:09 +0000 (+0900) Subject: osquery: Remove dispatcher X-Git-Tag: submit/tizen/20200810.073515~99^2~3 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=3733b9a9040ef974e0db266e0f25de25f1a4a932;p=platform%2Fcore%2Fsecurity%2Fvist.git osquery: Remove dispatcher Signed-off-by: Sangwan Kwon --- diff --git a/src/osquery/CMakeLists.txt b/src/osquery/CMakeLists.txt index 850c3cf..c0e05fe 100644 --- a/src/osquery/CMakeLists.txt +++ b/src/osquery/CMakeLists.txt @@ -45,7 +45,6 @@ ENDIF(DEFINED GBS_BUILD) ## osquery v4.0.0 ADD_SUBDIRECTORY(core) ADD_SUBDIRECTORY(database) -ADD_SUBDIRECTORY(dispatcher) ADD_SUBDIRECTORY(events) ADD_SUBDIRECTORY(filesystem) ADD_SUBDIRECTORY(logger) diff --git a/src/osquery/core/init.cpp b/src/osquery/core/init.cpp index 53bdf24..be40f24 100644 --- a/src/osquery/core/init.cpp +++ b/src/osquery/core/init.cpp @@ -35,7 +35,6 @@ #include "osquery/utils/info/platform_type.h" #include #include -#include #include #include #include @@ -123,8 +122,6 @@ void signalHandler(int num) { // Restore the default signal handler. std::signal(num, SIG_DFL); - - osquery::Dispatcher::stopServices(); } } @@ -510,8 +507,6 @@ void Initializer::waitForShutdown() { } } - // Attempt to be the only place in code where a join is attempted. - Dispatcher::joinServices(); // End any event type run loops. EventFactory::end(true); @@ -537,7 +532,6 @@ void Initializer::requestShutdown(int retcode) { // it is NOT waiting for a shutdown. // Exceptions include: tight request / wait in an exception handler or // custom signal handling. - Dispatcher::stopServices(); waitForShutdown(); } } diff --git a/src/osquery/core/tests/posix/permissions_tests.cpp b/src/osquery/core/tests/posix/permissions_tests.cpp deleted file mode 100644 index 6bc3b7b..0000000 --- a/src/osquery/core/tests/posix/permissions_tests.cpp +++ /dev/null @@ -1,294 +0,0 @@ -/** - * Copyright (c) 2014-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed in accordance with the terms specified in - * the LICENSE file found in the root directory of this source tree. - */ - -#include -#include - -#include - -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "osquery/filesystem/fileops.h" -#include "osquery/tests/test_util.h" - -namespace fs = boost::filesystem; - -namespace osquery { - -class PermissionsTests : public testing::Test { - public: - PermissionsTests() - : perm_path_(fs::temp_directory_path() / - fs::unique_path("lowperms.%%%%.%%%%")) {} - - void SetUp() override { - fs::create_directories(perm_path_); - } - - void TearDown() override { - fs::remove_all(perm_path_); - } - - protected: - fs::path perm_path_; -}; - -TEST_F(PermissionsTests, test_explicit_drop) { - { - auto dropper = DropPrivileges::get(); - EXPECT_TRUE(dropper->dropTo(getuid(), getgid())); - // We can attempt to drop to the previously-dropped privilege. - EXPECT_TRUE(dropper->dropTo(getuid(), getgid())); - } - - { - auto dropper = DropPrivileges::get(); - // Make sure that an out-of-scope dropper "restore" - EXPECT_FALSE(dropper->dropped_); - - uid_t expected_user = 0U; - EXPECT_EQ(dropper->to_user_, expected_user); - - gid_t expected_group = 0U; - EXPECT_EQ(dropper->to_group_, expected_group); - - // Checking if we are generally in an unprivileged mode. - auto dropper2 = DropPrivileges::get(); - EXPECT_FALSE(dropper2->dropped()); - } -} - -TEST_F(PermissionsTests, test_path_drop) { - if (getuid() != 0) { - LOG(WARNING) << "Not root, skipping (path) unprivileged testing"; - return; - } - - // Attempt to drop to nobody based on ownership of paths. - auto nobody = getpwnam("nobody"); - ASSERT_NE(nobody, nullptr); - - { - int status = - chown(perm_path_.string().c_str(), nobody->pw_uid, nobody->pw_gid); - ASSERT_EQ(status, 0); - - auto dropper = DropPrivileges::get(); - EXPECT_TRUE(dropper->dropToParent((perm_path_ / "ro").string())); - EXPECT_TRUE(dropper->dropped_); - EXPECT_EQ(dropper->to_user_, nobody->pw_uid); - - // Dropping "up" to root should fail. - // Even though this is possible and may make sense, it is confusing! - EXPECT_FALSE(dropper->dropTo(0, 0)); - - // Make sure the dropper worked! - EXPECT_EQ(geteuid(), nobody->pw_uid); - } - - // Now that the dropper is gone, the effective user/group should be restored. - EXPECT_EQ(geteuid(), getuid()); - EXPECT_EQ(getegid(), getgid()); -} - -TEST_F(PermissionsTests, test_functional_drop) { - if (getuid() != 0) { - LOG(WARNING) << "Not root, skipping (explicit) unprivileged testing"; - return; - } - - auto file_path = kTestWorkingDirectory + "permissions-file2"; - - { - writeTextFile(file_path, "data"); - ASSERT_TRUE(platformChmod(file_path, 0400)); - } - - { - auto nobody = getpwnam("nobody"); - auto dropper = DropPrivileges::get(); - dropper->dropTo(nobody->pw_uid, nobody->pw_gid); - PlatformFile fd(file_path, PF_OPEN_EXISTING | PF_READ); - EXPECT_FALSE(fd.isValid()); - } - - osquery::removePath(file_path); -} - -TEST_F(PermissionsTests, test_nobody_drop) { - if (getuid() != 0) { - LOG(WARNING) << "Not root, skipping (explicit) unprivileged testing"; - return; - } - - // Attempt to drop to nobody. - auto nobody = getpwnam("nobody"); - ASSERT_NE(nobody, nullptr); - - { - auto dropper = DropPrivileges::get(); - EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid)); - EXPECT_EQ(geteuid(), nobody->pw_uid); - } - - { - auto dropper = DropPrivileges::get(); - EXPECT_TRUE(dropper->dropTo(std::to_string(nobody->pw_uid), - std::to_string(nobody->pw_gid))); - EXPECT_EQ(geteuid(), nobody->pw_uid); - } - - // Now that the dropper is gone, the effective user/group should be restored. - EXPECT_EQ(geteuid(), getuid()); - EXPECT_EQ(getegid(), getgid()); -} - -std::string kMultiThreadPermissionPath; - -class PermissionsRunnable : public InternalRunnable { - public: - PermissionsRunnable() : InternalRunnable("PermissionsRunnable") {} - PermissionsRunnable(const std::string& name) : InternalRunnable(name) {} - - private: - virtual void start() override { - while (!interrupted()) { - if (!writeTextFile(kMultiThreadPermissionPath, "test")) { - throw std::runtime_error("Cannot write " + kMultiThreadPermissionPath); - } - ticks++; - } - } - - public: - std::atomic ticks{0}; -}; - -class PermissionsPollRunnable : public PermissionsRunnable { - public: - PermissionsPollRunnable() : PermissionsRunnable("PermissionsPollRunnable") {} - - private: - void start() override { - PlatformFile file(kMultiThreadPermissionPath, - PF_OPEN_EXISTING | PF_READ | PF_NONBLOCK); - auto file_fd = file.nativeHandle(); - - struct pollfd fds[1]; - while (!interrupted()) { - std::memset(fds, 0, sizeof(fds)); - fds[0].fd = file_fd; - - result = poll(fds, 1, 1); - if (result == 0) { - ticks++; - } - } - } - - public: - std::atomic result; -}; - -bool waitForTick(const std::shared_ptr& runnable) { - size_t now = runnable->ticks; - size_t timeout = 1000; - size_t delay = 0; - while (delay < timeout) { - sleepFor(20); - if (runnable->ticks > now) { - return true; - } - sleepFor(200); - delay += 220; - } - return false; -} - -TEST_F(PermissionsTests, test_multi_thread_permissions) { - if (getuid() != 0) { - LOG(WARNING) << "Not root, skipping multi-thread deprivilege testing"; - return; - } - - ASSERT_EQ(0U, geteuid()); - - // Set the multi-thread path, which both threads will write into. - auto multi_thread_path = perm_path_ / "threadperms.txt"; - kMultiThreadPermissionPath = multi_thread_path.string(); - - // This thread has super-user permissions. - ASSERT_TRUE(writeTextFile(kMultiThreadPermissionPath, "test", 600)); - - // Start our permissions thread. - auto perms_thread = std::make_shared(); - Dispatcher::addService(perms_thread); - - // Wait for the permissions thread to write once. - EXPECT_TRUE(waitForTick(perms_thread)); - - // Attempt to drop to nobody. - auto nobody = getpwnam("nobody"); - EXPECT_NE(nobody, nullptr); - - { - auto dropper = DropPrivileges::get(); - EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid)); - EXPECT_EQ(geteuid(), nobody->pw_uid); - - // Now we wait for the permissions thread to write once while this thread's - // permissions are dropped. - EXPECT_TRUE(waitForTick(perms_thread)); - } - - Dispatcher::stopServices(); - Dispatcher::joinServices(); -} - -TEST_F(PermissionsTests, test_multi_thread_poll) { - if (getuid() != 0) { - LOG(WARNING) << "Not root, skipping multi-thread deprivilege testing"; - return; - } - - ASSERT_EQ(0U, geteuid()); - - // Set the multi-thread path, which both threads will write into. - auto multi_thread_path = perm_path_ / "threadperms.txt"; - kMultiThreadPermissionPath = multi_thread_path.string(); - - // Start our permissions thread. - auto pool_thread = std::make_shared(); - Dispatcher::addService(pool_thread); - - // Wait for the permissions thread to write once. - EXPECT_TRUE(waitForTick(pool_thread)); - - auto nobody = getpwnam("nobody"); - EXPECT_NE(nobody, nullptr); - { - auto dropper = DropPrivileges::get(); - EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid)); - EXPECT_EQ(geteuid(), nobody->pw_uid); - - EXPECT_TRUE(waitForTick(pool_thread)); - } - - Dispatcher::stopServices(); - Dispatcher::joinServices(); -} -} diff --git a/src/osquery/dispatcher/CMakeLists.txt b/src/osquery/dispatcher/CMakeLists.txt deleted file mode 100644 index 82b34a2..0000000 --- a/src/osquery/dispatcher/CMakeLists.txt +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright (c) 2019 Samsung Electronics Co., Ltd All Rights Reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License - -ADD_OSQUERY_LIBRARY(osquery_dispatcher dispatcher.cpp - scheduler.cpp) - -FILE(GLOB OSQUERY_DISPATCHER_TESTS "tests/*.cpp") -ADD_OSQUERY_TEST(${OSQUERY_DISPATCHER_TESTS}) diff --git a/src/osquery/dispatcher/dispatcher.cpp b/src/osquery/dispatcher/dispatcher.cpp deleted file mode 100644 index a634855..0000000 --- a/src/osquery/dispatcher/dispatcher.cpp +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Copyright (c) 2014-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed in accordance with the terms specified in - * the LICENSE file found in the root directory of this source tree. - */ - -#include -#include -#include -#include - -namespace osquery { - -/// The worker_threads define the default thread pool size. -FLAG(int32, worker_threads, 4, "Number of work dispatch threads"); - - -void InterruptableRunnable::interrupt() { - // Set the service as interrupted. - if (!interrupted_.exchange(true)) { - // Tear down the service's resources such that exiting the expected run - // loop within ::start does not need to. - stop(); - std::lock_guard lock(condition_lock); - // Cancel the run loop's pause request. - condition_.notify_one(); - } -} - -bool InterruptableRunnable::interrupted() { - return interrupted_; -} - -void InterruptableRunnable::pause(std::chrono::milliseconds milli) { - std::unique_lock lock(condition_lock); - if (!interrupted_) { - condition_.wait_for(lock, milli); - } -} - -void InternalRunnable::run() { - run_ = true; - setThreadName(name()); - start(); - - // The service is complete. - Dispatcher::removeService(this); -} - -Dispatcher& Dispatcher::instance() { - static Dispatcher instance; - return instance; -} - -size_t Dispatcher::serviceCount() const { - ReadLock lock(mutex_); - return services_.size(); -} - -Status Dispatcher::addService(InternalRunnableRef service) { - if (service->hasRun()) { - return Status(1, "Cannot schedule a service twice"); - } - - auto& self = instance(); - if (self.stopping_) { - // Cannot add a service while the dispatcher is stopping and no joins - // have been requested. - return Status(1, "Cannot add service, dispatcher is stopping"); - } - - auto thread = std::make_unique( - std::bind(&InternalRunnable::run, &*service)); - - DLOG(INFO) << "Adding new service: " << service->name() << " (" - << service.get() << ") to thread: " << thread->get_id() << " (" - << thread.get() << ") in process " << platformGetPid(); - { - WriteLock lock(self.mutex_); - - self.service_threads_.push_back(std::move(thread)); - self.services_.push_back(std::move(service)); - } - return Status::success(); -} - -void Dispatcher::removeService(const InternalRunnable* service) { - auto& self = Dispatcher::instance(); - WriteLock lock(self.mutex_); - - // Remove the service. - self.services_.erase( - std::remove_if(self.services_.begin(), - self.services_.end(), - [service](const InternalRunnableRef& target) { - return (target.get() == service); - }), - self.services_.end()); -} - -inline static void assureRun(const InternalRunnableRef& service) { - while (true) { - // Wait for each thread's entry point (start) meaning the thread context - // was allocated and (run) was called by std::thread started. - if (service->hasRun()) { - break; - } - // We only need to check if std::terminate is called very quickly after - // the std::thread is created. - sleepFor(20); - } -} - -void Dispatcher::joinServices() { - auto& self = instance(); - DLOG(INFO) << "Thread: " << std::this_thread::get_id() - << " requesting a join"; - - // Stops when service_threads_ is empty. Before stopping and releasing of the - // lock, empties services_ . - while (1) { - InternalThreadRef thread = nullptr; - { - WriteLock lock(self.mutex_); - if (!self.service_threads_.empty()) { - thread = std::move(self.service_threads_.back()); - self.service_threads_.pop_back(); - } else { - self.services_.clear(); - break; - } - } - if (thread != nullptr) { - thread->join(); - DLOG(INFO) << "Service thread: " << thread.get() << " has joined"; - } - } - - self.stopping_ = false; - DLOG(INFO) << "Services and threads have been cleared"; -} - -void Dispatcher::stopServices() { - auto& self = instance(); - self.stopping_ = true; - - WriteLock lock(self.mutex_); - DLOG(INFO) << "Thread: " << std::this_thread::get_id() - << " requesting a stop"; - for (const auto& service : self.services_) { - assureRun(service); - service->interrupt(); - DLOG(INFO) << "Service: " << service.get() << " has been interrupted"; - } -} -} // namespace osquery diff --git a/src/osquery/dispatcher/scheduler.cpp b/src/osquery/dispatcher/scheduler.cpp deleted file mode 100644 index 2902b75..0000000 --- a/src/osquery/dispatcher/scheduler.cpp +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Copyright (c) 2014-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed in accordance with the terms specified in - * the LICENSE file found in the root directory of this source tree. - */ - -#include -#include - -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include "osquery/dispatcher/scheduler.h" -#include "osquery/sql/sqlite_util.h" - -namespace osquery { - -FLAG(uint64, schedule_timeout, 0, "Limit the schedule, 0 for no limit"); - -FLAG(uint64, - schedule_max_drift, - 60, - "Max time drift in seconds. Scheduler tries to compensate the drift until " - "the drift exceed this value. After it the drift will be reseted to zero " - "and the compensation process will start from the beginning. It is needed " - "to avoid the problem of endless compensation (which is CPU greedy) after " - "a long SIGSTOP/SIGCONT pause or something similar. Set it to zero to " - "switch off a drift compensation. Default: 60"); - -FLAG(uint64, - schedule_reload, - 300, - "Interval in seconds to reload database arenas"); - -FLAG(uint64, schedule_epoch, 0, "Epoch for scheduled queries"); - -HIDDEN_FLAG(bool, - schedule_reload_sql, - false, - "Reload the SQL implementation during schedule reload"); - -/// Used to bypass (optimize-out) the set-differential of query results. -DECLARE_bool(events_optimize); - -SQLInternal monitor(const std::string& name, const ScheduledQuery& query) { - // Snapshot the performance and times for the worker before running. - auto pid = std::to_string(PlatformProcess::getCurrentPid()); - auto r0 = SQL::selectFrom({"resident_size", "user_time", "system_time"}, - "processes", - "pid", - EQUALS, - pid); - auto t0 = getUnixTime(); - SQLInternal sql(query.query, true); - // Snapshot the performance after, and compare. - auto t1 = getUnixTime(); - auto r1 = SQL::selectFrom({"resident_size", "user_time", "system_time"}, - "processes", - "pid", - EQUALS, - pid); - return sql; -} - -Status launchQuery(const std::string& name, const ScheduledQuery& query) { - // Execute the scheduled query and create a named query object. - LOG(INFO) << "Executing scheduled query " << name << ": " << query.query; - - auto sql = monitor(name, query); - if (!sql.getStatus().ok()) { - LOG(ERROR) << "Error executing scheduled query " << name << ": " - << sql.getStatus().toString(); - return Status::failure("Error executing scheduled query"); - } - - // Fill in a host identifier fields based on configuration or availability. - std::string ident = getHostIdentifier(); - - // A query log item contains an optional set of differential results or - // a copy of the most-recent execution alongside some query metadata. - QueryLogItem item; - item.name = name; - item.identifier = ident; - item.time = osquery::getUnixTime(); - item.epoch = FLAGS_schedule_epoch; - item.calendar_time = osquery::getAsciiTime(); - - if (query.options.count("snapshot") && query.options.at("snapshot")) { - // This is a snapshot query, emit results with a differential or state. - item.snapshot_results = std::move(sql.rowsTyped()); - logSnapshotQuery(item); - return Status::success(); - } - - // Create a database-backed set of query results. - auto dbQuery = Query(name, query); - // Comparisons and stores must include escaped data. - sql.escapeResults(); - Status status; - DiffResults& diff_results = item.results; - // Add this execution's set of results to the database-tracked named query. - // We can then ask for a differential from the last time this named query - // was executed by exact matching each row. - if (!FLAGS_events_optimize || !sql.eventBased()) { - status = dbQuery.addNewResults( - std::move(sql.rowsTyped()), item.epoch, item.counter, diff_results); - if (!status.ok()) { - std::string line = "Error adding new results to database for query " + - name + ": " + status.what(); - LOG(ERROR) << line; - - // If the database is not available then the daemon cannot continue. - Initializer::requestShutdown(EXIT_CATASTROPHIC, line); - } - } else { - diff_results.added = std::move(sql.rowsTyped()); - } - - if (query.options.count("removed") && !query.options.at("removed")) { - diff_results.removed.clear(); - } - - if (diff_results.added.empty() && diff_results.removed.empty()) { - // No diff results or events to emit. - return status; - } - - VLOG(1) << "Found results for query: " << name; - - status = logQueryLogItem(item); - if (!status.ok()) { - // If log directory is not available, then the daemon shouldn't continue. - std::string error = "Error logging the results of query: " + name + ": " + - status.toString(); - LOG(ERROR) << error; - Initializer::requestShutdown(EXIT_CATASTROPHIC, error); - } - return status; -} - -void SchedulerRunner::start() { - // Start the counter at the second. - auto i = osquery::getUnixTime(); - for (; (timeout_ == 0) || (i <= timeout_); ++i) { - auto start_time_point = std::chrono::steady_clock::now(); - if (FLAGS_schedule_reload > 0 && (i % FLAGS_schedule_reload) == 0) { - if (FLAGS_schedule_reload_sql) { - SQLiteDBManager::resetPrimary(); - } - resetDatabase(); - } - - // GLog is not re-entrant, so logs must be flushed in a dedicated thread. - if ((i % 3) == 0) { - relayStatusLogs(true); - } - auto loop_step_duration = - std::chrono::duration_cast( - std::chrono::steady_clock::now() - start_time_point); - if (loop_step_duration + time_drift_ < interval_) { - pause(std::chrono::milliseconds(interval_ - loop_step_duration - - time_drift_)); - time_drift_ = std::chrono::milliseconds::zero(); - } else { - time_drift_ += loop_step_duration - interval_; - if (time_drift_ > max_time_drift_) { - // giving up - time_drift_ = std::chrono::milliseconds::zero(); - } - } - if (interrupted()) { - break; - } - } -} - -std::chrono::milliseconds SchedulerRunner::getCurrentTimeDrift() const - noexcept { - return time_drift_; -} - -void startScheduler() { - startScheduler(static_cast(FLAGS_schedule_timeout), 1); -} - -void startScheduler(unsigned long int timeout, size_t interval) { - Dispatcher::addService(std::make_shared( - timeout, interval, std::chrono::seconds{FLAGS_schedule_max_drift})); -} -} // namespace osquery diff --git a/src/osquery/dispatcher/scheduler.h b/src/osquery/dispatcher/scheduler.h deleted file mode 100644 index 045bc5c..0000000 --- a/src/osquery/dispatcher/scheduler.h +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright (c) 2014-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed in accordance with the terms specified in - * the LICENSE file found in the root directory of this source tree. - */ - -#pragma once - -#include -#include - -#include - -#include "osquery/sql/sqlite_util.h" - -namespace osquery { - -/// A Dispatcher service thread that watches an ExtensionManagerHandler. -class SchedulerRunner : public InternalRunnable { - public: - SchedulerRunner( - unsigned long int timeout, - size_t interval, - std::chrono::milliseconds max_time_drift = std::chrono::seconds::zero()) - : InternalRunnable("SchedulerRunner"), - interval_{std::chrono::seconds{interval}}, - timeout_(timeout), - time_drift_{std::chrono::milliseconds::zero()}, - max_time_drift_{max_time_drift} {} - - public: - /// The Dispatcher thread entry point. - void start() override; - - /// The Dispatcher interrupt point. - void stop() override {} - - /// Accumulated for some time time drift to compensate. - std::chrono::milliseconds getCurrentTimeDrift() const noexcept; - - private: - /// Interval in seconds between schedule steps. - const std::chrono::milliseconds interval_; - - /// Maximum number of steps. - const unsigned long int timeout_; - - /// Accumulated for some time time drift to compensate. - /// It will be either reduced during compensation process or - /// after exceding the limit @see max_time_drift_ - std::chrono::milliseconds time_drift_; - - const std::chrono::milliseconds max_time_drift_; -}; - -SQLInternal monitor(const std::string& name, const ScheduledQuery& query); - -/// Start querying according to the config's schedule -void startScheduler(); - -/// Helper scheduler start with variable settings for testing. -void startScheduler(unsigned long int timeout, size_t interval); -} diff --git a/src/osquery/dispatcher/tests/dispatcher.cpp b/src/osquery/dispatcher/tests/dispatcher.cpp deleted file mode 100644 index 1a62a2d..0000000 --- a/src/osquery/dispatcher/tests/dispatcher.cpp +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (c) 2014-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed in accordance with the terms specified in - * the LICENSE file found in the root directory of this source tree. - */ - -#include - -#include - -#include -#include - -namespace osquery { - -class DispatcherTests : public testing::Test { - void TearDown() override { - Dispatcher::instance().resetStopping(); - } -}; - -TEST_F(DispatcherTests, test_singleton) { - auto& one = Dispatcher::instance(); - auto& two = Dispatcher::instance(); - EXPECT_EQ(&one, &two); -} - -class InternalTestableRunnable : public InternalRunnable { - public: - InternalTestableRunnable(const std::string& name) : InternalRunnable(name) {} - - bool interrupted() override { - // A small conditional to force-skip an interruption check, used in testing. - if (!checked_) { - checked_ = true; - return false; - } else { - return InternalRunnable::interrupted(); - } - } - - private: - /// Testing only, track the interruptible check for interruption. - bool checked_{false}; -}; - -class TestRunnable : public InternalTestableRunnable { - public: - explicit TestRunnable() : InternalTestableRunnable("TestRunnable") {} - - virtual void start() override { - ++i; - } - - void reset() { - i = 0; - } - - size_t count() { - return i; - } - - private: - static std::atomic i; -}; - -std::atomic TestRunnable::i{0}; - -TEST_F(DispatcherTests, test_service_count) { - auto runnable = std::make_shared(); - - auto service_count = Dispatcher::instance().serviceCount(); - // The service exits after incrementing. - auto s = Dispatcher::addService(runnable); - EXPECT_TRUE(s); - - // Wait for the service to stop. - Dispatcher::joinServices(); - - // Make sure the service is removed. - EXPECT_EQ(service_count, Dispatcher::instance().serviceCount()); -} - -TEST_F(DispatcherTests, test_run) { - auto runnable = std::make_shared(); - runnable->reset(); - - // The service exits after incrementing. - Dispatcher::addService(runnable); - Dispatcher::joinServices(); - EXPECT_EQ(1U, runnable->count()); - EXPECT_TRUE(runnable->hasRun()); - - // This runnable cannot be executed again. - auto s = Dispatcher::addService(runnable); - EXPECT_FALSE(s); - - Dispatcher::joinServices(); - EXPECT_EQ(1U, runnable->count()); -} - -TEST_F(DispatcherTests, test_independent_run) { - // Nothing stops two instances of the same service from running. - auto r1 = std::make_shared(); - auto r2 = std::make_shared(); - r1->reset(); - - Dispatcher::addService(r1); - Dispatcher::addService(r2); - Dispatcher::joinServices(); - - EXPECT_EQ(2U, r1->count()); -} - -class BlockingTestRunnable : public InternalTestableRunnable { - public: - explicit BlockingTestRunnable() - : InternalTestableRunnable("BlockingTestRunnable") {} - - virtual void start() override { - // Wow that's a long sleep! - pause(std::chrono::seconds(100)); - } -}; - -TEST_F(DispatcherTests, test_interruption) { - auto r1 = std::make_shared(); - Dispatcher::addService(r1); - - // This service would normally wait for 100 seconds. - r1->interrupt(); - - Dispatcher::joinServices(); - EXPECT_TRUE(r1->hasRun()); -} - -TEST_F(DispatcherTests, test_stop_dispatcher) { - Dispatcher::stopServices(); - - auto r1 = std::make_shared(); - auto s = Dispatcher::addService(r1); - EXPECT_FALSE(s); -} -} diff --git a/src/osquery/events/events.cpp b/src/osquery/events/events.cpp index ace461e..149521e 100644 --- a/src/osquery/events/events.cpp +++ b/src/osquery/events/events.cpp @@ -658,7 +658,6 @@ Status EventFactory::run(const std::string& type_id) { } else if (publisher->hasStarted()) { return Status(1, "Cannot restart an event publisher"); } - setThreadName(publisher->name()); VLOG(1) << "Starting event publisher run loop: " + type_id; publisher->hasStarted(true); @@ -670,10 +669,6 @@ Status EventFactory::run(const std::string& type_id) { break; } publisher->restart_count_++; - // This is a 'default' cool-off implemented in InterruptableRunnable. - // If a publisher fails to perform some sort of interruption point, this - // prevents the thread from thrashing through exiting checks. - publisher->pause(std::chrono::milliseconds(200)); } if (!status.ok()) { // The runloop status is not reflective of the event type's. @@ -881,8 +876,6 @@ Status EventFactory::deregisterEventPublisher(const std::string& type_id) { // If the run loop did run the tear down and erase will happen in the // event thread wrapper when isEnding is next checked. ef.event_pubs_.erase(type_id); - } else { - publisher->stop(); } } return Status::success(); diff --git a/src/osquery/include/osquery/dispatcher.h b/src/osquery/include/osquery/dispatcher.h deleted file mode 100644 index dc29a81..0000000 --- a/src/osquery/include/osquery/dispatcher.h +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Copyright (c) 2014-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed in accordance with the terms specified in - * the LICENSE file found in the root directory of this source tree. - */ - -#pragma once - -#include -#include -#include -#include -#include -#include - -#include - -#include - -#include -#include - -namespace osquery { - -class Status; -class Dispatcher; - - -class InterruptableRunnable { - public: - virtual ~InterruptableRunnable() = default; - - /** - * @brief The std::thread's interruption point. - */ - virtual void interrupt() final; - - /// Returns the runner name - std::string name() const { - return runnable_name_; - } - - protected: - /// Allow the runnable to check interruption. - virtual bool interrupted(); - - /// Require the runnable thread to define a stop/interrupt point. - virtual void stop() = 0; - - /// Put the runnable into an interruptible sleep. - void pause(std::chrono::milliseconds milli); - - /// Name of the InterruptableRunnable which is also the thread name - std::string runnable_name_; - - private: - /** - * @brief Used to wait for the interruption notification while sleeping - */ - std::mutex condition_lock; - - /// If a service includes a run loop it should check for interrupted. - std::atomic interrupted_{false}; - - /// Wait for notification or a pause expiration. - std::condition_variable condition_; - - private: - FRIEND_TEST(DispatcherTests, test_run); - FRIEND_TEST(DispatcherTests, test_independent_run); - FRIEND_TEST(DispatcherTests, test_interruption); - FRIEND_TEST(BufferedLogForwarderTests, test_async); -}; - -class InternalRunnable : private boost::noncopyable, - public InterruptableRunnable { - public: - InternalRunnable(const std::string& name) : run_(false) { - runnable_name_ = name; - } - virtual ~InternalRunnable() override = default; - - public: - /** - * @brief The std::thread entrypoint. - * - * This is used by the Dispatcher only. - */ - virtual void run() final; - - /** - * @brief Check if the thread's entrypoint (run) executed. - * - * It is possible for the Runnable to be allocated without the thread context. - * #hasRun makes a much better guess at the state of the thread. - * If it has run then stop must be called. - */ - bool hasRun() { - return run_; - } - - protected: - /// Require the runnable thread define an entrypoint. - virtual void start() = 0; - - /// The runnable thread may optionally define a stop/interrupt point. - void stop() override {} - - private: - std::atomic run_{false}; -}; - -/// An internal runnable used throughout osquery as dispatcher services. -using InternalRunnableRef = std::shared_ptr; -using InternalThreadRef = std::unique_ptr; - -/** - * @brief Singleton for queuing asynchronous tasks to be executed in parallel - * - * Dispatcher is a singleton which can be used to coordinate the parallel - * execution of asynchronous tasks across an application. Internally, - * Dispatcher is back by the Apache Thrift thread pool. - */ -class Dispatcher : private boost::noncopyable { - public: - /** - * @brief The primary way to access the Dispatcher factory facility. - * - * @code{.cpp} auto dispatch = osquery::Dispatcher::instance(); @endcode - * - * @return The osquery::Dispatcher instance. - */ - static Dispatcher& instance(); - - /// See `add`, but services are not limited to a thread poll size. - static Status addService(InternalRunnableRef service); - - /// See `join`, but applied to osquery services. - static void joinServices(); - - /// Destroy and stop all osquery service threads and service objects. - static void stopServices(); - - /// Return number of services. - size_t serviceCount() const; - - private: - /** - * @brief Default constructor. - * - * Since instances of Dispatcher should only be created via instance(), - * Dispatcher's constructor is private. - */ - Dispatcher() = default; - - private: - /// When a service ends, it will remove itself from the dispatcher. - static void removeService(const InternalRunnable* service); - - private: - /// For testing only, reset the stopping status for unittests. - void resetStopping() { - stopping_ = false; - } - - private: - /// The set of shared osquery service threads. - std::vector service_threads_; - - /// The set of shared osquery services. - std::vector services_; - - // Protection around service access. - mutable Mutex mutex_; - - - /** - * @brief Signal to the Dispatcher that no services should be created. - * - * The Dispatcher will not add services if it is shutting down until - * a join has completed of existing services. - * - * This prevents a very strange race where the dispatcher is signaled to - * abort or interrupt and serviced are sill waiting to be added. - * A future join will be requested AFTER all services were expected to have - * been interrupted. - */ - std::atomic stopping_{false}; - - private: - friend class InternalRunnable; - friend class ExtensionsTests; - friend class DispatcherTests; -}; -} // namespace osquery diff --git a/src/osquery/include/osquery/distributed.h b/src/osquery/include/osquery/distributed.h deleted file mode 100644 index 837bcdb..0000000 --- a/src/osquery/include/osquery/distributed.h +++ /dev/null @@ -1,280 +0,0 @@ -/** - * Copyright (c) 2014-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed in accordance with the terms specified in - * the LICENSE file found in the root directory of this source tree. - */ - -#pragma once - -#include -#include - -#include -#include -#include - -namespace osquery { - -/** - * @brief Small struct containing the query and ID information for a - * distributed query - */ -struct DistributedQueryRequest { - public: - explicit DistributedQueryRequest() {} - - std::string query; - std::string id; -}; - -/** - * @brief Serialize a DistributedQueryRequest into a property tree - * - * @param r the DistributedQueryRequest to serialize - * @param doc the input JSON managed document - * @param obj the output rapidjson document [object] - * - * @return Status indicating the success or failure of the operation - */ -Status serializeDistributedQueryRequest(const DistributedQueryRequest& r, - JSON& doc, - rapidjson::Value& obj); - -/** - * @brief Serialize a DistributedQueryRequest object into a JSON string - * - * @param r the DistributedQueryRequest to serialize - * @param json the output JSON string - * - * @return Status indicating the success or failure of the operation - */ -Status serializeDistributedQueryRequestJSON(const DistributedQueryRequest& r, - std::string& json); - -/** - * @brief Deserialize a DistributedQueryRequest object from a property tree - * - * @param obj the input rapidjson value [object] - * @param r the output DistributedQueryRequest structure - * - * @return Status indicating the success or failure of the operation - */ -Status deserializeDistributedQueryRequest(const rapidjson::Value& obj, - DistributedQueryRequest& r); - -/** - * @brief Deserialize a DistributedQueryRequest object from a JSON string - * - * @param json the input JSON string - * @param r the output DistributedQueryRequest structure - * - * @return Status indicating the success or failure of the operation - */ -Status deserializeDistributedQueryRequestJSON(const std::string& json, - DistributedQueryRequest& r); - -/** - * @brief Small struct containing the results of a distributed query - */ -struct DistributedQueryResult { - public: - DistributedQueryResult() {} - DistributedQueryResult(const DistributedQueryRequest& req, - const QueryData& res, - const ColumnNames& cols, - const Status& s) - : request(req), results(res), columns(cols), status(s) {} - - DistributedQueryRequest request; - QueryData results; - ColumnNames columns; - Status status; -}; - -/** - * @brief Serialize a DistributedQueryResult into a property tree - * - * @param r the DistributedQueryResult to serialize - * @param doc the input JSON managed document - * @param obj the output rapidjson document [object] - * - * @return Status indicating the success or failure of the operation - */ -Status serializeDistributedQueryResult(const DistributedQueryResult& r, - JSON& doc, - rapidjson::Value& obj); -/** - * @brief Serialize a DistributedQueryResult object into a JSON string - * - * @param r the DistributedQueryResult to serialize - * @param json the output JSON string - * - * @return Status indicating the success or failure of the operation - */ -Status serializeDistributedQueryResultJSON(const DistributedQueryResult& r, - std::string& json); - -/** - * @brief Deserialize a DistributedQueryResult object from a property tree - * - * @param obj the input rapidjson document [object] - * @param r the output DistributedQueryResult structure - * - * @return Status indicating the success or failure of the operation - */ -Status deserializeDistributedQueryResult(const rapidjson::Value& obj, - DistributedQueryResult& r); - -/** - * @brief Deserialize a DistributedQueryResult object from a JSON string - * - * @param json the input JSON string - * @param r the output DistributedQueryResult structure - * - * @return Status indicating the success or failure of the operation - */ -Status deserializeDistributedQueryResultJSON(const std::string& json, - DistributedQueryResult& r); - -class DistributedPlugin : public Plugin { - public: - /** - * @brief Get the queries to be executed - * - * Consider the following example JSON which represents the expected format - * - * @code{.json} - * { - * "queries": { - * "id1": "select * from osquery_info", - * "id2": "select * from osquery_schedule" - * } - * } - * @endcode - * - * @param json is the string to populate the queries data structure with - * @return a Status indicating the success or failure of the operation - */ - virtual Status getQueries(std::string& json) = 0; - - /** - * @brief Write the results that were executed - * - * Consider the following JSON which represents the format that will be used: - * - * @code{.json} - * { - * "queries": { - * "id1": [ - * { - * "col1": "val1", - * "col2": "val2" - * }, - * { - * "col1": "val1", - * "col2": "val2" - * } - * ], - * "id2": [ - * { - * "col1": "val1", - * "col2": "val2" - * } - * ] - * } - * } - * @endcode - * - * @param json is the results data to write - * @return a Status indicating the success or failure of the operation - */ - virtual Status writeResults(const std::string& json) = 0; - - /// Main entrypoint for distributed plugin requests - Status call(const PluginRequest& request, PluginResponse& response) override; -}; - -/** - * @brief Class for managing the set of distributed queries to execute - * - * Consider the following workflow example, without any error handling - * - * @code{.cpp} - * auto dist = Distributed(); - * while (true) { - * dist.pullUpdates(); - * if (dist.getPendingQueryCount() > 0) { - * dist.runQueries(); - * } - * } - * @endcode - */ -class Distributed { - public: - /// Default constructor - Distributed() {} - - /// Retrieve queued queries from a remote server - Status pullUpdates(); - - /// Get the number of queries which are waiting to be executed - size_t getPendingQueryCount(); - - /// Get the number of results which are waiting to be flushed - size_t getCompletedCount(); - - /// Serialize result data into a JSON string and clear the results - Status serializeResults(std::string& json); - - /// Process and execute queued queries - Status runQueries(); - - // Getter for ID of currently executing request - static std::string getCurrentRequestId(); - - protected: - /** - * @brief Process several queries from a distributed plugin - * - * Given a response from a distributed plugin, parse the results and enqueue - * them in the internal state of the class - * - * @param work is the string from DistributedPlugin::getQueries - * @return a Status indicating the success or failure of the operation - */ - Status acceptWork(const std::string& work); - - /** - * @brief Pop a request object off of the queries_ member - * - * @return a DistributedQueryRequest object which needs to be executed - */ - DistributedQueryRequest popRequest(); - - /** - * @brief Queue a result to be batch sent to the server - * - * @param result is a DistributedQueryResult object to be sent to the server - */ - void addResult(const DistributedQueryResult& result); - - /** - * @brief Flush all of the collected results to the server - */ - Status flushCompleted(); - - // Setter for ID of currently executing request - static void setCurrentRequestId(const std::string& cReqId); - - std::vector results_; - - // ID of the currently executing query - static std::string currentRequestId_; - - private: - friend class DistributedTests; - FRIEND_TEST(DistributedTests, DISABLED_test_workflow); -}; -} diff --git a/src/osquery/include/osquery/events.h b/src/osquery/include/osquery/events.h index 67f3482..862a70b 100644 --- a/src/osquery/include/osquery/events.h +++ b/src/osquery/include/osquery/events.h @@ -20,9 +20,9 @@ #include #include -#include #include #include +#include namespace osquery { @@ -216,7 +216,6 @@ class Eventer { }; class EventPublisherPlugin : public Plugin, - public InterruptableRunnable, public Eventer { public: /** @@ -261,16 +260,6 @@ class EventPublisherPlugin : public Plugin, return Status(1, "No run loop required"); } - /** - * @brief Allow the EventFactory to interrupt the run loop. - * - * Assume the main thread may ask the run loop to stop at anytime. - * Before end is called the publisher's `isEnding` is set and the EventFactory - * run loop manager will exit the stepping loop and fall through to a call - * to tearDown followed by a removal of the publisher. - */ - void stop() override {} - /// This is a plugin type and must implement a call method. Status call(const PluginRequest& /*request*/, PluginResponse& /*response*/) override { diff --git a/src/osquery/main/main.cpp b/src/osquery/main/main.cpp index 3364629..3953e9d 100644 --- a/src/osquery/main/main.cpp +++ b/src/osquery/main/main.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -88,9 +87,6 @@ int profile(int argc, char* argv[]) { int startDaemon(Initializer& runner) { runner.start(); - // Begin the schedule runloop. - startScheduler(); - // osquery::events::init_syscall_tracing(); // Finally wait for a signal / interrupt to shutdown.