{ "fdcache-mbs", ARGP_KEY_FDCACHE_MBS, "MB", 0, "Maximum total size of archive file fdcache.", 0 },
#define ARGP_KEY_FDCACHE_PREFETCH 0x1003
{ "fdcache-prefetch", ARGP_KEY_FDCACHE_PREFETCH, "NUM", 0, "Number of archive files to prefetch into fdcache.", 0 },
+#define ARGP_KEY_FDCACHE_MINTMP 0x1004
+ { "fdcache-mintmp", ARGP_KEY_FDCACHE_MINTMP, "NUM", 0, "Minimum free space% on tmpdir.", 0 },
{ NULL, 0, NULL, 0, NULL, 0 }
};
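// Illustrative usage, not part of the patch: the new knob rides alongside
// the existing fdcache options, e.g.
//
//   debuginfod --fdcache-mbs=512 --fdcache-prefetch=64 --fdcache-mintmp=25 /srv/rpms
//
// (/srv/rpms is a hypothetical scan path). A value of 25 requests an
// emergency fdcache flush whenever tmpdir drops below 25% free.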
static long fdcache_fds;
static long fdcache_mbs;
static long fdcache_prefetch;
+static long fdcache_mintmp;
static string tmpdir;
-static void set_metric(const string& key, int64_t value);
+static void set_metric(const string& key, double value);
// static void inc_metric(const string& key);
static void set_metric(const string& metric,
const string& lname, const string& lvalue,
- int64_t value);
+ double value);
static void inc_metric(const string& metric,
const string& lname, const string& lvalue);
static void add_metric(const string& metric,
const string& lname, const string& lvalue,
- int64_t value);
-// static void add_metric(const string& metric, int64_t value);
+ double value);
+// static void add_metric(const string& metric, double value);
class tmp_inc_metric { // a RAII style wrapper for exception-safe scoped increment & decrement
string m, n, v;
double deltas = (ts_end.tv_sec - ts_start.tv_sec)
+ (ts_end.tv_nsec - ts_start.tv_nsec)/1.e9;
- add_metric (m + "_milliseconds_sum", n, v, (deltas*1000));
+ add_metric (m + "_milliseconds_sum", n, v, (deltas*1000.0));
inc_metric (m + "_milliseconds_count", n, v);
}
};
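// A minimal sketch, not part of the patch, of how such a RAII wrapper is
// meant to be used (constructor arguments assumed to be metric name, label
// name, label value, matching the m/n/v members above):
//
//   {
//     tmp_inc_metric busy ("thread_busy", "role", "scan"); // gauge +1
//     do_scan_work ();  // may throw
//   } // dtor: gauge -1, plus *_milliseconds_sum/_count timing, even on throw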
case ARGP_KEY_FDCACHE_PREFETCH:
fdcache_prefetch = atol (arg);
break;
+ case ARGP_KEY_FDCACHE_MINTMP:
+ fdcache_mintmp = atol (arg);
+ break;
case ARGP_KEY_ARG:
source_paths.insert(string(arg));
break;
{
sqlite_exception(int rc, const string& msg):
reportable_exception(string("sqlite3 error: ") + msg + ": " + string(sqlite3_errstr(rc) ?: "?")) {
- inc_metric("error_count","sqlite3",sqlite3_errstr(rc));
+ inc_metric("error_count","sqlite3",sqlite3_errstr(rc));
}
};
}
archive_exception(struct archive* a, const string& msg):
reportable_exception(string("libarchive error: ") + msg + ": " + string(archive_error_string(a) ?: "?")) {
- inc_metric("error_count","libarchive",msg);
+ inc_metric("error_count","libarchive",msg + ": " + string(archive_error_string(a) ?: "?"));
}
};
}
+// Estimate available free space for a given filesystem via statfs(2).
+// Return true if the free fraction is known to be smaller than the
+// given minimum percentage. Also update a related metric.
+bool statfs_free_enough_p(const string& path, const string& label, long minfree = 0)
+{
+ struct statfs sfs;
+ int rc = statfs(path.c_str(), &sfs);
+ if (rc == 0)
+ {
+ double s = (double) sfs.f_bavail / (double) sfs.f_blocks;
+ set_metric("filesys_free_ratio","purpose",label, s);
+ return ((s * 100.0) < minfree);
+ }
+ return false;
+}
+
+
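// Worked example, not part of the patch: with sfs.f_bavail = 20 and
// sfs.f_blocks = 100, the free ratio s is 0.20, so a call such as
// statfs_free_enough_p(tmpdir, "tmpdir", 25) returns true (20% < 25%),
// i.e. an emergency flush is warranted. Note the name reads inverted: a
// true result means free space is known to be *below* the minimum.
// A failed statfs(2) conservatively returns false (assume enough space).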
// A map-like class that owns a cache of file descriptors (indexed by
// file / content names).
set_metrics();
// NB: we age the cache at lookup time too
- if (front_p)
+ if (statfs_free_enough_p(tmpdir, "tmpdir", fdcache_mintmp))
+ {
+ inc_metric("fdcache_op_count","op","emerg-flush");
+ obatched(clog) << "fdcache emergency flush for filling tmpdir" << endl;
+ this->limit(0, 0); // emergency flush
+ }
+ else if (front_p)
this->limit(max_fds, max_mbs); // age cache if required
}
}
}
- if (fd >= 0)
+ if (statfs_free_enough_p(tmpdir, "tmpdir", fdcache_mintmp))
+ {
+ inc_metric("fdcache_op_count","op","emerg-flush");
+ obatched(clog) << "fdcache emergency flush for filling tmpdir";
+ this->limit(0, 0); // emergency flush
+ }
+ else if (fd >= 0)
this->limit(max_fds, max_mbs); // age cache if required
return fd;
}
}
+
void limit(long maxfds, long maxmbs, bool metrics_p = true)
{
if (verbose > 3 && (this->max_fds != maxfds || this->max_mbs != maxmbs))
if (metrics_p) set_metrics();
}
+
~libarchive_fdcache()
{
// unlink any fdcache entries in $TMPDIR
- // don't update metrics; those globals may be already destroyed
+ // don't update metrics; those globals may already be destroyed
limit(0, 0, false);
}
};
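// Note, not part of the patch: limit(0, 0) shrinks the cache bounds to
// zero fds and zero MB, so every cached entry is evicted (closed and
// unlinked from tmpdir). That is why it doubles as the "emergency flush"
// above and as the destructor's cleanup, where metrics_p=false skips
// metric updates since those globals may already have been destroyed.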
// NB: don't unlink (tmppath), as fdcache will take charge of it.
// NB: this can take many uninterruptible seconds for a huge file
- rc = archive_read_data_into_fd (a, fd);
+ rc = archive_read_data_into_fd (a, fd);
if (rc != ARCHIVE_OK) // e.g. ENOSPC!
{
close (fd);
// Report but swallow libc etc. errors here; let the caller
// iterate to other matches of the content.
}
-
+
return 0;
}
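// Note, not part of the patch: archive_read_data_into_fd() is where
// extracted archive content is actually written into tmpdir, so a full
// tmpdir surfaces here as rc != ARCHIVE_OK (e.g. ENOSPC, per the comment
// above). The emergency flush tries to free tmpdir space before this
// point is reached.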
// If invoked from the scanner threads, use the scanners' read-write
// connection. Otherwise use the web query threads' read-only connection.
sqlite3 *thisdb = (conn == 0) ? db : dbq;
-
+
sqlite_ps *pp = 0;
if (atype_code == "D")
////////////////////////////////////////////////////////////////////////
-static map<string,int64_t> metrics; // arbitrary data for /metrics query
+static map<string,double> metrics; // arbitrary data for /metrics query
-// NB: store int64_t since all our metrics are integers; prometheus accepts double
+// NB: some metrics are now fractional (e.g. filesys_free_ratio), so store double; prometheus accepts double
static mutex metrics_lock;
// NB: these objects get released during the process exit via global dtors
// add prometheus-format metric name + label tuple (if any) + value
static void
-set_metric(const string& metric, int64_t value)
+set_metric(const string& metric, double value)
{
unique_lock<mutex> lock(metrics_lock);
metrics[metric] = value;
static void
set_metric(const string& metric,
const string& lname, const string& lvalue,
- int64_t value)
+ double value)
{
string key = (metric + "{" + metric_label(lname, lvalue) + "}");
unique_lock<mutex> lock(metrics_lock);
static void
add_metric(const string& metric,
const string& lname, const string& lvalue,
- int64_t value)
+ double value)
{
string key = (metric + "{" + metric_label(lname, lvalue) + "}");
unique_lock<mutex> lock(metrics_lock);
#if 0
static void
add_metric(const string& metric,
- int64_t value)
+ double value)
{
unique_lock<mutex> lock(metrics_lock);
metrics[metric] += value;
{
unique_lock<mutex> lock(metrics_lock);
for (auto&& i : metrics)
- o << i.first << " " << i.second << endl;
+ o << i.first
+ << " "
+ << std::setprecision(std::numeric_limits<double>::digits10 + 1)
+ << i.second
+ << endl;
}
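// Worked example, not part of the patch: the map key already carries the
// prometheus label syntax (built as metric + "{" + label + "}" above), so
// a line prints as e.g.
//
//   filesys_free_ratio{purpose="tmpdir"} 0.4217351942218337
//
// (label rendering assumed from metric_label()). setprecision matters
// because ostreams default to 6 significant digits; digits10+1 == 16 for
// IEEE doubles, so 1.0/3.0 prints as 0.3333333333333333 rather than
// 0.333333.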
const string& os = o.str();
MHD_Response* r = MHD_create_response_from_buffer (os.size(),
e.report(cerr);
}
+ if (fts_cached || fts_executable || fts_debuginfo || fts_sourcefiles || fts_sref || fts_sdef)
+ {} // NB: not just if a successful scan - we might have encountered -ENOSPC & failed
+ (void) statfs_free_enough_p(db_path, "database"); // report sqlite filesystem size
+ (void) statfs_free_enough_p(tmpdir, "tmpdir"); // this too, in case of fdcache/tmpfile usage
+
// finished a scanning step -- not a "loop", because we just
// consume the traversal loop's work, whenever
inc_metric("thread_work_total","role","scan");
}
+
add_metric("thread_busy", "role", "scan", -1);
return 0;
}
{
if (interrupted) break;
- if (sigusr2 != forced_groom_count) // stop early if groom triggered
+ if (sigusr2 != forced_groom_count) // stop early if groom triggered
{
scanq.clear(); // clear previously issued work for scanner threads
break;
}
-
+
fts_scanned ++;
if (verbose > 2)
continue; // ignore dangling symlink or such
string rps = string(rp);
free (rp);
-
+
bool ri = !regexec (&file_include_regex, rps.c_str(), 0, 0, 0);
bool rx = !regexec (&file_exclude_regex, rps.c_str(), 0, 0, 0);
if (!ri || rx)
case FTS_D: // ignore
inc_metric("traversed_total","type","directory");
break;
-
+
default: // ignore
inc_metric("traversed_total","type","other");
break;
if (interrupted) break;
if (sigusr1 != forced_rescan_count) // stop early if scan triggered
break;
-
+
int rc = ps_query.step();
if (rc == SQLITE_DONE) break;
if (rc != SQLITE_ROW)
database_stats_report();
+ (void) statfs_free_enough_p(db_path, "database"); // report sqlite filesystem size
+
sqlite3_db_release_memory(db); // shrink the process if possible
sqlite3_db_release_memory(dbq); // ... for both connections
fdcache_mbs = 1024; // 1 gigabyte
else
fdcache_mbs = sfs.f_bavail * sfs.f_bsize / 1024 / 1024 / 4; // 25% of free space
+ fdcache_mintmp = 25; // emergency flush at 25% remaining (75% full)
fdcache_prefetch = 64; // guesstimate storage is this much less costly than re-decompression
fdcache_fds = (concurrency + fdcache_prefetch) * 2;
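// Worked arithmetic, not part of the patch: with e.g. 40 GiB available
// (sfs.f_bavail * sfs.f_bsize), fdcache_mbs = 40*1024/4 = 10240 MB, i.e.
// 25% of free space; with concurrency = 4 the fd budget is
// (4 + 64) * 2 = 136 descriptors; and fdcache_mintmp = 25 arms the
// emergency flush once tmpdir falls below 25% free.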
"cannot open %s, consider deleting database: %s", db_path.c_str(), sqlite3_errmsg(dbq));
}
-
+
obatched(clog) << "opened database " << db_path << endl;
obatched(clog) << "sqlite version " << sqlite3_version << endl;
obatched(clog) << "fdcache mbs " << fdcache_mbs << endl;
obatched(clog) << "fdcache prefetch " << fdcache_prefetch << endl;
obatched(clog) << "fdcache tmpdir " << tmpdir << endl;
+ obatched(clog) << "fdcache tmpdir min% " << fdcache_mintmp << endl;
obatched(clog) << "groom time " << groom_s << endl;
if (scan_archives.size()>0)
{
pthread_t pt;
rc = pthread_create (& pt, NULL, thread_main_groom, NULL);
- if (rc < 0)
- error (0, 0, "warning: cannot spawn thread (%d) to groom database\n", rc);
+ if (rc)
+ error (EXIT_FAILURE, rc, "cannot spawn thread to groom database\n");
else
all_threads.push_back(pt);
if (scan_files || scan_archives.size() > 0)
{
- pthread_create (& pt, NULL, thread_main_fts_source_paths, NULL);
- if (rc < 0)
- error (0, 0, "warning: cannot spawn thread (%d) to traverse source paths\n", rc);
+ rc = pthread_create (& pt, NULL, thread_main_fts_source_paths, NULL);
+ if (rc)
+ error (EXIT_FAILURE, rc, "cannot spawn thread to traverse source paths\n");
all_threads.push_back(pt);
for (unsigned i=0; i<concurrency; i++)
{
- pthread_create (& pt, NULL, thread_main_scanner, NULL);
- if (rc < 0)
- error (0, 0, "warning: cannot spawn thread (%d) to scan source files / archives\n", rc);
+ rc = pthread_create (& pt, NULL, thread_main_scanner, NULL);
+ if (rc)
+ error (EXIT_FAILURE, rc, "cannot spawn thread to scan source files / archives\n");
all_threads.push_back(pt);
}
}
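// Note, not part of the patch: the old "if (rc < 0)" tests could never
// fire, because pthread_create(3) returns 0 on success and a *positive*
// errno value on failure (it neither returns -1 nor sets errno); two of
// the loops also discarded pthread_create's result while testing a stale
// rc. Hence the corrected pattern (thread_fn hypothetical):
//
//   rc = pthread_create (&pt, NULL, thread_fn, NULL);
//   if (rc)                           // nonzero rc is the error number
//     error (EXIT_FAILURE, rc, "cannot spawn thread");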
(void) regfree (& file_exclude_regex);
sqlite3 *database = db;
- sqlite3 *databaseq = dbq;
+ sqlite3 *databaseq = dbq;
db = dbq = 0; // for signal_handler not to freak
(void) sqlite3_close (databaseq);
(void) sqlite3_close (database);