#include <ostream>
#include <sstream>
#include <mutex>
+#include <deque>
#include <condition_variable>
#include <thread>
// #include <regex> // on rhel7 gcc 4.8, not competent
{ NULL, 0, NULL, 0, "Scanners:", 1 },
{ "scan-file-dir", 'F', NULL, 0, "Enable ELF/DWARF file scanning threads.", 0 },
{ "scan-rpm-dir", 'R', NULL, 0, "Enable RPM scanning threads.", 0 },
- { "scan-deb-dir", 'U', NULL, 0, "Enable DEB scanning threads.", 0 },
- // "source-oci-imageregistry" ...
+ { "scan-deb-dir", 'U', NULL, 0, "Enable DEB scanning threads.", 0 },
+ // "source-oci-imageregistry" ...
{ NULL, 0, NULL, 0, "Options:", 2 },
{ "logical", 'L', NULL, 0, "Follow symlinks, default=ignore.", 0 },
static void add_metric(const string& metric,
const string& lname, const string& lvalue,
int64_t value);
+// static void add_metric(const string& metric, int64_t value);
/* Handle program arguments. */
static error_t
////////////////////////////////////////////////////////////////////////
-// a c++ counting-semaphore class ... since we're c++11 not c++20
-
-class semaphore
+template <typename Payload>
+class workq
{
+ deque<Payload> q;
+ mutex mtx;
+ condition_variable cv;
+ bool dead;
+
public:
- semaphore (unsigned c=1): count(c) {}
- inline void notify () {
+ workq() { dead = false;}
+ ~workq() {}
+
+ void push_back(const Payload& p)
+ {
unique_lock<mutex> lock(mtx);
- count++;
+ q.push_back(p);
+ set_metric("thread_work_pending","role","scan", q.size());
cv.notify_one();
}
- inline void wait() {
+
+ void nuke() {
+ unique_lock<mutex> lock(mtx);
+ // optional: q.clear();
+ dead = true;
+ cv.notify_all();
+ }
+
+ bool wait_front (Payload& p)
+ {
unique_lock<mutex> lock(mtx);
- while (count == 0)
+ while (q.size() == 0 && !dead)
cv.wait(lock);
- count--;
+ if (dead)
+ return false;
+ else
+ {
+ p = q.front();
+ q.pop_front();
+ set_metric("thread_work_pending","role","scan", q.size());
+ return true;
+ }
}
-private:
- mutex mtx;
- condition_variable cv;
- unsigned count;
};
+typedef struct stat stat_t;
+typedef pair<string,stat_t> scan_payload;
+static workq<scan_payload> scanq; // just a single one
+// producer: thread_main_fts_source_paths()
+// consumer: thread_main_scanner()
-class semaphore_borrower
-{
-public:
- semaphore_borrower(semaphore* s): sem(s) { sem->wait(); }
- ~semaphore_borrower() { sem->notify(); }
-private:
- semaphore* sem;
-};
////////////////////////////////////////////////////////////////////////
unique_lock<mutex> lock(metrics_lock);
metrics[key] += value;
}
+#if 0
+static void
+add_metric(const string& metric,
+ int64_t value)
+{
+ unique_lock<mutex> lock(metrics_lock);
+ metrics[metric] += value;
+}
+#endif
// and more for higher arity labels if needed
}
-static semaphore* scan_concurrency_sem = 0; // used to implement -c load limiting
-
-
static void
-scan_source_file_path (const string& dir)
+scan_source_file (const string& rps, const stat_t& st,
+ sqlite_ps& ps_upsert_buildids,
+ sqlite_ps& ps_upsert_files,
+ sqlite_ps& ps_upsert_de,
+ sqlite_ps& ps_upsert_s,
+ sqlite_ps& ps_query,
+ sqlite_ps& ps_scan_done,
+ unsigned& fts_cached,
+ unsigned& fts_executable,
+ unsigned& fts_debuginfo,
+ unsigned& fts_sourcefiles)
{
- obatched(clog) << "fts/file traversing " << dir << endl;
+ /* See if we know of it already. */
+ int rc = ps_query
+ .reset()
+ .bind(1, rps)
+ .bind(2, st.st_mtime)
+ .step();
+ ps_query.reset();
+ if (rc == SQLITE_ROW) // i.e., a result, as opposed to DONE (no results)
+ // no need to recheck a file/version we already know
+ // specifically, no need to elf-begin a file we already determined is non-elf
+ // (so is stored with buildid=NULL)
+ {
+ fts_cached++;
+ return;
+ }
- struct timeval tv_start, tv_end;
- gettimeofday (&tv_start, NULL);
-
- sqlite_ps ps_upsert_buildids (db, "file-buildids-intern", "insert or ignore into " BUILDIDS "_buildids VALUES (NULL, ?);");
- sqlite_ps ps_upsert_files (db, "file-files-intern", "insert or ignore into " BUILDIDS "_files VALUES (NULL, ?);");
- sqlite_ps ps_upsert_de (db, "file-de-upsert",
- "insert or ignore into " BUILDIDS "_f_de "
- "(buildid, debuginfo_p, executable_p, file, mtime) "
- "values ((select id from " BUILDIDS "_buildids where hex = ?),"
- " ?,?,"
- " (select id from " BUILDIDS "_files where name = ?), ?);");
- sqlite_ps ps_upsert_s (db, "file-s-upsert",
- "insert or ignore into " BUILDIDS "_f_s "
- "(buildid, artifactsrc, file, mtime) "
- "values ((select id from " BUILDIDS "_buildids where hex = ?),"
- " (select id from " BUILDIDS "_files where name = ?),"
- " (select id from " BUILDIDS "_files where name = ?),"
- " ?);");
- sqlite_ps ps_query (db, "file-negativehit-find",
- "select 1 from " BUILDIDS "_file_mtime_scanned where sourcetype = 'F' and file = (select id from " BUILDIDS "_files where name = ?) and mtime = ?;");
- sqlite_ps ps_scan_done (db, "file-scanned",
- "insert or ignore into " BUILDIDS "_file_mtime_scanned (sourcetype, file, mtime, size)"
- "values ('F', (select id from " BUILDIDS "_files where name = ?), ?, ?);");
+ bool executable_p = false, debuginfo_p = false; // E and/or D
+ string buildid;
+ set<string> sourcefiles;
+ int fd = open (rps.c_str(), O_RDONLY);
+ try
+ {
+ if (fd >= 0)
+ elf_classify (fd, executable_p, debuginfo_p, buildid, sourcefiles);
+ else
+ throw libc_exception(errno, string("open ") + rps);
+ inc_metric ("scanned_total","source","file");
+ }
+ // NB: we catch exceptions here too, so that we can
+ // cache the corrupt-elf case (!executable_p &&
+ // !debuginfo_p) just below, just as if we had an
+ // EPERM error from open(2).
+ catch (const reportable_exception& e)
+ {
+ e.report(clog);
+ }
- char * const dirs[] = { (char*) dir.c_str(), NULL };
+ if (fd >= 0)
+ close (fd);
- unsigned fts_scanned=0, fts_regex=0, fts_cached=0, fts_debuginfo=0, fts_executable=0, fts_sourcefiles=0;
+ // register this file name in the interning table
+ ps_upsert_files
+ .reset()
+ .bind(1, rps)
+ .step_ok_done();
- FTS *fts = fts_open (dirs,
- (traverse_logical ? FTS_LOGICAL : FTS_PHYSICAL|FTS_XDEV)
- | FTS_NOCHDIR /* multithreaded */,
- NULL);
- if (fts == NULL)
+ if (buildid == "")
{
- obatched(cerr) << "cannot fts_open " << dir << endl;
- return;
+ // no point storing an elf file without buildid
+ executable_p = false;
+ debuginfo_p = false;
}
-
- FTSENT *f;
- while ((f = fts_read (fts)) != NULL)
+ else
{
- semaphore_borrower handle_one_file (scan_concurrency_sem);
-
- fts_scanned ++;
- if (interrupted)
- break;
-
- if (verbose > 2)
- obatched(clog) << "fts/file traversing " << f->fts_path << endl;
+ // register this build-id in the interning table
+ ps_upsert_buildids
+ .reset()
+ .bind(1, buildid)
+ .step_ok_done();
+ }
- try
- {
- /* Found a file. Convert it to an absolute path, so
- the buildid database does not have relative path
- names that are unresolvable from a subsequent run
- in a different cwd. */
- char *rp = realpath(f->fts_path, NULL);
- if (rp == NULL)
- continue; // ignore dangling symlink or such
- string rps = string(rp);
- free (rp);
-
- bool ri = !regexec (&file_include_regex, rps.c_str(), 0, 0, 0);
- bool rx = !regexec (&file_exclude_regex, rps.c_str(), 0, 0, 0);
- if (!ri || rx)
- {
- if (verbose > 3)
- obatched(clog) << "fts/file skipped by regex " << (!ri ? "I" : "") << (rx ? "X" : "") << endl;
- fts_regex ++;
- continue;
- }
+ if (executable_p)
+ fts_executable ++;
+ if (debuginfo_p)
+ fts_debuginfo ++;
+ if (executable_p || debuginfo_p)
+ {
+ ps_upsert_de
+ .reset()
+ .bind(1, buildid)
+ .bind(2, debuginfo_p ? 1 : 0)
+ .bind(3, executable_p ? 1 : 0)
+ .bind(4, rps)
+ .bind(5, st.st_mtime)
+ .step_ok_done();
+ }
+ if (executable_p)
+ inc_metric("found_executable_total","source","files");
+ if (debuginfo_p)
+ inc_metric("found_debuginfo_total","source","files");
- switch (f->fts_info)
- {
- case FTS_D:
- break;
-
- case FTS_DP:
- break;
-
- case FTS_F:
- {
- /* See if we know of it already. */
- int rc = ps_query
- .reset()
- .bind(1, rps)
- .bind(2, f->fts_statp->st_mtime)
- .step();
- ps_query.reset();
- if (rc == SQLITE_ROW) // i.e., a result, as opposed to DONE (no results)
- // no need to recheck a file/version we already know
- // specifically, no need to elf-begin a file we already determined is non-elf
- // (so is stored with buildid=NULL)
- {
- fts_cached ++;
- continue;
- }
-
- bool executable_p = false, debuginfo_p = false; // E and/or D
- string buildid;
- set<string> sourcefiles;
-
- int fd = open (rps.c_str(), O_RDONLY);
- try
- {
- if (fd >= 0)
- elf_classify (fd, executable_p, debuginfo_p, buildid, sourcefiles);
- else
- throw libc_exception(errno, string("open ") + rps);
- inc_metric ("scanned_total","source","file");
- }
-
- // NB: we catch exceptions here too, so that we can
- // cache the corrupt-elf case (!executable_p &&
- // !debuginfo_p) just below, just as if we had an
- // EPERM error from open(2).
-
- catch (const reportable_exception& e)
- {
- e.report(clog);
- }
-
- if (fd >= 0)
- close (fd);
-
- // register this file name in the interning table
- ps_upsert_files
- .reset()
- .bind(1, rps)
- .step_ok_done();
-
- if (buildid == "")
- {
- // no point storing an elf file without buildid
- executable_p = false;
- debuginfo_p = false;
- }
- else
- {
- // register this build-id in the interning table
- ps_upsert_buildids
- .reset()
- .bind(1, buildid)
- .step_ok_done();
- }
-
- if (executable_p)
- fts_executable ++;
- if (debuginfo_p)
- fts_debuginfo ++;
- if (executable_p || debuginfo_p)
- {
- ps_upsert_de
- .reset()
- .bind(1, buildid)
- .bind(2, debuginfo_p ? 1 : 0)
- .bind(3, executable_p ? 1 : 0)
- .bind(4, rps)
- .bind(5, f->fts_statp->st_mtime)
- .step_ok_done();
- }
- if (executable_p)
- inc_metric("found_executable_total","source","files");
- if (debuginfo_p)
- inc_metric("found_debuginfo_total","source","files");
-
- if (sourcefiles.size() && buildid != "")
- {
- fts_sourcefiles += sourcefiles.size();
-
- for (auto&& dwarfsrc : sourcefiles)
- {
- char *srp = realpath(dwarfsrc.c_str(), NULL);
- if (srp == NULL) // also if DWZ unresolved dwarfsrc=""
- continue; // unresolvable files are not a serious problem
- // throw libc_exception(errno, "fts/file realpath " + srcpath);
- string srps = string(srp);
- free (srp);
-
- struct stat sfs;
- rc = stat(srps.c_str(), &sfs);
- if (rc != 0)
- continue;
-
- if (verbose > 2)
- obatched(clog) << "recorded buildid=" << buildid << " file=" << srps
- << " mtime=" << sfs.st_mtime
- << " as source " << dwarfsrc << endl;
-
- ps_upsert_files
- .reset()
- .bind(1, srps)
- .step_ok_done();
-
- // register the dwarfsrc name in the interning table too
- ps_upsert_files
- .reset()
- .bind(1, dwarfsrc)
- .step_ok_done();
-
- ps_upsert_s
- .reset()
- .bind(1, buildid)
- .bind(2, dwarfsrc)
- .bind(3, srps)
- .bind(4, sfs.st_mtime)
- .step_ok_done();
-
- inc_metric("found_sourcerefs_total","source","files");
- }
- }
-
- ps_scan_done
- .reset()
- .bind(1, rps)
- .bind(2, f->fts_statp->st_mtime)
- .bind(3, f->fts_statp->st_size)
- .step_ok_done();
-
- if (verbose > 2)
- obatched(clog) << "recorded buildid=" << buildid << " file=" << rps
- << " mtime=" << f->fts_statp->st_mtime << " atype="
- << (executable_p ? "E" : "")
- << (debuginfo_p ? "D" : "") << endl;
- }
- break;
-
- case FTS_ERR:
- case FTS_NS:
- throw libc_exception(f->fts_errno, string("fts/file traversal ") + string(f->fts_path));
-
- default:
- case FTS_SL: /* ignore symlinks; seen in non-L mode only */
- break;
- }
+ if (sourcefiles.size() && buildid != "")
+ {
+ fts_sourcefiles += sourcefiles.size();
- if ((verbose && f->fts_info == FTS_DP) ||
- (verbose > 1 && f->fts_info == FTS_F))
- obatched(clog) << "fts/file traversing " << rps << ", scanned=" << fts_scanned
- << ", regex-skipped=" << fts_regex
- << ", cached=" << fts_cached << ", debuginfo=" << fts_debuginfo
- << ", executable=" << fts_executable << ", source=" << fts_sourcefiles << endl;
- }
- catch (const reportable_exception& e)
+ for (auto&& dwarfsrc : sourcefiles)
{
- e.report(clog);
- }
- }
- fts_close (fts);
+ char *srp = realpath(dwarfsrc.c_str(), NULL);
+ if (srp == NULL) // also if DWZ unresolved dwarfsrc=""
+ continue; // unresolvable files are not a serious problem
+ // throw libc_exception(errno, "fts/file realpath " + srcpath);
+ string srps = string(srp);
+ free (srp);
+
+ struct stat sfs;
+ rc = stat(srps.c_str(), &sfs);
+ if (rc != 0)
+ continue;
- gettimeofday (&tv_end, NULL);
- double deltas = (tv_end.tv_sec - tv_start.tv_sec) + (tv_end.tv_usec - tv_start.tv_usec)*0.000001;
+ if (verbose > 2)
+ obatched(clog) << "recorded buildid=" << buildid << " file=" << srps
+ << " mtime=" << sfs.st_mtime
+ << " as source " << dwarfsrc << endl;
- obatched(clog) << "fts/file traversed " << dir << " in " << deltas << "s, scanned=" << fts_scanned
- << ", regex-skipped=" << fts_regex
- << ", cached=" << fts_cached << ", debuginfo=" << fts_debuginfo
- << ", executable=" << fts_executable << ", source=" << fts_sourcefiles << endl;
-}
+ ps_upsert_files
+ .reset()
+ .bind(1, srps)
+ .step_ok_done();
+ // register the dwarfsrc name in the interning table too
+ ps_upsert_files
+ .reset()
+ .bind(1, dwarfsrc)
+ .step_ok_done();
-static void*
-thread_main_scan_source_file_path (void* arg)
-{
- string dir = string((const char*) arg);
+ ps_upsert_s
+ .reset()
+ .bind(1, buildid)
+ .bind(2, dwarfsrc)
+ .bind(3, srps)
+ .bind(4, sfs.st_mtime)
+ .step_ok_done();
- unsigned rescan_timer = 0;
- sig_atomic_t forced_rescan_count = 0;
- set_metric("thread_timer_max", "file", dir, rescan_s);
- set_metric("thread_tid", "file", dir, tid());
- while (! interrupted)
- {
- set_metric("thread_timer", "file", dir, rescan_timer);
- set_metric("thread_forced_total", "file", dir, forced_rescan_count);
- if (rescan_s && rescan_timer > rescan_s)
- rescan_timer = 0;
- if (sigusr1 != forced_rescan_count)
- {
- forced_rescan_count = sigusr1;
- rescan_timer = 0;
+ inc_metric("found_sourcerefs_total","source","files");
}
- if (rescan_timer == 0)
- try
- {
- set_metric("thread_working", "file", dir, time(NULL));
- inc_metric("thread_work_total", "file", dir);
- scan_source_file_path (dir);
- set_metric("thread_working", "file", dir, 0);
- }
- catch (const sqlite_exception& e)
- {
- obatched(cerr) << e.message << endl;
- }
- sleep (1);
- rescan_timer ++;
}
- return 0;
+ ps_scan_done
+ .reset()
+ .bind(1, rps)
+ .bind(2, st.st_mtime)
+ .bind(3, st.st_size)
+ .step_ok_done();
+
+ if (verbose > 2)
+ obatched(clog) << "recorded buildid=" << buildid << " file=" << rps
+ << " mtime=" << st.st_mtime << " atype="
+ << (executable_p ? "E" : "")
+ << (debuginfo_p ? "D" : "") << endl;
}
-////////////////////////////////////////////////////////////////////////
-
// scan for archive files such as .rpm
static void
-scan_source_archive_path (const string& dir)
+scan_archive_file (const string& rps, const stat_t& st,
+ sqlite_ps& ps_upsert_buildids,
+ sqlite_ps& ps_upsert_files,
+ sqlite_ps& ps_upsert_de,
+ sqlite_ps& ps_upsert_sref,
+ sqlite_ps& ps_upsert_sdef,
+ sqlite_ps& ps_query,
+ sqlite_ps& ps_scan_done,
+ unsigned& fts_cached,
+ unsigned& fts_executable,
+ unsigned& fts_debuginfo,
+ unsigned& fts_sref,
+ unsigned& fts_sdef)
{
- obatched(clog) << "fts/archive traversing " << dir << endl;
+ /* See if we know of it already. */
+ int rc = ps_query
+ .reset()
+ .bind(1, rps)
+ .bind(2, st.st_mtime)
+ .step();
+ ps_query.reset();
+ if (rc == SQLITE_ROW) // i.e., a result, as opposed to DONE (no results)
+ // no need to recheck a file/version we already know
+ // specifically, no need to parse this archive again, since we already have
+ // it as a D or E or S record,
+ // (so is stored with buildid=NULL)
+ {
+ fts_cached ++;
+ return;
+ }
+
+ // intern the archive file name
+ ps_upsert_files
+ .reset()
+ .bind(1, rps)
+ .step_ok_done();
+
+ // extract the archive contents
+ unsigned my_fts_executable = 0, my_fts_debuginfo = 0, my_fts_sref = 0, my_fts_sdef = 0;
+ bool my_fts_sref_complete_p = true;
+ try
+ {
+ string archive_extension;
+ archive_classify (rps, archive_extension,
+ ps_upsert_buildids, ps_upsert_files,
+ ps_upsert_de, ps_upsert_sref, ps_upsert_sdef, // dalt
+ st.st_mtime,
+ my_fts_executable, my_fts_debuginfo, my_fts_sref, my_fts_sdef,
+ my_fts_sref_complete_p);
+ inc_metric ("scanned_total","source",archive_extension + " archive");
+ add_metric("found_debuginfo_total","source",archive_extension + " archive",
+ my_fts_debuginfo);
+ add_metric("found_executable_total","source",archive_extension + " archive",
+ my_fts_executable);
+ add_metric("found_sourcerefs_total","source",archive_extension + " archive",
+ my_fts_sref);
+ }
+ catch (const reportable_exception& e)
+ {
+ e.report(clog);
+ }
+
+ if (verbose > 2)
+ obatched(clog) << "scanned archive=" << rps
+ << " mtime=" << st.st_mtime
+ << " executables=" << my_fts_executable
+ << " debuginfos=" << my_fts_debuginfo
+ << " srefs=" << my_fts_sref
+ << " sdefs=" << my_fts_sdef
+ << endl;
+
+ fts_executable += my_fts_executable;
+ fts_debuginfo += my_fts_debuginfo;
+ fts_sref += my_fts_sref;
+ fts_sdef += my_fts_sdef;
+
+ if (my_fts_sref_complete_p) // leave incomplete?
+ ps_scan_done
+ .reset()
+ .bind(1, rps)
+ .bind(2, st.st_mtime)
+ .bind(3, st.st_size)
+ .step_ok_done();
+}
+
+
+
+////////////////////////////////////////////////////////////////////////
+
+
- sqlite_ps ps_upsert_buildids (db, "rpm-buildid-intern", "insert or ignore into " BUILDIDS "_buildids VALUES (NULL, ?);");
- sqlite_ps ps_upsert_files (db, "rpm-file-intern", "insert or ignore into " BUILDIDS "_files VALUES (NULL, ?);");
- sqlite_ps ps_upsert_de (db, "rpm-de-insert",
+// The thread that consumes file names off of the scanq. We hold
+// the persistent sqlite_ps's at this level and delegate file/archive
+// scanning to other functions.
+static void*
+thread_main_scanner (void* arg)
+{
+ (void) arg;
+
+ // all the prepared statements fit to use, the _f_ set:
+ sqlite_ps ps_f_upsert_buildids (db, "file-buildids-intern", "insert or ignore into " BUILDIDS "_buildids VALUES (NULL, ?);");
+ sqlite_ps ps_f_upsert_files (db, "file-files-intern", "insert or ignore into " BUILDIDS "_files VALUES (NULL, ?);");
+ sqlite_ps ps_f_upsert_de (db, "file-de-upsert",
+ "insert or ignore into " BUILDIDS "_f_de "
+ "(buildid, debuginfo_p, executable_p, file, mtime) "
+ "values ((select id from " BUILDIDS "_buildids where hex = ?),"
+ " ?,?,"
+ " (select id from " BUILDIDS "_files where name = ?), ?);");
+ sqlite_ps ps_f_upsert_s (db, "file-s-upsert",
+ "insert or ignore into " BUILDIDS "_f_s "
+ "(buildid, artifactsrc, file, mtime) "
+ "values ((select id from " BUILDIDS "_buildids where hex = ?),"
+ " (select id from " BUILDIDS "_files where name = ?),"
+ " (select id from " BUILDIDS "_files where name = ?),"
+ " ?);");
+ sqlite_ps ps_f_query (db, "file-negativehit-find",
+ "select 1 from " BUILDIDS "_file_mtime_scanned where sourcetype = 'F' "
+ "and file = (select id from " BUILDIDS "_files where name = ?) and mtime = ?;");
+ sqlite_ps ps_f_scan_done (db, "file-scanned",
+ "insert or ignore into " BUILDIDS "_file_mtime_scanned (sourcetype, file, mtime, size)"
+ "values ('F', (select id from " BUILDIDS "_files where name = ?), ?, ?);");
+
+ // and now for the _r_ set
+ sqlite_ps ps_r_upsert_buildids (db, "rpm-buildid-intern", "insert or ignore into " BUILDIDS "_buildids VALUES (NULL, ?);");
+ sqlite_ps ps_r_upsert_files (db, "rpm-file-intern", "insert or ignore into " BUILDIDS "_files VALUES (NULL, ?);");
+ sqlite_ps ps_r_upsert_de (db, "rpm-de-insert",
"insert or ignore into " BUILDIDS "_r_de (buildid, debuginfo_p, executable_p, file, mtime, content) values ("
"(select id from " BUILDIDS "_buildids where hex = ?), ?, ?, "
"(select id from " BUILDIDS "_files where name = ?), ?, "
"(select id from " BUILDIDS "_files where name = ?));");
- sqlite_ps ps_upsert_sref (db, "rpm-sref-insert",
+ sqlite_ps ps_r_upsert_sref (db, "rpm-sref-insert",
"insert or ignore into " BUILDIDS "_r_sref (buildid, artifactsrc) values ("
"(select id from " BUILDIDS "_buildids where hex = ?), "
"(select id from " BUILDIDS "_files where name = ?));");
- sqlite_ps ps_upsert_sdef (db, "rpm-sdef-insert",
+ sqlite_ps ps_r_upsert_sdef (db, "rpm-sdef-insert",
"insert or ignore into " BUILDIDS "_r_sdef (file, mtime, content) values ("
"(select id from " BUILDIDS "_files where name = ?), ?,"
"(select id from " BUILDIDS "_files where name = ?));");
- sqlite_ps ps_query (db, "rpm-negativehit-query",
+ sqlite_ps ps_r_query (db, "rpm-negativehit-query",
"select 1 from " BUILDIDS "_file_mtime_scanned where "
"sourcetype = 'R' and file = (select id from " BUILDIDS "_files where name = ?) and mtime = ?;");
- sqlite_ps ps_scan_done (db, "rpm-scanned",
+ sqlite_ps ps_r_scan_done (db, "rpm-scanned",
"insert or ignore into " BUILDIDS "_file_mtime_scanned (sourcetype, file, mtime, size)"
"values ('R', (select id from " BUILDIDS "_files where name = ?), ?, ?);");
- char * const dirs[] = { (char*) dir.c_str(), NULL };
- struct timeval tv_start, tv_end;
- gettimeofday (&tv_start, NULL);
- unsigned fts_scanned=0, fts_regex=0, fts_cached=0, fts_debuginfo=0;
- unsigned fts_executable=0, fts_archive = 0, fts_sref=0, fts_sdef=0;
+ unsigned fts_cached = 0, fts_executable = 0, fts_debuginfo = 0, fts_sourcefiles = 0;
+ unsigned fts_sref = 0, fts_sdef = 0;
- FTS *fts = fts_open (dirs,
- (traverse_logical ? FTS_LOGICAL : FTS_PHYSICAL|FTS_XDEV)
- | FTS_NOCHDIR /* multithreaded */,
- NULL);
- if (fts == NULL)
+ add_metric("thread_count", "role", "scan", 1);
+ add_metric("thread_busy", "role", "scan", 1);
+ while (! interrupted)
{
- obatched(cerr) << "cannot fts_open " << dir << endl;
- return;
+ scan_payload p;
+
+ add_metric("thread_busy", "role", "scan", -1);
+ bool gotone = scanq.wait_front(p);
+ add_metric("thread_busy", "role", "scan", 1);
+ if (! gotone) continue; // or break
+ inc_metric("thread_work_total", "role","scan");
+
+ try
+ {
+ bool scan_archive = false;
+ for (auto&& arch : scan_archives)
+ if (string_endswith(p.first, arch.first))
+ scan_archive = true;
+
+ if (scan_archive)
+ scan_archive_file (p.first, p.second,
+ ps_r_upsert_buildids,
+ ps_r_upsert_files,
+ ps_r_upsert_de,
+ ps_r_upsert_sref,
+ ps_r_upsert_sdef,
+ ps_r_query,
+ ps_r_scan_done,
+ fts_cached,
+ fts_executable,
+ fts_debuginfo,
+ fts_sref,
+ fts_sdef);
+
+ if (scan_files) // NB: maybe "else if" ?
+ scan_source_file (p.first, p.second,
+ ps_f_upsert_buildids,
+ ps_f_upsert_files,
+ ps_f_upsert_de,
+ ps_f_upsert_s,
+ ps_f_query,
+ ps_f_scan_done,
+ fts_cached, fts_executable, fts_debuginfo, fts_sourcefiles);
+ }
+ catch (const reportable_exception& e)
+ {
+ e.report(cerr);
+ }
}
- FTSENT *f;
- while ((f = fts_read (fts)) != NULL)
- {
- semaphore_borrower handle_one_file (scan_concurrency_sem);
+ add_metric("thread_busy", "role", "scan", -1);
+ return 0;
+}
- fts_scanned ++;
- if (interrupted)
- break;
- if (verbose > 2)
- obatched(clog) << "fts/archive traversing " << f->fts_path << endl;
- try
- {
- /* Found a file. Convert it to an absolute path, so
- the buildid database does not have relative path
- names that are unresolvable from a subsequent run
- in a different cwd. */
- char *rp = realpath(f->fts_path, NULL);
- if (rp == NULL)
- continue; // ignore dangling symlink or such
- string rps = string(rp);
- free (rp);
-
- bool ri = !regexec (&file_include_regex, rps.c_str(), 0, 0, 0);
- bool rx = !regexec (&file_exclude_regex, rps.c_str(), 0, 0, 0);
- if (!ri || rx)
- {
- if (verbose > 3)
- obatched(clog) << "fts/archive skipped by regex " << (!ri ? "I" : "") << (rx ? "X" : "") << endl;
- fts_regex ++;
- continue;
- }
+// The thread that traverses all the source_paths and enqueues all the
+// matching files into the file/archive scan queue.
+static void
+scan_source_paths()
+{
+ // Turn the source_paths into an fts(3)-compatible char**. Since
+ // source_paths[] does not change after argv processing, the
+// c_str()'s are safe to keep around awhile.
+ vector<const char *> sps;
+ for (auto&& sp: source_paths)
+ sps.push_back(sp.c_str());
+ sps.push_back(NULL);
+
+ FTS *fts = fts_open ((char * const *)sps.data(),
+ (traverse_logical ? FTS_LOGICAL : FTS_PHYSICAL|FTS_XDEV)
+ | FTS_NOCHDIR /* multithreaded */,
+ NULL);
+ if (fts == NULL)
+ throw libc_exception(errno, "cannot fts_open");
+ defer_dtor<FTS*,int> fts_cleanup (fts, fts_close);
- switch (f->fts_info)
- {
- case FTS_D:
- break;
-
- case FTS_DP:
- break;
-
- case FTS_F:
- {
- bool any = false;
- for (auto&& arch : scan_archives)
- if (string_endswith(rps, arch.first))
- any = true;
- if (! any)
- continue;
- fts_archive ++;
-
- /* See if we know of it already. */
- int rc = ps_query
- .reset()
- .bind(1, rps)
- .bind(2, f->fts_statp->st_mtime)
- .step();
- ps_query.reset();
- if (rc == SQLITE_ROW) // i.e., a result, as opposed to DONE (no results)
- // no need to recheck a file/version we already know
- // specifically, no need to parse this archive again, since we already have
- // it as a D or E or S record,
- // (so is stored with buildid=NULL)
- {
- fts_cached ++;
- continue;
- }
-
- // intern the archive file name
- ps_upsert_files
- .reset()
- .bind(1, rps)
- .step_ok_done();
-
- // extract the archive contents
- unsigned my_fts_executable = 0, my_fts_debuginfo = 0, my_fts_sref = 0, my_fts_sdef = 0;
- bool my_fts_sref_complete_p = true;
- try
- {
- string archive_extension;
- archive_classify (rps, archive_extension,
- ps_upsert_buildids, ps_upsert_files,
- ps_upsert_de, ps_upsert_sref, ps_upsert_sdef, // dalt
- f->fts_statp->st_mtime,
- my_fts_executable, my_fts_debuginfo, my_fts_sref, my_fts_sdef,
- my_fts_sref_complete_p);
- inc_metric ("scanned_total","source",archive_extension + " archive");
- add_metric("found_debuginfo_total","source",archive_extension + " archive",
- my_fts_debuginfo);
- add_metric("found_executable_total","source",archive_extension + " archive",
- my_fts_executable);
- add_metric("found_sourcerefs_total","source",archive_extension + " archive",
- my_fts_sref);
- }
- catch (const reportable_exception& e)
- {
- e.report(clog);
- }
-
- if (verbose > 2)
- obatched(clog) << "scanned archive=" << rps
- << " mtime=" << f->fts_statp->st_mtime
- << " executables=" << my_fts_executable
- << " debuginfos=" << my_fts_debuginfo
- << " srefs=" << my_fts_sref
- << " sdefs=" << my_fts_sdef
- << endl;
-
- fts_executable += my_fts_executable;
- fts_debuginfo += my_fts_debuginfo;
- fts_sref += my_fts_sref;
- fts_sdef += my_fts_sdef;
-
- if (my_fts_sref_complete_p) // leave incomplete?
- ps_scan_done
- .reset()
- .bind(1, rps)
- .bind(2, f->fts_statp->st_mtime)
- .bind(3, f->fts_statp->st_size)
- .step_ok_done();
- }
- break;
+ struct timeval tv_start, tv_end;
+ gettimeofday (&tv_start, NULL);
+ unsigned fts_scanned = 0, fts_regex = 0;
- case FTS_ERR:
- case FTS_NS:
- throw libc_exception(f->fts_errno, string("fts/archive traversal ") + string(f->fts_path));
+ FTSENT *f;
+ while ((f = fts_read (fts)) != NULL)
+ {
+ if (interrupted) break;
+
+ fts_scanned ++;
+
+ if (verbose > 2)
+ obatched(clog) << "fts traversing " << f->fts_path << endl;
+
+ /* Found a file. Convert it to an absolute path, so
+ the buildid database does not have relative path
+ names that are unresolvable from a subsequent run
+ in a different cwd. */
+ char *rp = realpath(f->fts_path, NULL);
+ if (rp == NULL)
+ continue; // ignore dangling symlink or such
+ string rps = string(rp);
+ free (rp);
+
+ bool ri = !regexec (&file_include_regex, rps.c_str(), 0, 0, 0);
+ bool rx = !regexec (&file_exclude_regex, rps.c_str(), 0, 0, 0);
+ if (!ri || rx)
+ {
+ if (verbose > 3)
+ obatched(clog) << "fts skipped by regex " << (!ri ? "I" : "") << (rx ? "X" : "") << endl;
+ fts_regex ++;
+ continue;
+ }
- default:
- case FTS_SL: /* ignore symlinks; seen in non-L mode only */
- break;
- }
+ switch (f->fts_info)
+ {
+ case FTS_F:
+ scanq.push_back (make_pair(rps, *f->fts_statp));
+ break;
- if ((verbose && f->fts_info == FTS_DP) ||
- (verbose > 1 && f->fts_info == FTS_F))
- obatched(clog) << "fts/archive traversing " << rps << ", scanned=" << fts_scanned
- << ", regex-skipped=" << fts_regex
- << ", archive=" << fts_archive << ", cached=" << fts_cached << ", debuginfo=" << fts_debuginfo
- << ", executable=" << fts_executable
- << ", sourcerefs=" << fts_sref << ", sourcedefs=" << fts_sdef << endl;
- }
- catch (const reportable_exception& e)
+ case FTS_ERR:
+ case FTS_NS:
+ // report on some types of errors because they may reflect fixable misconfiguration
{
- e.report(clog);
+ auto x = libc_exception(f->fts_errno, string("fts traversal ") + string(f->fts_path));
+ x.report(cerr);
}
- }
- fts_close (fts);
+ break;
+ default:
+ ;
+ /* ignore */
+ }
+ }
gettimeofday (&tv_end, NULL);
double deltas = (tv_end.tv_sec - tv_start.tv_sec) + (tv_end.tv_usec - tv_start.tv_usec)*0.000001;
- obatched(clog) << "fts/archive traversed " << dir << " in " << deltas << "s, scanned=" << fts_scanned
- << ", regex-skipped=" << fts_regex
- << ", archive=" << fts_archive << ", cached=" << fts_cached << ", debuginfo=" << fts_debuginfo
- << ", executable=" << fts_executable
- << ", sourcerefs=" << fts_sref << ", sourcedefs=" << fts_sdef << endl;
+ obatched(clog) << "fts traversed source paths in " << deltas << "s, scanned=" << fts_scanned
+ << ", regex-skipped=" << fts_regex << endl;
}
-
static void*
-thread_main_scan_source_archive_path (void* arg)
+thread_main_fts_source_paths (void* arg)
{
- string dir = string((const char*) arg);
+ (void) arg; // ignore; we operate on global data
unsigned rescan_timer = 0;
sig_atomic_t forced_rescan_count = 0;
- set_metric("thread_timer_max", "archive", dir, rescan_s);
- set_metric("thread_tid", "archive", dir, tid());
+ set_metric("thread_timer_max", "role","traverse", rescan_s);
+ set_metric("thread_tid", "role","traverse", tid());
+ add_metric("thread_count", "role", "traverse", 1);
while (! interrupted)
{
- set_metric("thread_timer", "archive", dir, rescan_timer);
- set_metric("thread_forced_total", "archive", dir, forced_rescan_count);
+ set_metric("thread_timer", "role","traverse", rescan_timer);
+ // set_metric("thread_forced_total", "role","traverse", forced_rescan_count);
if (rescan_s && rescan_timer > rescan_s)
rescan_timer = 0;
if (sigusr1 != forced_rescan_count)
if (rescan_timer == 0)
try
{
- set_metric("thread_working", "archive", dir, time(NULL));
- inc_metric("thread_work_total", "archive", dir);
- scan_source_archive_path (dir);
- set_metric("thread_working", "archive", dir, 0);
+ set_metric("thread_busy", "role","traverse", 1);
+ inc_metric("thread_work_total", "role","traverse");
+ scan_source_paths();
+ set_metric("thread_busy", "role","traverse", 0);
}
- catch (const sqlite_exception& e)
+ catch (const reportable_exception& e)
{
- obatched(cerr) << e.message << endl;
+ e.report(cerr);
}
sleep (1);
rescan_timer ++;
}
+ // wake up any blocked scanning threads so they can check $interrupted and kill themselves
+ scanq.nuke();
+
return 0;
}
+
////////////////////////////////////////////////////////////////////////
static void
sig_atomic_t forced_groom_count = 0;
set_metric("thread_timer_max", "role", "groom", groom_s);
set_metric("thread_tid", "role", "groom", tid());
+ add_metric("thread_count", "role", "groom", 1);
while (! interrupted)
{
set_metric("thread_timer", "role", "groom", groom_timer);
- set_metric("thread_forced_total", "role", "groom", forced_groom_count);
+ // set_metric("thread_forced_total", "role", "groom", forced_groom_count);
if (groom_s && groom_timer > groom_s)
groom_timer = 0;
if (sigusr2 != forced_groom_count)
if (groom_timer == 0)
try
{
- set_metric("thread_working", "role", "groom", time(NULL));
+ set_metric("thread_busy", "role", "groom", 1);
inc_metric("thread_work_total", "role", "groom");
groom ();
- set_metric("thread_working", "role", "groom", 0);
+ set_metric("thread_busy", "role", "groom", 0);
}
catch (const sqlite_exception& e)
{
(void) signal (SIGUSR1, sigusr1_handler); // end-user
(void) signal (SIGUSR2, sigusr2_handler); // end-user
- // do this before any threads start
- scan_concurrency_sem = new semaphore(concurrency);
-
/* Get database ready. */
rc = sqlite3_open_v2 (db_path.c_str(), &db, (SQLITE_OPEN_READWRITE
|SQLITE_OPEN_CREATE
rc = pthread_create (& groom_thread, NULL, thread_main_groom, NULL);
if (rc < 0)
error (0, 0, "warning: cannot spawn thread (%d) to groom database\n", rc);
-
- if (scan_files) for (auto&& it : source_paths)
+
+ if (scan_files || scan_archives.size() > 0)
{
pthread_t pt;
- rc = pthread_create (& pt, NULL, thread_main_scan_source_file_path, (void*) it.c_str());
+      rc = pthread_create (& pt, NULL, thread_main_fts_source_paths, NULL);
if (rc < 0)
- error (0, 0, "warning: cannot spawn thread (%d) to scan files %s\n", rc, it.c_str());
- else
- scanner_threads.push_back(pt);
- }
-
- if (scan_archives.size() > 0)
- for (auto&& it : source_paths)
- {
- pthread_t pt;
- rc = pthread_create (& pt, NULL, thread_main_scan_source_archive_path, (void*) it.c_str());
- if (rc < 0)
- error (0, 0, "warning: cannot spawn thread (%d) to scan archives %s\n", rc, it.c_str());
- else
+ error (0, 0, "warning: cannot spawn thread (%d) to traverse source paths\n", rc);
+ scanner_threads.push_back(pt);
+ for (unsigned i=0; i<concurrency; i++)
+ {
+          rc = pthread_create (& pt, NULL, thread_main_scanner, NULL);
+ if (rc < 0)
+ error (0, 0, "warning: cannot spawn thread (%d) to scan source files / archives\n", rc);
scanner_threads.push_back(pt);
- }
+ }
+ }
/* Trivial main loop! */
set_metric("ready", 1);
for (auto&& it : scanner_threads)
pthread_join (it, NULL);
pthread_join (groom_thread, NULL);
-
+
/* Stop all the web service threads. */
if (d4) MHD_stop_daemon (d4);
if (d6) MHD_stop_daemon (d6);
/* With all threads known dead, we can clean up the global resources. */
- delete scan_concurrency_sem;
rc = sqlite3_exec (db, DEBUGINFOD_SQLITE_CLEANUP_DDL, NULL, NULL, NULL);
if (rc != SQLITE_OK)
{