osquery: Remove dispatcher
authorSangwan Kwon <sangwan.kwon@samsung.com>
Tue, 14 Jan 2020 04:16:09 +0000 (13:16 +0900)
committerSangwan Kwon <sangwan.kwon@samsung.com>
Tue, 14 Jan 2020 04:16:09 +0000 (13:16 +0900)
Signed-off-by: Sangwan Kwon <sangwan.kwon@samsung.com>
13 files changed:
src/osquery/CMakeLists.txt
src/osquery/core/init.cpp
src/osquery/core/tests/posix/permissions_tests.cpp [deleted file]
src/osquery/dispatcher/CMakeLists.txt [deleted file]
src/osquery/dispatcher/dispatcher.cpp [deleted file]
src/osquery/dispatcher/scheduler.cpp [deleted file]
src/osquery/dispatcher/scheduler.h [deleted file]
src/osquery/dispatcher/tests/dispatcher.cpp [deleted file]
src/osquery/events/events.cpp
src/osquery/include/osquery/dispatcher.h [deleted file]
src/osquery/include/osquery/distributed.h [deleted file]
src/osquery/include/osquery/events.h
src/osquery/main/main.cpp

index 850c3cf6e6baaa320dd4bae8574f6940b8448d37..c0e05fe98ea907faaef530fdfd6808eebc631b33 100644 (file)
@@ -45,7 +45,6 @@ ENDIF(DEFINED GBS_BUILD)
 ## osquery v4.0.0
 ADD_SUBDIRECTORY(core)
 ADD_SUBDIRECTORY(database)
-ADD_SUBDIRECTORY(dispatcher)
 ADD_SUBDIRECTORY(events)
 ADD_SUBDIRECTORY(filesystem)
 ADD_SUBDIRECTORY(logger)
index 53bdf2498e3c410216f60031700e3cb8517ee6c1..be40f243d6e1ee50018efae257a096b928469a4b 100644 (file)
@@ -35,7 +35,6 @@
 #include "osquery/utils/info/platform_type.h"
 #include <osquery/core.h>
 #include <osquery/data_logger.h>
-#include <osquery/dispatcher.h>
 #include <osquery/events.h>
 #include <osquery/filesystem/filesystem.h>
 #include <osquery/flags.h>
@@ -123,8 +122,6 @@ void signalHandler(int num) {
 
       // Restore the default signal handler.
       std::signal(num, SIG_DFL);
-
-      osquery::Dispatcher::stopServices();
     }
   }
 
@@ -510,8 +507,6 @@ void Initializer::waitForShutdown() {
     }
   }
 
-  // Attempt to be the only place in code where a join is attempted.
-  Dispatcher::joinServices();
   // End any event type run loops.
   EventFactory::end(true);
 
@@ -537,7 +532,6 @@ void Initializer::requestShutdown(int retcode) {
     // it is NOT waiting for a shutdown.
     // Exceptions include: tight request / wait in an exception handler or
     // custom signal handling.
-    Dispatcher::stopServices();
     waitForShutdown();
   }
 }
diff --git a/src/osquery/core/tests/posix/permissions_tests.cpp b/src/osquery/core/tests/posix/permissions_tests.cpp
deleted file mode 100644 (file)
index 6bc3b7b..0000000
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- *  Copyright (c) 2014-present, Facebook, Inc.
- *  All rights reserved.
- *
- *  This source code is licensed in accordance with the terms specified in
- *  the LICENSE file found in the root directory of this source tree.
- */
-
-#include <poll.h>
-#include <pwd.h>
-
-#include <gtest/gtest.h>
-
-#include <boost/filesystem/operations.hpp>
-#include <boost/filesystem/path.hpp>
-#include <boost/noncopyable.hpp>
-
-#include <osquery/core.h>
-#include <osquery/dispatcher.h>
-#include <osquery/filesystem/filesystem.h>
-#include <osquery/logger.h>
-#include <osquery/process/process.h>
-#include <osquery/system.h>
-
-#include "osquery/filesystem/fileops.h"
-#include "osquery/tests/test_util.h"
-
-namespace fs = boost::filesystem;
-
-namespace osquery {
-
-class PermissionsTests : public testing::Test {
- public:
-  PermissionsTests()
-      : perm_path_(fs::temp_directory_path() /
-                   fs::unique_path("lowperms.%%%%.%%%%")) {}
-
-  void SetUp() override {
-    fs::create_directories(perm_path_);
-  }
-
-  void TearDown() override {
-    fs::remove_all(perm_path_);
-  }
-
- protected:
-  fs::path perm_path_;
-};
-
-TEST_F(PermissionsTests, test_explicit_drop) {
-  {
-    auto dropper = DropPrivileges::get();
-    EXPECT_TRUE(dropper->dropTo(getuid(), getgid()));
-    // We can attempt to drop to the previously-dropped privilege.
-    EXPECT_TRUE(dropper->dropTo(getuid(), getgid()));
-  }
-
-  {
-    auto dropper = DropPrivileges::get();
-    // Make sure that an out-of-scope dropper "restore"
-    EXPECT_FALSE(dropper->dropped_);
-
-    uid_t expected_user = 0U;
-    EXPECT_EQ(dropper->to_user_, expected_user);
-
-    gid_t expected_group = 0U;
-    EXPECT_EQ(dropper->to_group_, expected_group);
-
-    // Checking if we are generally in an unprivileged mode.
-    auto dropper2 = DropPrivileges::get();
-    EXPECT_FALSE(dropper2->dropped());
-  }
-}
-
-TEST_F(PermissionsTests, test_path_drop) {
-  if (getuid() != 0) {
-    LOG(WARNING) << "Not root, skipping (path) unprivileged testing";
-    return;
-  }
-
-  // Attempt to drop to nobody based on ownership of paths.
-  auto nobody = getpwnam("nobody");
-  ASSERT_NE(nobody, nullptr);
-
-  {
-    int status =
-        chown(perm_path_.string().c_str(), nobody->pw_uid, nobody->pw_gid);
-    ASSERT_EQ(status, 0);
-
-    auto dropper = DropPrivileges::get();
-    EXPECT_TRUE(dropper->dropToParent((perm_path_ / "ro").string()));
-    EXPECT_TRUE(dropper->dropped_);
-    EXPECT_EQ(dropper->to_user_, nobody->pw_uid);
-
-    // Dropping "up" to root should fail.
-    // Even though this is possible and may make sense, it is confusing!
-    EXPECT_FALSE(dropper->dropTo(0, 0));
-
-    // Make sure the dropper worked!
-    EXPECT_EQ(geteuid(), nobody->pw_uid);
-  }
-
-  // Now that the dropper is gone, the effective user/group should be restored.
-  EXPECT_EQ(geteuid(), getuid());
-  EXPECT_EQ(getegid(), getgid());
-}
-
-TEST_F(PermissionsTests, test_functional_drop) {
-  if (getuid() != 0) {
-    LOG(WARNING) << "Not root, skipping (explicit) unprivileged testing";
-    return;
-  }
-
-  auto file_path = kTestWorkingDirectory + "permissions-file2";
-
-  {
-    writeTextFile(file_path, "data");
-    ASSERT_TRUE(platformChmod(file_path, 0400));
-  }
-
-  {
-    auto nobody = getpwnam("nobody");
-    auto dropper = DropPrivileges::get();
-    dropper->dropTo(nobody->pw_uid, nobody->pw_gid);
-    PlatformFile fd(file_path, PF_OPEN_EXISTING | PF_READ);
-    EXPECT_FALSE(fd.isValid());
-  }
-
-  osquery::removePath(file_path);
-}
-
-TEST_F(PermissionsTests, test_nobody_drop) {
-  if (getuid() != 0) {
-    LOG(WARNING) << "Not root, skipping (explicit) unprivileged testing";
-    return;
-  }
-
-  // Attempt to drop to nobody.
-  auto nobody = getpwnam("nobody");
-  ASSERT_NE(nobody, nullptr);
-
-  {
-    auto dropper = DropPrivileges::get();
-    EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid));
-    EXPECT_EQ(geteuid(), nobody->pw_uid);
-  }
-
-  {
-    auto dropper = DropPrivileges::get();
-    EXPECT_TRUE(dropper->dropTo(std::to_string(nobody->pw_uid),
-                                std::to_string(nobody->pw_gid)));
-    EXPECT_EQ(geteuid(), nobody->pw_uid);
-  }
-
-  // Now that the dropper is gone, the effective user/group should be restored.
-  EXPECT_EQ(geteuid(), getuid());
-  EXPECT_EQ(getegid(), getgid());
-}
-
-std::string kMultiThreadPermissionPath;
-
-class PermissionsRunnable : public InternalRunnable {
- public:
-  PermissionsRunnable() : InternalRunnable("PermissionsRunnable") {}
-  PermissionsRunnable(const std::string& name) : InternalRunnable(name) {}
-
- private:
-  virtual void start() override {
-    while (!interrupted()) {
-      if (!writeTextFile(kMultiThreadPermissionPath, "test")) {
-        throw std::runtime_error("Cannot write " + kMultiThreadPermissionPath);
-      }
-      ticks++;
-    }
-  }
-
- public:
-  std::atomic<size_t> ticks{0};
-};
-
-class PermissionsPollRunnable : public PermissionsRunnable {
- public:
-  PermissionsPollRunnable() : PermissionsRunnable("PermissionsPollRunnable") {}
-
- private:
-  void start() override {
-    PlatformFile file(kMultiThreadPermissionPath,
-                      PF_OPEN_EXISTING | PF_READ | PF_NONBLOCK);
-    auto file_fd = file.nativeHandle();
-
-    struct pollfd fds[1];
-    while (!interrupted()) {
-      std::memset(fds, 0, sizeof(fds));
-      fds[0].fd = file_fd;
-
-      result = poll(fds, 1, 1);
-      if (result == 0) {
-        ticks++;
-      }
-    }
-  }
-
- public:
-  std::atomic<int> result;
-};
-
-bool waitForTick(const std::shared_ptr<PermissionsRunnable>& runnable) {
-  size_t now = runnable->ticks;
-  size_t timeout = 1000;
-  size_t delay = 0;
-  while (delay < timeout) {
-    sleepFor(20);
-    if (runnable->ticks > now) {
-      return true;
-    }
-    sleepFor(200);
-    delay += 220;
-  }
-  return false;
-}
-
-TEST_F(PermissionsTests, test_multi_thread_permissions) {
-  if (getuid() != 0) {
-    LOG(WARNING) << "Not root, skipping multi-thread deprivilege testing";
-    return;
-  }
-
-  ASSERT_EQ(0U, geteuid());
-
-  // Set the multi-thread path, which both threads will write into.
-  auto multi_thread_path = perm_path_ / "threadperms.txt";
-  kMultiThreadPermissionPath = multi_thread_path.string();
-
-  // This thread has super-user permissions.
-  ASSERT_TRUE(writeTextFile(kMultiThreadPermissionPath, "test", 600));
-
-  // Start our permissions thread.
-  auto perms_thread = std::make_shared<PermissionsRunnable>();
-  Dispatcher::addService(perms_thread);
-
-  // Wait for the permissions thread to write once.
-  EXPECT_TRUE(waitForTick(perms_thread));
-
-  // Attempt to drop to nobody.
-  auto nobody = getpwnam("nobody");
-  EXPECT_NE(nobody, nullptr);
-
-  {
-    auto dropper = DropPrivileges::get();
-    EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid));
-    EXPECT_EQ(geteuid(), nobody->pw_uid);
-
-    // Now we wait for the permissions thread to write once while this thread's
-    // permissions are dropped.
-    EXPECT_TRUE(waitForTick(perms_thread));
-  }
-
-  Dispatcher::stopServices();
-  Dispatcher::joinServices();
-}
-
-TEST_F(PermissionsTests, test_multi_thread_poll) {
-  if (getuid() != 0) {
-    LOG(WARNING) << "Not root, skipping multi-thread deprivilege testing";
-    return;
-  }
-
-  ASSERT_EQ(0U, geteuid());
-
-  // Set the multi-thread path, which both threads will write into.
-  auto multi_thread_path = perm_path_ / "threadperms.txt";
-  kMultiThreadPermissionPath = multi_thread_path.string();
-
-  // Start our permissions thread.
-  auto pool_thread = std::make_shared<PermissionsPollRunnable>();
-  Dispatcher::addService(pool_thread);
-
-  // Wait for the permissions thread to write once.
-  EXPECT_TRUE(waitForTick(pool_thread));
-
-  auto nobody = getpwnam("nobody");
-  EXPECT_NE(nobody, nullptr);
-  {
-    auto dropper = DropPrivileges::get();
-    EXPECT_TRUE(dropper->dropTo(nobody->pw_uid, nobody->pw_gid));
-    EXPECT_EQ(geteuid(), nobody->pw_uid);
-
-    EXPECT_TRUE(waitForTick(pool_thread));
-  }
-
-  Dispatcher::stopServices();
-  Dispatcher::joinServices();
-}
-}
diff --git a/src/osquery/dispatcher/CMakeLists.txt b/src/osquery/dispatcher/CMakeLists.txt
deleted file mode 100644 (file)
index 82b34a2..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-#  Copyright (c) 2019 Samsung Electronics Co., Ltd All Rights Reserved
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License
-
-ADD_OSQUERY_LIBRARY(osquery_dispatcher dispatcher.cpp
-                                                                          scheduler.cpp)
-
-FILE(GLOB OSQUERY_DISPATCHER_TESTS "tests/*.cpp")
-ADD_OSQUERY_TEST(${OSQUERY_DISPATCHER_TESTS})
diff --git a/src/osquery/dispatcher/dispatcher.cpp b/src/osquery/dispatcher/dispatcher.cpp
deleted file mode 100644 (file)
index a634855..0000000
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- *  Copyright (c) 2014-present, Facebook, Inc.
- *  All rights reserved.
- *
- *  This source code is licensed in accordance with the terms specified in
- *  the LICENSE file found in the root directory of this source tree.
- */
-
-#include <osquery/dispatcher.h>
-#include <osquery/flags.h>
-#include <osquery/logger.h>
-#include <osquery/process/process.h>
-
-namespace osquery {
-
-/// The worker_threads define the default thread pool size.
-FLAG(int32, worker_threads, 4, "Number of work dispatch threads");
-
-
-void InterruptableRunnable::interrupt() {
-  // Set the service as interrupted.
-  if (!interrupted_.exchange(true)) {
-    // Tear down the service's resources such that exiting the expected run
-    // loop within ::start does not need to.
-    stop();
-    std::lock_guard<std::mutex> lock(condition_lock);
-    // Cancel the run loop's pause request.
-    condition_.notify_one();
-  }
-}
-
-bool InterruptableRunnable::interrupted() {
-  return interrupted_;
-}
-
-void InterruptableRunnable::pause(std::chrono::milliseconds milli) {
-  std::unique_lock<std::mutex> lock(condition_lock);
-  if (!interrupted_) {
-    condition_.wait_for(lock, milli);
-  }
-}
-
-void InternalRunnable::run() {
-  run_ = true;
-  setThreadName(name());
-  start();
-
-  // The service is complete.
-  Dispatcher::removeService(this);
-}
-
-Dispatcher& Dispatcher::instance() {
-  static Dispatcher instance;
-  return instance;
-}
-
-size_t Dispatcher::serviceCount() const {
-  ReadLock lock(mutex_);
-  return services_.size();
-}
-
-Status Dispatcher::addService(InternalRunnableRef service) {
-  if (service->hasRun()) {
-    return Status(1, "Cannot schedule a service twice");
-  }
-
-  auto& self = instance();
-  if (self.stopping_) {
-    // Cannot add a service while the dispatcher is stopping and no joins
-    // have been requested.
-    return Status(1, "Cannot add service, dispatcher is stopping");
-  }
-
-  auto thread = std::make_unique<std::thread>(
-      std::bind(&InternalRunnable::run, &*service));
-
-  DLOG(INFO) << "Adding new service: " << service->name() << " ("
-             << service.get() << ") to thread: " << thread->get_id() << " ("
-             << thread.get() << ") in process " << platformGetPid();
-  {
-    WriteLock lock(self.mutex_);
-
-    self.service_threads_.push_back(std::move(thread));
-    self.services_.push_back(std::move(service));
-  }
-  return Status::success();
-}
-
-void Dispatcher::removeService(const InternalRunnable* service) {
-  auto& self = Dispatcher::instance();
-  WriteLock lock(self.mutex_);
-
-  // Remove the service.
-  self.services_.erase(
-      std::remove_if(self.services_.begin(),
-                     self.services_.end(),
-                     [service](const InternalRunnableRef& target) {
-                       return (target.get() == service);
-                     }),
-      self.services_.end());
-}
-
-inline static void assureRun(const InternalRunnableRef& service) {
-  while (true) {
-    // Wait for each thread's entry point (start) meaning the thread context
-    // was allocated and (run) was called by std::thread started.
-    if (service->hasRun()) {
-      break;
-    }
-    // We only need to check if std::terminate is called very quickly after
-    // the std::thread is created.
-    sleepFor(20);
-  }
-}
-
-void Dispatcher::joinServices() {
-  auto& self = instance();
-  DLOG(INFO) << "Thread: " << std::this_thread::get_id()
-             << " requesting a join";
-
-  // Stops when service_threads_ is empty. Before stopping and releasing of the
-  // lock, empties services_ .
-  while (1) {
-    InternalThreadRef thread = nullptr;
-    {
-      WriteLock lock(self.mutex_);
-      if (!self.service_threads_.empty()) {
-        thread = std::move(self.service_threads_.back());
-        self.service_threads_.pop_back();
-      } else {
-        self.services_.clear();
-        break;
-      }
-    }
-    if (thread != nullptr) {
-      thread->join();
-      DLOG(INFO) << "Service thread: " << thread.get() << " has joined";
-    }
-  }
-
-  self.stopping_ = false;
-  DLOG(INFO) << "Services and threads have been cleared";
-}
-
-void Dispatcher::stopServices() {
-  auto& self = instance();
-  self.stopping_ = true;
-
-  WriteLock lock(self.mutex_);
-  DLOG(INFO) << "Thread: " << std::this_thread::get_id()
-             << " requesting a stop";
-  for (const auto& service : self.services_) {
-    assureRun(service);
-    service->interrupt();
-    DLOG(INFO) << "Service: " << service.get() << " has been interrupted";
-  }
-}
-} // namespace osquery
diff --git a/src/osquery/dispatcher/scheduler.cpp b/src/osquery/dispatcher/scheduler.cpp
deleted file mode 100644 (file)
index 2902b75..0000000
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- *  Copyright (c) 2014-present, Facebook, Inc.
- *  All rights reserved.
- *
- *  This source code is licensed in accordance with the terms specified in
- *  the LICENSE file found in the root directory of this source tree.
- */
-
-#include <algorithm>
-#include <ctime>
-
-#include <boost/format.hpp>
-#include <boost/io/detail/quoted_manip.hpp>
-
-#include <osquery/core.h>
-#include <osquery/data_logger.h>
-#include <osquery/database.h>
-#include <osquery/flags.h>
-#include <osquery/process/process.h>
-#include <osquery/query.h>
-#include <osquery/utils/system/time.h>
-
-#include "osquery/dispatcher/scheduler.h"
-#include "osquery/sql/sqlite_util.h"
-
-namespace osquery {
-
-FLAG(uint64, schedule_timeout, 0, "Limit the schedule, 0 for no limit");
-
-FLAG(uint64,
-     schedule_max_drift,
-     60,
-     "Max time drift in seconds. Scheduler tries to compensate the drift until "
-     "the drift exceed this value. After it the drift will be reseted to zero "
-     "and the compensation process will start from the beginning. It is needed "
-     "to avoid the problem of endless compensation (which is CPU greedy) after "
-     "a long SIGSTOP/SIGCONT pause or something similar. Set it to zero to "
-     "switch off a drift compensation. Default: 60");
-
-FLAG(uint64,
-     schedule_reload,
-     300,
-     "Interval in seconds to reload database arenas");
-
-FLAG(uint64, schedule_epoch, 0, "Epoch for scheduled queries");
-
-HIDDEN_FLAG(bool,
-            schedule_reload_sql,
-            false,
-            "Reload the SQL implementation during schedule reload");
-
-/// Used to bypass (optimize-out) the set-differential of query results.
-DECLARE_bool(events_optimize);
-
-SQLInternal monitor(const std::string& name, const ScheduledQuery& query) {
-  // Snapshot the performance and times for the worker before running.
-  auto pid = std::to_string(PlatformProcess::getCurrentPid());
-  auto r0 = SQL::selectFrom({"resident_size", "user_time", "system_time"},
-                            "processes",
-                            "pid",
-                            EQUALS,
-                            pid);
-  auto t0 = getUnixTime();
-  SQLInternal sql(query.query, true);
-  // Snapshot the performance after, and compare.
-  auto t1 = getUnixTime();
-  auto r1 = SQL::selectFrom({"resident_size", "user_time", "system_time"},
-                            "processes",
-                            "pid",
-                            EQUALS,
-                            pid);
-  return sql;
-}
-
-Status launchQuery(const std::string& name, const ScheduledQuery& query) {
-  // Execute the scheduled query and create a named query object.
-  LOG(INFO) << "Executing scheduled query " << name << ": " << query.query;
-
-  auto sql = monitor(name, query);
-  if (!sql.getStatus().ok()) {
-    LOG(ERROR) << "Error executing scheduled query " << name << ": "
-               << sql.getStatus().toString();
-    return Status::failure("Error executing scheduled query");
-  }
-
-  // Fill in a host identifier fields based on configuration or availability.
-  std::string ident = getHostIdentifier();
-
-  // A query log item contains an optional set of differential results or
-  // a copy of the most-recent execution alongside some query metadata.
-  QueryLogItem item;
-  item.name = name;
-  item.identifier = ident;
-  item.time = osquery::getUnixTime();
-  item.epoch = FLAGS_schedule_epoch;
-  item.calendar_time = osquery::getAsciiTime();
-
-  if (query.options.count("snapshot") && query.options.at("snapshot")) {
-    // This is a snapshot query, emit results with a differential or state.
-    item.snapshot_results = std::move(sql.rowsTyped());
-    logSnapshotQuery(item);
-    return Status::success();
-  }
-
-  // Create a database-backed set of query results.
-  auto dbQuery = Query(name, query);
-  // Comparisons and stores must include escaped data.
-  sql.escapeResults();
-  Status status;
-  DiffResults& diff_results = item.results;
-  // Add this execution's set of results to the database-tracked named query.
-  // We can then ask for a differential from the last time this named query
-  // was executed by exact matching each row.
-  if (!FLAGS_events_optimize || !sql.eventBased()) {
-    status = dbQuery.addNewResults(
-        std::move(sql.rowsTyped()), item.epoch, item.counter, diff_results);
-    if (!status.ok()) {
-      std::string line = "Error adding new results to database for query " +
-                         name + ": " + status.what();
-      LOG(ERROR) << line;
-
-      // If the database is not available then the daemon cannot continue.
-      Initializer::requestShutdown(EXIT_CATASTROPHIC, line);
-    }
-  } else {
-    diff_results.added = std::move(sql.rowsTyped());
-  }
-
-  if (query.options.count("removed") && !query.options.at("removed")) {
-    diff_results.removed.clear();
-  }
-
-  if (diff_results.added.empty() && diff_results.removed.empty()) {
-    // No diff results or events to emit.
-    return status;
-  }
-
-  VLOG(1) << "Found results for query: " << name;
-
-  status = logQueryLogItem(item);
-  if (!status.ok()) {
-    // If log directory is not available, then the daemon shouldn't continue.
-    std::string error = "Error logging the results of query: " + name + ": " +
-                        status.toString();
-    LOG(ERROR) << error;
-    Initializer::requestShutdown(EXIT_CATASTROPHIC, error);
-  }
-  return status;
-}
-
-void SchedulerRunner::start() {
-  // Start the counter at the second.
-  auto i = osquery::getUnixTime();
-  for (; (timeout_ == 0) || (i <= timeout_); ++i) {
-    auto start_time_point = std::chrono::steady_clock::now();
-    if (FLAGS_schedule_reload > 0 && (i % FLAGS_schedule_reload) == 0) {
-      if (FLAGS_schedule_reload_sql) {
-        SQLiteDBManager::resetPrimary();
-      }
-      resetDatabase();
-    }
-
-    // GLog is not re-entrant, so logs must be flushed in a dedicated thread.
-    if ((i % 3) == 0) {
-      relayStatusLogs(true);
-    }
-    auto loop_step_duration =
-        std::chrono::duration_cast<std::chrono::milliseconds>(
-            std::chrono::steady_clock::now() - start_time_point);
-    if (loop_step_duration + time_drift_ < interval_) {
-      pause(std::chrono::milliseconds(interval_ - loop_step_duration -
-                                      time_drift_));
-      time_drift_ = std::chrono::milliseconds::zero();
-    } else {
-      time_drift_ += loop_step_duration - interval_;
-      if (time_drift_ > max_time_drift_) {
-        // giving up
-        time_drift_ = std::chrono::milliseconds::zero();
-      }
-    }
-    if (interrupted()) {
-      break;
-    }
-  }
-}
-
-std::chrono::milliseconds SchedulerRunner::getCurrentTimeDrift() const
-    noexcept {
-  return time_drift_;
-}
-
-void startScheduler() {
-  startScheduler(static_cast<unsigned long int>(FLAGS_schedule_timeout), 1);
-}
-
-void startScheduler(unsigned long int timeout, size_t interval) {
-  Dispatcher::addService(std::make_shared<SchedulerRunner>(
-      timeout, interval, std::chrono::seconds{FLAGS_schedule_max_drift}));
-}
-} // namespace osquery
diff --git a/src/osquery/dispatcher/scheduler.h b/src/osquery/dispatcher/scheduler.h
deleted file mode 100644 (file)
index 045bc5c..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- *  Copyright (c) 2014-present, Facebook, Inc.
- *  All rights reserved.
- *
- *  This source code is licensed in accordance with the terms specified in
- *  the LICENSE file found in the root directory of this source tree.
- */
-
-#pragma once
-
-#include <chrono>
-#include <map>
-
-#include <osquery/dispatcher.h>
-
-#include "osquery/sql/sqlite_util.h"
-
-namespace osquery {
-
-/// A Dispatcher service thread that watches an ExtensionManagerHandler.
-class SchedulerRunner : public InternalRunnable {
- public:
-  SchedulerRunner(
-      unsigned long int timeout,
-      size_t interval,
-      std::chrono::milliseconds max_time_drift = std::chrono::seconds::zero())
-      : InternalRunnable("SchedulerRunner"),
-        interval_{std::chrono::seconds{interval}},
-        timeout_(timeout),
-        time_drift_{std::chrono::milliseconds::zero()},
-        max_time_drift_{max_time_drift} {}
-
- public:
-  /// The Dispatcher thread entry point.
-  void start() override;
-
-  /// The Dispatcher interrupt point.
-  void stop() override {}
-
-  /// Accumulated for some time time drift to compensate.
-  std::chrono::milliseconds getCurrentTimeDrift() const noexcept;
-
- private:
-  /// Interval in seconds between schedule steps.
-  const std::chrono::milliseconds interval_;
-
-  /// Maximum number of steps.
-  const unsigned long int timeout_;
-
-  /// Accumulated for some time time drift to compensate.
-  /// It will be either reduced during compensation process or
-  /// after exceding the limit @see max_time_drift_
-  std::chrono::milliseconds time_drift_;
-
-  const std::chrono::milliseconds max_time_drift_;
-};
-
-SQLInternal monitor(const std::string& name, const ScheduledQuery& query);
-
-/// Start querying according to the config's schedule
-void startScheduler();
-
-/// Helper scheduler start with variable settings for testing.
-void startScheduler(unsigned long int timeout, size_t interval);
-}
diff --git a/src/osquery/dispatcher/tests/dispatcher.cpp b/src/osquery/dispatcher/tests/dispatcher.cpp
deleted file mode 100644 (file)
index 1a62a2d..0000000
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- *  Copyright (c) 2014-present, Facebook, Inc.
- *  All rights reserved.
- *
- *  This source code is licensed in accordance with the terms specified in
- *  the LICENSE file found in the root directory of this source tree.
- */
-
-#include <chrono>
-
-#include <gtest/gtest.h>
-
-#include <osquery/dispatcher.h>
-#include <osquery/utils/status/status.h>
-
-namespace osquery {
-
-class DispatcherTests : public testing::Test {
-  void TearDown() override {
-    Dispatcher::instance().resetStopping();
-  }
-};
-
-TEST_F(DispatcherTests, test_singleton) {
-  auto& one = Dispatcher::instance();
-  auto& two = Dispatcher::instance();
-  EXPECT_EQ(&one, &two);
-}
-
-class InternalTestableRunnable : public InternalRunnable {
- public:
-  InternalTestableRunnable(const std::string& name) : InternalRunnable(name) {}
-
-  bool interrupted() override {
-    // A small conditional to force-skip an interruption check, used in testing.
-    if (!checked_) {
-      checked_ = true;
-      return false;
-    } else {
-      return InternalRunnable::interrupted();
-    }
-  }
-
- private:
-  /// Testing only, track the interruptible check for interruption.
-  bool checked_{false};
-};
-
-class TestRunnable : public InternalTestableRunnable {
- public:
-  explicit TestRunnable() : InternalTestableRunnable("TestRunnable") {}
-
-  virtual void start() override {
-    ++i;
-  }
-
-  void reset() {
-    i = 0;
-  }
-
-  size_t count() {
-    return i;
-  }
-
- private:
-  static std::atomic<size_t> i;
-};
-
-std::atomic<size_t> TestRunnable::i{0};
-
-TEST_F(DispatcherTests, test_service_count) {
-  auto runnable = std::make_shared<TestRunnable>();
-
-  auto service_count = Dispatcher::instance().serviceCount();
-  // The service exits after incrementing.
-  auto s = Dispatcher::addService(runnable);
-  EXPECT_TRUE(s);
-
-  // Wait for the service to stop.
-  Dispatcher::joinServices();
-
-  // Make sure the service is removed.
-  EXPECT_EQ(service_count, Dispatcher::instance().serviceCount());
-}
-
-TEST_F(DispatcherTests, test_run) {
-  auto runnable = std::make_shared<TestRunnable>();
-  runnable->reset();
-
-  // The service exits after incrementing.
-  Dispatcher::addService(runnable);
-  Dispatcher::joinServices();
-  EXPECT_EQ(1U, runnable->count());
-  EXPECT_TRUE(runnable->hasRun());
-
-  // This runnable cannot be executed again.
-  auto s = Dispatcher::addService(runnable);
-  EXPECT_FALSE(s);
-
-  Dispatcher::joinServices();
-  EXPECT_EQ(1U, runnable->count());
-}
-
-TEST_F(DispatcherTests, test_independent_run) {
-  // Nothing stops two instances of the same service from running.
-  auto r1 = std::make_shared<TestRunnable>();
-  auto r2 = std::make_shared<TestRunnable>();
-  r1->reset();
-
-  Dispatcher::addService(r1);
-  Dispatcher::addService(r2);
-  Dispatcher::joinServices();
-
-  EXPECT_EQ(2U, r1->count());
-}
-
-class BlockingTestRunnable : public InternalTestableRunnable {
- public:
-  explicit BlockingTestRunnable()
-      : InternalTestableRunnable("BlockingTestRunnable") {}
-
-  virtual void start() override {
-    // Wow that's a long sleep!
-    pause(std::chrono::seconds(100));
-  }
-};
-
-TEST_F(DispatcherTests, test_interruption) {
-  auto r1 = std::make_shared<BlockingTestRunnable>();
-  Dispatcher::addService(r1);
-
-  // This service would normally wait for 100 seconds.
-  r1->interrupt();
-
-  Dispatcher::joinServices();
-  EXPECT_TRUE(r1->hasRun());
-}
-
-TEST_F(DispatcherTests, test_stop_dispatcher) {
-  Dispatcher::stopServices();
-
-  auto r1 = std::make_shared<TestRunnable>();
-  auto s = Dispatcher::addService(r1);
-  EXPECT_FALSE(s);
-}
-}
index ace461e5e7da8341fd954629c83831d11182e8ca..149521e01bced75504a0f7716198c7bda5c507f8 100644 (file)
@@ -658,7 +658,6 @@ Status EventFactory::run(const std::string& type_id) {
   } else if (publisher->hasStarted()) {
     return Status(1, "Cannot restart an event publisher");
   }
-  setThreadName(publisher->name());
   VLOG(1) << "Starting event publisher run loop: " + type_id;
   publisher->hasStarted(true);
 
@@ -670,10 +669,6 @@ Status EventFactory::run(const std::string& type_id) {
       break;
     }
     publisher->restart_count_++;
-    // This is a 'default' cool-off implemented in InterruptableRunnable.
-    // If a publisher fails to perform some sort of interruption point, this
-    // prevents the thread from thrashing through exiting checks.
-    publisher->pause(std::chrono::milliseconds(200));
   }
   if (!status.ok()) {
     // The runloop status is not reflective of the event type's.
@@ -881,8 +876,6 @@ Status EventFactory::deregisterEventPublisher(const std::string& type_id) {
       // If the run loop did run the tear down and erase will happen in the
       // event thread wrapper when isEnding is next checked.
       ef.event_pubs_.erase(type_id);
-    } else {
-      publisher->stop();
     }
   }
   return Status::success();
diff --git a/src/osquery/include/osquery/dispatcher.h b/src/osquery/include/osquery/dispatcher.h
deleted file mode 100644 (file)
index dc29a81..0000000
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- *  Copyright (c) 2014-present, Facebook, Inc.
- *  All rights reserved.
- *
- *  This source code is licensed in accordance with the terms specified in
- *  the LICENSE file found in the root directory of this source tree.
- */
-
-#pragma once
-
-#include <atomic>
-#include <chrono>
-#include <condition_variable>
-#include <memory>
-#include <thread>
-#include <vector>
-
-#include <gtest/gtest_prod.h>
-
-#include <boost/noncopyable.hpp>
-
-#include <osquery/core.h>
-#include <osquery/utils/mutex.h>
-
-namespace osquery {
-
-class Status;
-class Dispatcher;
-
-
-class InterruptableRunnable {
- public:
-  virtual ~InterruptableRunnable() = default;
-
-  /**
-   * @brief The std::thread's interruption point.
-   */
-  virtual void interrupt() final;
-
-  /// Returns the runner name
-  std::string name() const {
-    return runnable_name_;
-  }
-
- protected:
-  /// Allow the runnable to check interruption.
-  virtual bool interrupted();
-
-  /// Require the runnable thread to define a stop/interrupt point.
-  virtual void stop() = 0;
-
-  /// Put the runnable into an interruptible sleep.
-  void pause(std::chrono::milliseconds milli);
-
-  /// Name of the InterruptableRunnable which is also the thread name
-  std::string runnable_name_;
-
- private:
-  /**
-   * @brief Used to wait for the interruption notification while sleeping
-   */
-  std::mutex condition_lock;
-
-  /// If a service includes a run loop it should check for interrupted.
-  std::atomic<bool> interrupted_{false};
-
-  /// Wait for notification or a pause expiration.
-  std::condition_variable condition_;
-
- private:
-  FRIEND_TEST(DispatcherTests, test_run);
-  FRIEND_TEST(DispatcherTests, test_independent_run);
-  FRIEND_TEST(DispatcherTests, test_interruption);
-  FRIEND_TEST(BufferedLogForwarderTests, test_async);
-};
-
-class InternalRunnable : private boost::noncopyable,
-                         public InterruptableRunnable {
- public:
-  InternalRunnable(const std::string& name) : run_(false) {
-    runnable_name_ = name;
-  }
-  virtual ~InternalRunnable() override = default;
-
- public:
-  /**
-   * @brief The std::thread entrypoint.
-   *
-   * This is used by the Dispatcher only.
-   */
-  virtual void run() final;
-
-  /**
-   * @brief Check if the thread's entrypoint (run) executed.
-   *
-   * It is possible for the Runnable to be allocated without the thread context.
-   * #hasRun makes a much better guess at the state of the thread.
-   * If it has run then stop must be called.
-   */
-  bool hasRun() {
-    return run_;
-  }
-
- protected:
-  /// Require the runnable thread define an entrypoint.
-  virtual void start() = 0;
-
-  /// The runnable thread may optionally define a stop/interrupt point.
-  void stop() override {}
-
- private:
-  std::atomic<bool> run_{false};
-};
-
-/// An internal runnable used throughout osquery as dispatcher services.
-using InternalRunnableRef = std::shared_ptr<InternalRunnable>;
-using InternalThreadRef = std::unique_ptr<std::thread>;
-
-/**
- * @brief Singleton for queuing asynchronous tasks to be executed in parallel
- *
- * Dispatcher is a singleton which can be used to coordinate the parallel
- * execution of asynchronous tasks across an application. Internally,
- * Dispatcher is back by the Apache Thrift thread pool.
- */
-class Dispatcher : private boost::noncopyable {
- public:
-  /**
-   * @brief The primary way to access the Dispatcher factory facility.
-   *
-   * @code{.cpp} auto dispatch = osquery::Dispatcher::instance(); @endcode
-   *
-   * @return The osquery::Dispatcher instance.
-   */
-  static Dispatcher& instance();
-
-  /// See `add`, but services are not limited to a thread poll size.
-  static Status addService(InternalRunnableRef service);
-
-  /// See `join`, but applied to osquery services.
-  static void joinServices();
-
-  /// Destroy and stop all osquery service threads and service objects.
-  static void stopServices();
-
-  /// Return number of services.
-  size_t serviceCount() const;
-
- private:
-  /**
-   * @brief Default constructor.
-   *
-   * Since instances of Dispatcher should only be created via instance(),
-   * Dispatcher's constructor is private.
-   */
-  Dispatcher() = default;
-
- private:
-  /// When a service ends, it will remove itself from the dispatcher.
-  static void removeService(const InternalRunnable* service);
-
- private:
-  /// For testing only, reset the stopping status for unittests.
-  void resetStopping() {
-    stopping_ = false;
-  }
-
- private:
-  /// The set of shared osquery service threads.
-  std::vector<InternalThreadRef> service_threads_;
-
-  /// The set of shared osquery services.
-  std::vector<InternalRunnableRef> services_;
-
-  // Protection around service access.
-  mutable Mutex mutex_;
-
-
-  /**
-   * @brief Signal to the Dispatcher that no services should be created.
-   *
-   * The Dispatcher will not add services if it is shutting down until
-   * a join has completed of existing services.
-   *
-   * This prevents a very strange race where the dispatcher is signaled to
-   * abort or interrupt and serviced are sill waiting to be added.
-   * A future join will be requested AFTER all services were expected to have
-   * been interrupted.
-   */
-  std::atomic<bool> stopping_{false};
-
- private:
-  friend class InternalRunnable;
-  friend class ExtensionsTests;
-  friend class DispatcherTests;
-};
-} // namespace osquery
diff --git a/src/osquery/include/osquery/distributed.h b/src/osquery/include/osquery/distributed.h
deleted file mode 100644 (file)
index 837bcdb..0000000
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- *  Copyright (c) 2014-present, Facebook, Inc.
- *  All rights reserved.
- *
- *  This source code is licensed in accordance with the terms specified in
- *  the LICENSE file found in the root directory of this source tree.
- */
-
-#pragma once
-
-#include <string>
-#include <vector>
-
-#include <osquery/plugins/plugin.h>
-#include <osquery/query.h>
-#include <osquery/utils/status/status.h>
-
-namespace osquery {
-
-/**
- * @brief Small struct containing the query and ID information for a
- * distributed query
- */
-struct DistributedQueryRequest {
- public:
-  explicit DistributedQueryRequest() {}
-
-  std::string query;
-  std::string id;
-};
-
-/**
- * @brief Serialize a DistributedQueryRequest into a property tree
- *
- * @param r the DistributedQueryRequest to serialize
- * @param doc the input JSON managed document
- * @param obj the output rapidjson document [object]
- *
- * @return Status indicating the success or failure of the operation
- */
-Status serializeDistributedQueryRequest(const DistributedQueryRequest& r,
-                                        JSON& doc,
-                                        rapidjson::Value& obj);
-
-/**
- * @brief Serialize a DistributedQueryRequest object into a JSON string
- *
- * @param r the DistributedQueryRequest to serialize
- * @param json the output JSON string
- *
- * @return Status indicating the success or failure of the operation
- */
-Status serializeDistributedQueryRequestJSON(const DistributedQueryRequest& r,
-                                            std::string& json);
-
-/**
- * @brief Deserialize a DistributedQueryRequest object from a property tree
- *
- * @param obj the input rapidjson value [object]
- * @param r the output DistributedQueryRequest structure
- *
- * @return Status indicating the success or failure of the operation
- */
-Status deserializeDistributedQueryRequest(const rapidjson::Value& obj,
-                                          DistributedQueryRequest& r);
-
-/**
- * @brief Deserialize a DistributedQueryRequest object from a JSON string
- *
- * @param json the input JSON string
- * @param r the output DistributedQueryRequest structure
- *
- * @return Status indicating the success or failure of the operation
- */
-Status deserializeDistributedQueryRequestJSON(const std::string& json,
-                                              DistributedQueryRequest& r);
-
-/**
- * @brief Small struct containing the results of a distributed query
- */
-struct DistributedQueryResult {
- public:
-  DistributedQueryResult() {}
-  DistributedQueryResult(const DistributedQueryRequest& req,
-                         const QueryData& res,
-                         const ColumnNames& cols,
-                         const Status& s)
-      : request(req), results(res), columns(cols), status(s) {}
-
-  DistributedQueryRequest request;
-  QueryData results;
-  ColumnNames columns;
-  Status status;
-};
-
-/**
- * @brief Serialize a DistributedQueryResult into a property tree
- *
- * @param r the DistributedQueryResult to serialize
- * @param doc the input JSON managed document
- * @param obj the output rapidjson document [object]
- *
- * @return Status indicating the success or failure of the operation
- */
-Status serializeDistributedQueryResult(const DistributedQueryResult& r,
-                                       JSON& doc,
-                                       rapidjson::Value& obj);
-/**
- * @brief Serialize a DistributedQueryResult object into a JSON string
- *
- * @param r the DistributedQueryResult to serialize
- * @param json the output JSON string
- *
- * @return Status indicating the success or failure of the operation
- */
-Status serializeDistributedQueryResultJSON(const DistributedQueryResult& r,
-                                           std::string& json);
-
-/**
- * @brief Deserialize a DistributedQueryResult object from a property tree
- *
- * @param obj the input rapidjson document [object]
- * @param r the output DistributedQueryResult structure
- *
- * @return Status indicating the success or failure of the operation
- */
-Status deserializeDistributedQueryResult(const rapidjson::Value& obj,
-                                         DistributedQueryResult& r);
-
-/**
- * @brief Deserialize a DistributedQueryResult object from a JSON string
- *
- * @param json the input JSON string
- * @param r the output DistributedQueryResult structure
- *
- * @return Status indicating the success or failure of the operation
- */
-Status deserializeDistributedQueryResultJSON(const std::string& json,
-                                             DistributedQueryResult& r);
-
-class DistributedPlugin : public Plugin {
- public:
-  /**
-   * @brief Get the queries to be executed
-   *
-   * Consider the following example JSON which represents the expected format
-   *
-   * @code{.json}
-   *   {
-   *     "queries": {
-   *       "id1": "select * from osquery_info",
-   *       "id2": "select * from osquery_schedule"
-   *     }
-   *   }
-   * @endcode
-   *
-   * @param json is the string to populate the queries data structure with
-   * @return a Status indicating the success or failure of the operation
-   */
-  virtual Status getQueries(std::string& json) = 0;
-
-  /**
-   * @brief Write the results that were executed
-   *
-   * Consider the following JSON which represents the format that will be used:
-   *
-   * @code{.json}
-   *   {
-   *     "queries": {
-   *       "id1": [
-   *         {
-   *           "col1": "val1",
-   *           "col2": "val2"
-   *         },
-   *         {
-   *           "col1": "val1",
-   *           "col2": "val2"
-   *         }
-   *       ],
-   *       "id2": [
-   *         {
-   *           "col1": "val1",
-   *           "col2": "val2"
-   *         }
-   *       ]
-   *     }
-   *   }
-   * @endcode
-   *
-   * @param json is the results data to write
-   * @return a Status indicating the success or failure of the operation
-   */
-  virtual Status writeResults(const std::string& json) = 0;
-
-  /// Main entrypoint for distributed plugin requests
-  Status call(const PluginRequest& request, PluginResponse& response) override;
-};
-
-/**
- * @brief Class for managing the set of distributed queries to execute
- *
- * Consider the following workflow example, without any error handling
- *
- * @code{.cpp}
- *   auto dist = Distributed();
- *   while (true) {
- *     dist.pullUpdates();
- *     if (dist.getPendingQueryCount() > 0) {
- *       dist.runQueries();
- *     }
- *   }
- * @endcode
- */
-class Distributed {
- public:
-  /// Default constructor
-  Distributed() {}
-
-  /// Retrieve queued queries from a remote server
-  Status pullUpdates();
-
-  /// Get the number of queries which are waiting to be executed
-  size_t getPendingQueryCount();
-
-  /// Get the number of results which are waiting to be flushed
-  size_t getCompletedCount();
-
-  /// Serialize result data into a JSON string and clear the results
-  Status serializeResults(std::string& json);
-
-  /// Process and execute queued queries
-  Status runQueries();
-
-  // Getter for ID of currently executing request
-  static std::string getCurrentRequestId();
-
- protected:
-  /**
-   * @brief Process several queries from a distributed plugin
-   *
-   * Given a response from a distributed plugin, parse the results and enqueue
-   * them in the internal state of the class
-   *
-   * @param work is the string from DistributedPlugin::getQueries
-   * @return a Status indicating the success or failure of the operation
-   */
-  Status acceptWork(const std::string& work);
-
-  /**
-   * @brief Pop a request object off of the queries_ member
-   *
-   * @return a DistributedQueryRequest object which needs to be executed
-   */
-  DistributedQueryRequest popRequest();
-
-  /**
-   * @brief Queue a result to be batch sent to the server
-   *
-   * @param result is a DistributedQueryResult object to be sent to the server
-   */
-  void addResult(const DistributedQueryResult& result);
-
-  /**
-   * @brief Flush all of the collected results to the server
-   */
-  Status flushCompleted();
-
-  // Setter for ID of currently executing request
-  static void setCurrentRequestId(const std::string& cReqId);
-
-  std::vector<DistributedQueryResult> results_;
-
-  // ID of the currently executing query
-  static std::string currentRequestId_;
-
- private:
-  friend class DistributedTests;
-  FRIEND_TEST(DistributedTests, DISABLED_test_workflow);
-};
-}
index 67f3482908e8a243adc022e6086a02df3b44e054..862a70b70be200a1df0a5250a63221c716c9ea9b 100644 (file)
@@ -20,9 +20,9 @@
 #include <gtest/gtest_prod.h>
 
 #include <osquery/core.h>
-#include <osquery/dispatcher.h>
 #include <osquery/tables.h>
 #include <osquery/utils/status/status.h>
+#include <osquery/utils/mutex.h>
 
 namespace osquery {
 
@@ -216,7 +216,6 @@ class Eventer {
 };
 
 class EventPublisherPlugin : public Plugin,
-                             public InterruptableRunnable,
                              public Eventer {
  public:
   /**
@@ -261,16 +260,6 @@ class EventPublisherPlugin : public Plugin,
     return Status(1, "No run loop required");
   }
 
-  /**
-   * @brief Allow the EventFactory to interrupt the run loop.
-   *
-   * Assume the main thread may ask the run loop to stop at anytime.
-   * Before end is called the publisher's `isEnding` is set and the EventFactory
-   * run loop manager will exit the stepping loop and fall through to a call
-   * to tearDown followed by a removal of the publisher.
-   */
-  void stop() override {}
-
   /// This is a plugin type and must implement a call method.
   Status call(const PluginRequest& /*request*/,
               PluginResponse& /*response*/) override {
index 33646290554fbb611bb705eba6bbaa418f2d793c..3953e9da7ae43dda6c527843549ba6e6a603f610 100644 (file)
@@ -17,7 +17,6 @@
 #include <osquery/core.h>
 #include <osquery/database.h>
 #include <osquery/devtools/devtools.h>
-#include <osquery/dispatcher/scheduler.h>
 #include <osquery/filesystem/fileops.h>
 #include <osquery/flags.h>
 #include <osquery/logger.h>
@@ -88,9 +87,6 @@ int profile(int argc, char* argv[]) {
 int startDaemon(Initializer& runner) {
   runner.start();
 
-  // Begin the schedule runloop.
-  startScheduler();
-
 //  osquery::events::init_syscall_tracing();
 
   // Finally wait for a signal / interrupt to shutdown.