## osquery v4.0.0
ADD_SUBDIRECTORY(core)
ADD_SUBDIRECTORY(database)
-ADD_SUBDIRECTORY(dispatcher)
ADD_SUBDIRECTORY(events)
ADD_SUBDIRECTORY(filesystem)
ADD_SUBDIRECTORY(logger)
#include "osquery/utils/info/platform_type.h"
#include <osquery/core.h>
#include <osquery/data_logger.h>
-#include <osquery/dispatcher.h>
#include <osquery/events.h>
#include <osquery/filesystem/filesystem.h>
#include <osquery/flags.h>
// Restore the default signal handler.
std::signal(num, SIG_DFL);
-
- osquery::Dispatcher::stopServices();
}
}
}
}
- // Attempt to be the only place in code where a join is attempted.
- Dispatcher::joinServices();
// End any event type run loops.
EventFactory::end(true);
// it is NOT waiting for a shutdown.
// Exceptions include: tight request / wait in an exception handler or
// custom signal handling.
- Dispatcher::stopServices();
waitForShutdown();
}
}
+++ /dev/null
-/**
- * Copyright (c) 2014-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed in accordance with the terms specified in
- * the LICENSE file found in the root directory of this source tree.
- */
-
-#include <poll.h>
-#include <pwd.h>
-
-#include <gtest/gtest.h>
-
-#include <boost/filesystem/operations.hpp>
-#include <boost/filesystem/path.hpp>
-#include <boost/noncopyable.hpp>
-
-#include <osquery/core.h>
-#include <osquery/dispatcher.h>
-#include <osquery/filesystem/filesystem.h>
-#include <osquery/logger.h>
-#include <osquery/process/process.h>
-#include <osquery/system.h>
-
-#include "osquery/filesystem/fileops.h"
-#include "osquery/tests/test_util.h"
-
-namespace fs = boost::filesystem;
-
-namespace osquery {
-
-class PermissionsTests : public testing::Test {
- public:
- PermissionsTests()
- : perm_path_(fs::temp_directory_path() /
- fs::unique_path("lowperms.%%%%.%%%%")) {}
-
- void SetUp() override {
- fs::create_directories(perm_path_);
- }
-
- void TearDown() override {
- fs::remove_all(perm_path_);
- }
-
- protected:
- fs::path perm_path_;
-};
-
-TEST_F(PermissionsTests, test_explicit_drop) {
- {
- auto dropper = DropPrivileges::get();
- EXPECT_TRUE(dropper->dropTo(getuid(), getgid()));
- // We can attempt to drop to the previously-dropped privilege.
- EXPECT_TRUE(dropper->dropTo(getuid(), getgid()));
- }
-
- {
- auto dropper = DropPrivileges::get();
- // Make sure that an out-of-scope dropper "restore"
- EXPECT_FALSE(dropper->dropped_);
-
- uid_t expected_user = 0U;
- EXPECT_EQ(dropper->to_user_, expected_user);
-
- gid_t expected_group = 0U;
- EXPECT_EQ(dropper->to_group_, expected_group);
-
- // Checking if we are generally in an unprivileged mode.
- auto dropper2 = DropPrivileges::get();
- EXPECT_FALSE(dropper2->dropped());
- }
-}
-
-TEST_F(PermissionsTests, test_path_drop) {
- if (getuid() != 0) {
- LOG(WARNING) << "Not root, skipping (path) unprivileged testing";
- return;
- }
-
- // Attempt to drop to nobody based on ownership of paths.
- auto nobody = getpwnam("nobody");
- ASSERT_NE(nobody, nullptr);
-
- {
- int status =
- chown(perm_path_.string().c_str(), nobody->pw_uid, nobody->pw_gid);
- ASSERT_EQ(status, 0);
-
- auto dropper = DropPrivileges::get();
- EXPECT_TRUE(dropper->dropToParent((perm_path_ / "ro").string()));
- EXPECT_TRUE(dropper->dropped_);
- EXPECT_EQ(dropper->to_user_, nobody->pw_uid);
-
- // Dropping "up" to root should fail.
- // Even though this is possible and may make sense, it is confusing!
- EXPECT_FALSE(dropper->dropTo(0, 0));
-
- // Make sure the dropper worked!
- EXPECT_EQ(geteuid(), nobody->pw_uid);
- }
-
- // Now that the dropper is gone, the effective user/group should be restored.
- EXPECT_EQ(geteuid(), getuid());
- EXPECT_EQ(getegid(), getgid());
-}
-
-TEST_F(PermissionsTests, test_functional_drop) {
- if (getuid() != 0) {
- LOG(WARNING) << "Not root, skipping (explicit) unprivileged testing";
- return;
- }
-
- auto file_path = kTestWorkingDirectory + "permissions-file2";
-
- {
- writeTextFile(file_path, "data");
- ASSERT_TRUE(platformChmod(file_path, 0400));
- }
-
- {
- auto nobody = getpwnam("nobody");
- auto dropper = DropPrivileges::get();
- dropper->dropTo(nobody->pw_uid, nobody->pw_gid);
- PlatformFile fd(file_path, PF_OPEN_EXISTING | PF_READ);
- EXPECT_FALSE(fd.isValid());
- }
-
- osquery::removePath(file_path);
-}
-
-TEST_F(PermissionsTests, test_nobody_drop) {
- if (getuid() != 0) {
- LOG(WARNING) << "Not root, skipping (explicit) unprivileged testing";
- return;
- }
-
- // Attempt to drop to nobody.
- auto nobody = getpwnam("nobody");
- ASSERT_NE(nobody, nullptr);
-
- {
- auto dropper = DropPrivileges::get();
- EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid));
- EXPECT_EQ(geteuid(), nobody->pw_uid);
- }
-
- {
- auto dropper = DropPrivileges::get();
- EXPECT_TRUE(dropper->dropTo(std::to_string(nobody->pw_uid),
- std::to_string(nobody->pw_gid)));
- EXPECT_EQ(geteuid(), nobody->pw_uid);
- }
-
- // Now that the dropper is gone, the effective user/group should be restored.
- EXPECT_EQ(geteuid(), getuid());
- EXPECT_EQ(getegid(), getgid());
-}
-
-std::string kMultiThreadPermissionPath;
-
-class PermissionsRunnable : public InternalRunnable {
- public:
- PermissionsRunnable() : InternalRunnable("PermissionsRunnable") {}
- PermissionsRunnable(const std::string& name) : InternalRunnable(name) {}
-
- private:
- virtual void start() override {
- while (!interrupted()) {
- if (!writeTextFile(kMultiThreadPermissionPath, "test")) {
- throw std::runtime_error("Cannot write " + kMultiThreadPermissionPath);
- }
- ticks++;
- }
- }
-
- public:
- std::atomic<size_t> ticks{0};
-};
-
-class PermissionsPollRunnable : public PermissionsRunnable {
- public:
- PermissionsPollRunnable() : PermissionsRunnable("PermissionsPollRunnable") {}
-
- private:
- void start() override {
- PlatformFile file(kMultiThreadPermissionPath,
- PF_OPEN_EXISTING | PF_READ | PF_NONBLOCK);
- auto file_fd = file.nativeHandle();
-
- struct pollfd fds[1];
- while (!interrupted()) {
- std::memset(fds, 0, sizeof(fds));
- fds[0].fd = file_fd;
-
- result = poll(fds, 1, 1);
- if (result == 0) {
- ticks++;
- }
- }
- }
-
- public:
- std::atomic<int> result;
-};
-
-bool waitForTick(const std::shared_ptr<PermissionsRunnable>& runnable) {
- size_t now = runnable->ticks;
- size_t timeout = 1000;
- size_t delay = 0;
- while (delay < timeout) {
- sleepFor(20);
- if (runnable->ticks > now) {
- return true;
- }
- sleepFor(200);
- delay += 220;
- }
- return false;
-}
-
-TEST_F(PermissionsTests, test_multi_thread_permissions) {
- if (getuid() != 0) {
- LOG(WARNING) << "Not root, skipping multi-thread deprivilege testing";
- return;
- }
-
- ASSERT_EQ(0U, geteuid());
-
- // Set the multi-thread path, which both threads will write into.
- auto multi_thread_path = perm_path_ / "threadperms.txt";
- kMultiThreadPermissionPath = multi_thread_path.string();
-
- // This thread has super-user permissions.
- ASSERT_TRUE(writeTextFile(kMultiThreadPermissionPath, "test", 600));
-
- // Start our permissions thread.
- auto perms_thread = std::make_shared<PermissionsRunnable>();
- Dispatcher::addService(perms_thread);
-
- // Wait for the permissions thread to write once.
- EXPECT_TRUE(waitForTick(perms_thread));
-
- // Attempt to drop to nobody.
- auto nobody = getpwnam("nobody");
- EXPECT_NE(nobody, nullptr);
-
- {
- auto dropper = DropPrivileges::get();
- EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid));
- EXPECT_EQ(geteuid(), nobody->pw_uid);
-
- // Now we wait for the permissions thread to write once while this thread's
- // permissions are dropped.
- EXPECT_TRUE(waitForTick(perms_thread));
- }
-
- Dispatcher::stopServices();
- Dispatcher::joinServices();
-}
-
-TEST_F(PermissionsTests, test_multi_thread_poll) {
- if (getuid() != 0) {
- LOG(WARNING) << "Not root, skipping multi-thread deprivilege testing";
- return;
- }
-
- ASSERT_EQ(0U, geteuid());
-
- // Set the multi-thread path, which both threads will write into.
- auto multi_thread_path = perm_path_ / "threadperms.txt";
- kMultiThreadPermissionPath = multi_thread_path.string();
-
- // Start our permissions thread.
- auto pool_thread = std::make_shared<PermissionsPollRunnable>();
- Dispatcher::addService(pool_thread);
-
- // Wait for the permissions thread to write once.
- EXPECT_TRUE(waitForTick(pool_thread));
-
- auto nobody = getpwnam("nobody");
- EXPECT_NE(nobody, nullptr);
- {
- auto dropper = DropPrivileges::get();
- EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid));
- EXPECT_EQ(geteuid(), nobody->pw_uid);
-
- EXPECT_TRUE(waitForTick(pool_thread));
- }
-
- Dispatcher::stopServices();
- Dispatcher::joinServices();
-}
-}
+++ /dev/null
-# Copyright (c) 2019 Samsung Electronics Co., Ltd All Rights Reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
-ADD_OSQUERY_LIBRARY(osquery_dispatcher dispatcher.cpp
- scheduler.cpp)
-
-FILE(GLOB OSQUERY_DISPATCHER_TESTS "tests/*.cpp")
-ADD_OSQUERY_TEST(${OSQUERY_DISPATCHER_TESTS})
+++ /dev/null
-/**
- * Copyright (c) 2014-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed in accordance with the terms specified in
- * the LICENSE file found in the root directory of this source tree.
- */
-
-#include <osquery/dispatcher.h>
-#include <osquery/flags.h>
-#include <osquery/logger.h>
-#include <osquery/process/process.h>
-
-namespace osquery {
-
-/// The worker_threads define the default thread pool size.
-FLAG(int32, worker_threads, 4, "Number of work dispatch threads");
-
-
-void InterruptableRunnable::interrupt() {
- // Set the service as interrupted.
- if (!interrupted_.exchange(true)) {
- // Tear down the service's resources such that exiting the expected run
- // loop within ::start does not need to.
- stop();
- std::lock_guard<std::mutex> lock(condition_lock);
- // Cancel the run loop's pause request.
- condition_.notify_one();
- }
-}
-
-bool InterruptableRunnable::interrupted() {
- return interrupted_;
-}
-
-void InterruptableRunnable::pause(std::chrono::milliseconds milli) {
- std::unique_lock<std::mutex> lock(condition_lock);
- if (!interrupted_) {
- condition_.wait_for(lock, milli);
- }
-}
-
-void InternalRunnable::run() {
- run_ = true;
- setThreadName(name());
- start();
-
- // The service is complete.
- Dispatcher::removeService(this);
-}
-
-Dispatcher& Dispatcher::instance() {
- static Dispatcher instance;
- return instance;
-}
-
-size_t Dispatcher::serviceCount() const {
- ReadLock lock(mutex_);
- return services_.size();
-}
-
-Status Dispatcher::addService(InternalRunnableRef service) {
- if (service->hasRun()) {
- return Status(1, "Cannot schedule a service twice");
- }
-
- auto& self = instance();
- if (self.stopping_) {
- // Cannot add a service while the dispatcher is stopping and no joins
- // have been requested.
- return Status(1, "Cannot add service, dispatcher is stopping");
- }
-
- auto thread = std::make_unique<std::thread>(
- std::bind(&InternalRunnable::run, &*service));
-
- DLOG(INFO) << "Adding new service: " << service->name() << " ("
- << service.get() << ") to thread: " << thread->get_id() << " ("
- << thread.get() << ") in process " << platformGetPid();
- {
- WriteLock lock(self.mutex_);
-
- self.service_threads_.push_back(std::move(thread));
- self.services_.push_back(std::move(service));
- }
- return Status::success();
-}
-
-void Dispatcher::removeService(const InternalRunnable* service) {
- auto& self = Dispatcher::instance();
- WriteLock lock(self.mutex_);
-
- // Remove the service.
- self.services_.erase(
- std::remove_if(self.services_.begin(),
- self.services_.end(),
- [service](const InternalRunnableRef& target) {
- return (target.get() == service);
- }),
- self.services_.end());
-}
-
-inline static void assureRun(const InternalRunnableRef& service) {
- while (true) {
- // Wait for each thread's entry point (start) meaning the thread context
- // was allocated and (run) was called by std::thread started.
- if (service->hasRun()) {
- break;
- }
- // We only need to check if std::terminate is called very quickly after
- // the std::thread is created.
- sleepFor(20);
- }
-}
-
-void Dispatcher::joinServices() {
- auto& self = instance();
- DLOG(INFO) << "Thread: " << std::this_thread::get_id()
- << " requesting a join";
-
- // Stops when service_threads_ is empty. Before stopping and releasing of the
- // lock, empties services_ .
- while (1) {
- InternalThreadRef thread = nullptr;
- {
- WriteLock lock(self.mutex_);
- if (!self.service_threads_.empty()) {
- thread = std::move(self.service_threads_.back());
- self.service_threads_.pop_back();
- } else {
- self.services_.clear();
- break;
- }
- }
- if (thread != nullptr) {
- thread->join();
- DLOG(INFO) << "Service thread: " << thread.get() << " has joined";
- }
- }
-
- self.stopping_ = false;
- DLOG(INFO) << "Services and threads have been cleared";
-}
-
-void Dispatcher::stopServices() {
- auto& self = instance();
- self.stopping_ = true;
-
- WriteLock lock(self.mutex_);
- DLOG(INFO) << "Thread: " << std::this_thread::get_id()
- << " requesting a stop";
- for (const auto& service : self.services_) {
- assureRun(service);
- service->interrupt();
- DLOG(INFO) << "Service: " << service.get() << " has been interrupted";
- }
-}
-} // namespace osquery
+++ /dev/null
-/**
- * Copyright (c) 2014-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed in accordance with the terms specified in
- * the LICENSE file found in the root directory of this source tree.
- */
-
-#include <algorithm>
-#include <ctime>
-
-#include <boost/format.hpp>
-#include <boost/io/detail/quoted_manip.hpp>
-
-#include <osquery/core.h>
-#include <osquery/data_logger.h>
-#include <osquery/database.h>
-#include <osquery/flags.h>
-#include <osquery/process/process.h>
-#include <osquery/query.h>
-#include <osquery/utils/system/time.h>
-
-#include "osquery/dispatcher/scheduler.h"
-#include "osquery/sql/sqlite_util.h"
-
-namespace osquery {
-
-FLAG(uint64, schedule_timeout, 0, "Limit the schedule, 0 for no limit");
-
-FLAG(uint64,
- schedule_max_drift,
- 60,
- "Max time drift in seconds. Scheduler tries to compensate the drift until "
- "the drift exceed this value. After it the drift will be reseted to zero "
- "and the compensation process will start from the beginning. It is needed "
- "to avoid the problem of endless compensation (which is CPU greedy) after "
- "a long SIGSTOP/SIGCONT pause or something similar. Set it to zero to "
- "switch off a drift compensation. Default: 60");
-
-FLAG(uint64,
- schedule_reload,
- 300,
- "Interval in seconds to reload database arenas");
-
-FLAG(uint64, schedule_epoch, 0, "Epoch for scheduled queries");
-
-HIDDEN_FLAG(bool,
- schedule_reload_sql,
- false,
- "Reload the SQL implementation during schedule reload");
-
-/// Used to bypass (optimize-out) the set-differential of query results.
-DECLARE_bool(events_optimize);
-
-SQLInternal monitor(const std::string& name, const ScheduledQuery& query) {
- // Snapshot the performance and times for the worker before running.
- auto pid = std::to_string(PlatformProcess::getCurrentPid());
- auto r0 = SQL::selectFrom({"resident_size", "user_time", "system_time"},
- "processes",
- "pid",
- EQUALS,
- pid);
- auto t0 = getUnixTime();
- SQLInternal sql(query.query, true);
- // Snapshot the performance after, and compare.
- auto t1 = getUnixTime();
- auto r1 = SQL::selectFrom({"resident_size", "user_time", "system_time"},
- "processes",
- "pid",
- EQUALS,
- pid);
- return sql;
-}
-
-Status launchQuery(const std::string& name, const ScheduledQuery& query) {
- // Execute the scheduled query and create a named query object.
- LOG(INFO) << "Executing scheduled query " << name << ": " << query.query;
-
- auto sql = monitor(name, query);
- if (!sql.getStatus().ok()) {
- LOG(ERROR) << "Error executing scheduled query " << name << ": "
- << sql.getStatus().toString();
- return Status::failure("Error executing scheduled query");
- }
-
- // Fill in a host identifier fields based on configuration or availability.
- std::string ident = getHostIdentifier();
-
- // A query log item contains an optional set of differential results or
- // a copy of the most-recent execution alongside some query metadata.
- QueryLogItem item;
- item.name = name;
- item.identifier = ident;
- item.time = osquery::getUnixTime();
- item.epoch = FLAGS_schedule_epoch;
- item.calendar_time = osquery::getAsciiTime();
-
- if (query.options.count("snapshot") && query.options.at("snapshot")) {
- // This is a snapshot query, emit results with a differential or state.
- item.snapshot_results = std::move(sql.rowsTyped());
- logSnapshotQuery(item);
- return Status::success();
- }
-
- // Create a database-backed set of query results.
- auto dbQuery = Query(name, query);
- // Comparisons and stores must include escaped data.
- sql.escapeResults();
- Status status;
- DiffResults& diff_results = item.results;
- // Add this execution's set of results to the database-tracked named query.
- // We can then ask for a differential from the last time this named query
- // was executed by exact matching each row.
- if (!FLAGS_events_optimize || !sql.eventBased()) {
- status = dbQuery.addNewResults(
- std::move(sql.rowsTyped()), item.epoch, item.counter, diff_results);
- if (!status.ok()) {
- std::string line = "Error adding new results to database for query " +
- name + ": " + status.what();
- LOG(ERROR) << line;
-
- // If the database is not available then the daemon cannot continue.
- Initializer::requestShutdown(EXIT_CATASTROPHIC, line);
- }
- } else {
- diff_results.added = std::move(sql.rowsTyped());
- }
-
- if (query.options.count("removed") && !query.options.at("removed")) {
- diff_results.removed.clear();
- }
-
- if (diff_results.added.empty() && diff_results.removed.empty()) {
- // No diff results or events to emit.
- return status;
- }
-
- VLOG(1) << "Found results for query: " << name;
-
- status = logQueryLogItem(item);
- if (!status.ok()) {
- // If log directory is not available, then the daemon shouldn't continue.
- std::string error = "Error logging the results of query: " + name + ": " +
- status.toString();
- LOG(ERROR) << error;
- Initializer::requestShutdown(EXIT_CATASTROPHIC, error);
- }
- return status;
-}
-
-void SchedulerRunner::start() {
- // Start the counter at the second.
- auto i = osquery::getUnixTime();
- for (; (timeout_ == 0) || (i <= timeout_); ++i) {
- auto start_time_point = std::chrono::steady_clock::now();
- if (FLAGS_schedule_reload > 0 && (i % FLAGS_schedule_reload) == 0) {
- if (FLAGS_schedule_reload_sql) {
- SQLiteDBManager::resetPrimary();
- }
- resetDatabase();
- }
-
- // GLog is not re-entrant, so logs must be flushed in a dedicated thread.
- if ((i % 3) == 0) {
- relayStatusLogs(true);
- }
- auto loop_step_duration =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now() - start_time_point);
- if (loop_step_duration + time_drift_ < interval_) {
- pause(std::chrono::milliseconds(interval_ - loop_step_duration -
- time_drift_));
- time_drift_ = std::chrono::milliseconds::zero();
- } else {
- time_drift_ += loop_step_duration - interval_;
- if (time_drift_ > max_time_drift_) {
- // giving up
- time_drift_ = std::chrono::milliseconds::zero();
- }
- }
- if (interrupted()) {
- break;
- }
- }
-}
-
-std::chrono::milliseconds SchedulerRunner::getCurrentTimeDrift() const
- noexcept {
- return time_drift_;
-}
-
-void startScheduler() {
- startScheduler(static_cast<unsigned long int>(FLAGS_schedule_timeout), 1);
-}
-
-void startScheduler(unsigned long int timeout, size_t interval) {
- Dispatcher::addService(std::make_shared<SchedulerRunner>(
- timeout, interval, std::chrono::seconds{FLAGS_schedule_max_drift}));
-}
-} // namespace osquery
+++ /dev/null
-/**
- * Copyright (c) 2014-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed in accordance with the terms specified in
- * the LICENSE file found in the root directory of this source tree.
- */
-
-#pragma once
-
-#include <chrono>
-#include <map>
-
-#include <osquery/dispatcher.h>
-
-#include "osquery/sql/sqlite_util.h"
-
-namespace osquery {
-
-/// A Dispatcher service thread that watches an ExtensionManagerHandler.
-class SchedulerRunner : public InternalRunnable {
- public:
- SchedulerRunner(
- unsigned long int timeout,
- size_t interval,
- std::chrono::milliseconds max_time_drift = std::chrono::seconds::zero())
- : InternalRunnable("SchedulerRunner"),
- interval_{std::chrono::seconds{interval}},
- timeout_(timeout),
- time_drift_{std::chrono::milliseconds::zero()},
- max_time_drift_{max_time_drift} {}
-
- public:
- /// The Dispatcher thread entry point.
- void start() override;
-
- /// The Dispatcher interrupt point.
- void stop() override {}
-
- /// Accumulated for some time time drift to compensate.
- std::chrono::milliseconds getCurrentTimeDrift() const noexcept;
-
- private:
- /// Interval in seconds between schedule steps.
- const std::chrono::milliseconds interval_;
-
- /// Maximum number of steps.
- const unsigned long int timeout_;
-
- /// Accumulated for some time time drift to compensate.
- /// It will be either reduced during compensation process or
- /// after exceding the limit @see max_time_drift_
- std::chrono::milliseconds time_drift_;
-
- const std::chrono::milliseconds max_time_drift_;
-};
-
-SQLInternal monitor(const std::string& name, const ScheduledQuery& query);
-
-/// Start querying according to the config's schedule
-void startScheduler();
-
-/// Helper scheduler start with variable settings for testing.
-void startScheduler(unsigned long int timeout, size_t interval);
-}
+++ /dev/null
-/**
- * Copyright (c) 2014-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed in accordance with the terms specified in
- * the LICENSE file found in the root directory of this source tree.
- */
-
-#include <chrono>
-
-#include <gtest/gtest.h>
-
-#include <osquery/dispatcher.h>
-#include <osquery/utils/status/status.h>
-
-namespace osquery {
-
-class DispatcherTests : public testing::Test {
- void TearDown() override {
- Dispatcher::instance().resetStopping();
- }
-};
-
-TEST_F(DispatcherTests, test_singleton) {
- auto& one = Dispatcher::instance();
- auto& two = Dispatcher::instance();
- EXPECT_EQ(&one, &two);
-}
-
-class InternalTestableRunnable : public InternalRunnable {
- public:
- InternalTestableRunnable(const std::string& name) : InternalRunnable(name) {}
-
- bool interrupted() override {
- // A small conditional to force-skip an interruption check, used in testing.
- if (!checked_) {
- checked_ = true;
- return false;
- } else {
- return InternalRunnable::interrupted();
- }
- }
-
- private:
- /// Testing only, track the interruptible check for interruption.
- bool checked_{false};
-};
-
-class TestRunnable : public InternalTestableRunnable {
- public:
- explicit TestRunnable() : InternalTestableRunnable("TestRunnable") {}
-
- virtual void start() override {
- ++i;
- }
-
- void reset() {
- i = 0;
- }
-
- size_t count() {
- return i;
- }
-
- private:
- static std::atomic<size_t> i;
-};
-
-std::atomic<size_t> TestRunnable::i{0};
-
-TEST_F(DispatcherTests, test_service_count) {
- auto runnable = std::make_shared<TestRunnable>();
-
- auto service_count = Dispatcher::instance().serviceCount();
- // The service exits after incrementing.
- auto s = Dispatcher::addService(runnable);
- EXPECT_TRUE(s);
-
- // Wait for the service to stop.
- Dispatcher::joinServices();
-
- // Make sure the service is removed.
- EXPECT_EQ(service_count, Dispatcher::instance().serviceCount());
-}
-
-TEST_F(DispatcherTests, test_run) {
- auto runnable = std::make_shared<TestRunnable>();
- runnable->reset();
-
- // The service exits after incrementing.
- Dispatcher::addService(runnable);
- Dispatcher::joinServices();
- EXPECT_EQ(1U, runnable->count());
- EXPECT_TRUE(runnable->hasRun());
-
- // This runnable cannot be executed again.
- auto s = Dispatcher::addService(runnable);
- EXPECT_FALSE(s);
-
- Dispatcher::joinServices();
- EXPECT_EQ(1U, runnable->count());
-}
-
-TEST_F(DispatcherTests, test_independent_run) {
- // Nothing stops two instances of the same service from running.
- auto r1 = std::make_shared<TestRunnable>();
- auto r2 = std::make_shared<TestRunnable>();
- r1->reset();
-
- Dispatcher::addService(r1);
- Dispatcher::addService(r2);
- Dispatcher::joinServices();
-
- EXPECT_EQ(2U, r1->count());
-}
-
-class BlockingTestRunnable : public InternalTestableRunnable {
- public:
- explicit BlockingTestRunnable()
- : InternalTestableRunnable("BlockingTestRunnable") {}
-
- virtual void start() override {
- // Wow that's a long sleep!
- pause(std::chrono::seconds(100));
- }
-};
-
-TEST_F(DispatcherTests, test_interruption) {
- auto r1 = std::make_shared<BlockingTestRunnable>();
- Dispatcher::addService(r1);
-
- // This service would normally wait for 100 seconds.
- r1->interrupt();
-
- Dispatcher::joinServices();
- EXPECT_TRUE(r1->hasRun());
-}
-
-TEST_F(DispatcherTests, test_stop_dispatcher) {
- Dispatcher::stopServices();
-
- auto r1 = std::make_shared<TestRunnable>();
- auto s = Dispatcher::addService(r1);
- EXPECT_FALSE(s);
-}
-}
} else if (publisher->hasStarted()) {
return Status(1, "Cannot restart an event publisher");
}
- setThreadName(publisher->name());
VLOG(1) << "Starting event publisher run loop: " + type_id;
publisher->hasStarted(true);
break;
}
publisher->restart_count_++;
- // This is a 'default' cool-off implemented in InterruptableRunnable.
- // If a publisher fails to perform some sort of interruption point, this
- // prevents the thread from thrashing through exiting checks.
- publisher->pause(std::chrono::milliseconds(200));
}
if (!status.ok()) {
// The runloop status is not reflective of the event type's.
// If the run loop did run the tear down and erase will happen in the
// event thread wrapper when isEnding is next checked.
ef.event_pubs_.erase(type_id);
- } else {
- publisher->stop();
}
}
return Status::success();
+++ /dev/null
-/**
- * Copyright (c) 2014-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed in accordance with the terms specified in
- * the LICENSE file found in the root directory of this source tree.
- */
-
-#pragma once
-
-#include <atomic>
-#include <chrono>
-#include <condition_variable>
-#include <memory>
-#include <thread>
-#include <vector>
-
-#include <gtest/gtest_prod.h>
-
-#include <boost/noncopyable.hpp>
-
-#include <osquery/core.h>
-#include <osquery/utils/mutex.h>
-
-namespace osquery {
-
-class Status;
-class Dispatcher;
-
-
-class InterruptableRunnable {
- public:
- virtual ~InterruptableRunnable() = default;
-
- /**
- * @brief The std::thread's interruption point.
- */
- virtual void interrupt() final;
-
- /// Returns the runner name
- std::string name() const {
- return runnable_name_;
- }
-
- protected:
- /// Allow the runnable to check interruption.
- virtual bool interrupted();
-
- /// Require the runnable thread to define a stop/interrupt point.
- virtual void stop() = 0;
-
- /// Put the runnable into an interruptible sleep.
- void pause(std::chrono::milliseconds milli);
-
- /// Name of the InterruptableRunnable which is also the thread name
- std::string runnable_name_;
-
- private:
- /**
- * @brief Used to wait for the interruption notification while sleeping
- */
- std::mutex condition_lock;
-
- /// If a service includes a run loop it should check for interrupted.
- std::atomic<bool> interrupted_{false};
-
- /// Wait for notification or a pause expiration.
- std::condition_variable condition_;
-
- private:
- FRIEND_TEST(DispatcherTests, test_run);
- FRIEND_TEST(DispatcherTests, test_independent_run);
- FRIEND_TEST(DispatcherTests, test_interruption);
- FRIEND_TEST(BufferedLogForwarderTests, test_async);
-};
-
-class InternalRunnable : private boost::noncopyable,
- public InterruptableRunnable {
- public:
- InternalRunnable(const std::string& name) : run_(false) {
- runnable_name_ = name;
- }
- virtual ~InternalRunnable() override = default;
-
- public:
- /**
- * @brief The std::thread entrypoint.
- *
- * This is used by the Dispatcher only.
- */
- virtual void run() final;
-
- /**
- * @brief Check if the thread's entrypoint (run) executed.
- *
- * It is possible for the Runnable to be allocated without the thread context.
- * #hasRun makes a much better guess at the state of the thread.
- * If it has run then stop must be called.
- */
- bool hasRun() {
- return run_;
- }
-
- protected:
- /// Require the runnable thread define an entrypoint.
- virtual void start() = 0;
-
- /// The runnable thread may optionally define a stop/interrupt point.
- void stop() override {}
-
- private:
- std::atomic<bool> run_{false};
-};
-
-/// An internal runnable used throughout osquery as dispatcher services.
-using InternalRunnableRef = std::shared_ptr<InternalRunnable>;
-using InternalThreadRef = std::unique_ptr<std::thread>;
-
-/**
- * @brief Singleton for queuing asynchronous tasks to be executed in parallel
- *
- * Dispatcher is a singleton which can be used to coordinate the parallel
- * execution of asynchronous tasks across an application. Internally,
- * Dispatcher is back by the Apache Thrift thread pool.
- */
-class Dispatcher : private boost::noncopyable {
- public:
- /**
- * @brief The primary way to access the Dispatcher factory facility.
- *
- * @code{.cpp} auto dispatch = osquery::Dispatcher::instance(); @endcode
- *
- * @return The osquery::Dispatcher instance.
- */
- static Dispatcher& instance();
-
- /// See `add`, but services are not limited to a thread poll size.
- static Status addService(InternalRunnableRef service);
-
- /// See `join`, but applied to osquery services.
- static void joinServices();
-
- /// Destroy and stop all osquery service threads and service objects.
- static void stopServices();
-
- /// Return number of services.
- size_t serviceCount() const;
-
- private:
- /**
- * @brief Default constructor.
- *
- * Since instances of Dispatcher should only be created via instance(),
- * Dispatcher's constructor is private.
- */
- Dispatcher() = default;
-
- private:
- /// When a service ends, it will remove itself from the dispatcher.
- static void removeService(const InternalRunnable* service);
-
- private:
- /// For testing only, reset the stopping status for unittests.
- void resetStopping() {
- stopping_ = false;
- }
-
- private:
- /// The set of shared osquery service threads.
- std::vector<InternalThreadRef> service_threads_;
-
- /// The set of shared osquery services.
- std::vector<InternalRunnableRef> services_;
-
- // Protection around service access.
- mutable Mutex mutex_;
-
-
- /**
- * @brief Signal to the Dispatcher that no services should be created.
- *
- * The Dispatcher will not add services if it is shutting down until
- * a join has completed of existing services.
- *
- * This prevents a very strange race where the dispatcher is signaled to
- * abort or interrupt and serviced are sill waiting to be added.
- * A future join will be requested AFTER all services were expected to have
- * been interrupted.
- */
- std::atomic<bool> stopping_{false};
-
- private:
- friend class InternalRunnable;
- friend class ExtensionsTests;
- friend class DispatcherTests;
-};
-} // namespace osquery
+++ /dev/null
-/**
- * Copyright (c) 2014-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed in accordance with the terms specified in
- * the LICENSE file found in the root directory of this source tree.
- */
-
-#pragma once
-
-#include <string>
-#include <vector>
-
-#include <osquery/plugins/plugin.h>
-#include <osquery/query.h>
-#include <osquery/utils/status/status.h>
-
-namespace osquery {
-
-/**
- * @brief Small struct containing the query and ID information for a
- * distributed query
- */
-struct DistributedQueryRequest {
- public:
- explicit DistributedQueryRequest() {}
-
- std::string query;
- std::string id;
-};
-
-/**
- * @brief Serialize a DistributedQueryRequest into a property tree
- *
- * @param r the DistributedQueryRequest to serialize
- * @param doc the input JSON managed document
- * @param obj the output rapidjson document [object]
- *
- * @return Status indicating the success or failure of the operation
- */
-Status serializeDistributedQueryRequest(const DistributedQueryRequest& r,
- JSON& doc,
- rapidjson::Value& obj);
-
-/**
- * @brief Serialize a DistributedQueryRequest object into a JSON string
- *
- * @param r the DistributedQueryRequest to serialize
- * @param json the output JSON string
- *
- * @return Status indicating the success or failure of the operation
- */
-Status serializeDistributedQueryRequestJSON(const DistributedQueryRequest& r,
- std::string& json);
-
-/**
- * @brief Deserialize a DistributedQueryRequest object from a property tree
- *
- * @param obj the input rapidjson value [object]
- * @param r the output DistributedQueryRequest structure
- *
- * @return Status indicating the success or failure of the operation
- */
-Status deserializeDistributedQueryRequest(const rapidjson::Value& obj,
- DistributedQueryRequest& r);
-
-/**
- * @brief Deserialize a DistributedQueryRequest object from a JSON string
- *
- * @param json the input JSON string
- * @param r the output DistributedQueryRequest structure
- *
- * @return Status indicating the success or failure of the operation
- */
-Status deserializeDistributedQueryRequestJSON(const std::string& json,
- DistributedQueryRequest& r);
-
-/**
- * @brief Small struct containing the results of a distributed query
- */
-struct DistributedQueryResult {
- public:
- DistributedQueryResult() {}
- DistributedQueryResult(const DistributedQueryRequest& req,
- const QueryData& res,
- const ColumnNames& cols,
- const Status& s)
- : request(req), results(res), columns(cols), status(s) {}
-
- DistributedQueryRequest request;
- QueryData results;
- ColumnNames columns;
- Status status;
-};
-
-/**
- * @brief Serialize a DistributedQueryResult into a property tree
- *
- * @param r the DistributedQueryResult to serialize
- * @param doc the input JSON managed document
- * @param obj the output rapidjson document [object]
- *
- * @return Status indicating the success or failure of the operation
- */
-Status serializeDistributedQueryResult(const DistributedQueryResult& r,
- JSON& doc,
- rapidjson::Value& obj);
-/**
- * @brief Serialize a DistributedQueryResult object into a JSON string
- *
- * @param r the DistributedQueryResult to serialize
- * @param json the output JSON string
- *
- * @return Status indicating the success or failure of the operation
- */
-Status serializeDistributedQueryResultJSON(const DistributedQueryResult& r,
- std::string& json);
-
-/**
- * @brief Deserialize a DistributedQueryResult object from a property tree
- *
- * @param obj the input rapidjson document [object]
- * @param r the output DistributedQueryResult structure
- *
- * @return Status indicating the success or failure of the operation
- */
-Status deserializeDistributedQueryResult(const rapidjson::Value& obj,
- DistributedQueryResult& r);
-
-/**
- * @brief Deserialize a DistributedQueryResult object from a JSON string
- *
- * @param json the input JSON string
- * @param r the output DistributedQueryResult structure
- *
- * @return Status indicating the success or failure of the operation
- */
-Status deserializeDistributedQueryResultJSON(const std::string& json,
- DistributedQueryResult& r);
-
-class DistributedPlugin : public Plugin {
- public:
- /**
- * @brief Get the queries to be executed
- *
- * Consider the following example JSON which represents the expected format
- *
- * @code{.json}
- * {
- * "queries": {
- * "id1": "select * from osquery_info",
- * "id2": "select * from osquery_schedule"
- * }
- * }
- * @endcode
- *
- * @param json is the string to populate the queries data structure with
- * @return a Status indicating the success or failure of the operation
- */
- virtual Status getQueries(std::string& json) = 0;
-
- /**
- * @brief Write the results that were executed
- *
- * Consider the following JSON which represents the format that will be used:
- *
- * @code{.json}
- * {
- * "queries": {
- * "id1": [
- * {
- * "col1": "val1",
- * "col2": "val2"
- * },
- * {
- * "col1": "val1",
- * "col2": "val2"
- * }
- * ],
- * "id2": [
- * {
- * "col1": "val1",
- * "col2": "val2"
- * }
- * ]
- * }
- * }
- * @endcode
- *
- * @param json is the results data to write
- * @return a Status indicating the success or failure of the operation
- */
- virtual Status writeResults(const std::string& json) = 0;
-
- /// Main entrypoint for distributed plugin requests
- Status call(const PluginRequest& request, PluginResponse& response) override;
-};
-
-/**
- * @brief Class for managing the set of distributed queries to execute
- *
- * Consider the following workflow example, without any error handling
- *
- * @code{.cpp}
- * auto dist = Distributed();
- * while (true) {
- * dist.pullUpdates();
- * if (dist.getPendingQueryCount() > 0) {
- * dist.runQueries();
- * }
- * }
- * @endcode
- */
-class Distributed {
- public:
- /// Default constructor
- Distributed() {}
-
- /// Retrieve queued queries from a remote server
- Status pullUpdates();
-
- /// Get the number of queries which are waiting to be executed
- size_t getPendingQueryCount();
-
- /// Get the number of results which are waiting to be flushed
- size_t getCompletedCount();
-
- /// Serialize result data into a JSON string and clear the results
- Status serializeResults(std::string& json);
-
- /// Process and execute queued queries
- Status runQueries();
-
- // Getter for ID of currently executing request
- static std::string getCurrentRequestId();
-
- protected:
- /**
- * @brief Process several queries from a distributed plugin
- *
- * Given a response from a distributed plugin, parse the results and enqueue
- * them in the internal state of the class
- *
- * @param work is the string from DistributedPlugin::getQueries
- * @return a Status indicating the success or failure of the operation
- */
- Status acceptWork(const std::string& work);
-
- /**
- * @brief Pop a request object off of the queries_ member
- *
- * @return a DistributedQueryRequest object which needs to be executed
- */
- DistributedQueryRequest popRequest();
-
- /**
- * @brief Queue a result to be batch sent to the server
- *
- * @param result is a DistributedQueryResult object to be sent to the server
- */
- void addResult(const DistributedQueryResult& result);
-
- /**
- * @brief Flush all of the collected results to the server
- */
- Status flushCompleted();
-
- // Setter for ID of currently executing request
- static void setCurrentRequestId(const std::string& cReqId);
-
- std::vector<DistributedQueryResult> results_;
-
- // ID of the currently executing query
- static std::string currentRequestId_;
-
- private:
- friend class DistributedTests;
- FRIEND_TEST(DistributedTests, DISABLED_test_workflow);
-};
-}
#include <gtest/gtest_prod.h>
#include <osquery/core.h>
-#include <osquery/dispatcher.h>
#include <osquery/tables.h>
#include <osquery/utils/status/status.h>
+#include <osquery/utils/mutex.h>
namespace osquery {
};
class EventPublisherPlugin : public Plugin,
- public InterruptableRunnable,
public Eventer {
public:
/**
return Status(1, "No run loop required");
}
- /**
- * @brief Allow the EventFactory to interrupt the run loop.
- *
- * Assume the main thread may ask the run loop to stop at anytime.
- * Before end is called the publisher's `isEnding` is set and the EventFactory
- * run loop manager will exit the stepping loop and fall through to a call
- * to tearDown followed by a removal of the publisher.
- */
- void stop() override {}
-
/// This is a plugin type and must implement a call method.
Status call(const PluginRequest& /*request*/,
PluginResponse& /*response*/) override {
#include <osquery/core.h>
#include <osquery/database.h>
#include <osquery/devtools/devtools.h>
-#include <osquery/dispatcher/scheduler.h>
#include <osquery/filesystem/fileops.h>
#include <osquery/flags.h>
#include <osquery/logger.h>
int startDaemon(Initializer& runner) {
runner.start();
- // Begin the schedule runloop.
- startScheduler();
-
// osquery::events::init_syscall_tracing();
// Finally wait for a signal / interrupt to shutdown.