from time import sleep
import distutils.version
import os
+import subprocess
def kernel_version_ge(major, minor):
# True if running kernel is >= X.Y
total_switches += v.value
self.assertNotEqual(0, total_switches)
+@unittest.skipUnless(kernel_version_ge(4,7), "requires kernel >= 4.7")
+class TestTracepointDataLoc(unittest.TestCase):
+ def test_tracepoint_data_loc(self):
+ text = """
+ struct value_t {
+ char filename[64];
+ };
+ BPF_HASH(execs, u32, struct value_t);
+ TRACEPOINT_PROBE(sched, sched_process_exec) {
+ struct value_t val = {0};
+ char fn[64];
+ u32 pid = args->pid;
+ struct value_t *existing = execs.lookup_or_init(&pid, &val);
+ TP_DATA_LOC_READ_CONST(fn, filename, 64);
+ __builtin_memcpy(existing->filename, fn, 64);
+ return 0;
+ }
+ """
+ b = bcc.BPF(text=text)
+ subprocess.check_output(["/bin/ls"])
+ sleep(1)
+ found = False
+ for k, v in b["execs"].items():
+ found = "ls" in v.filename
+ self.assertTrue(found, "'ls' was not found in map")
+
if __name__ == "__main__":
unittest.main()