ksym_addrs = []
ksym_names = []
ksym_loaded = 0
+_kprobe_limit = 1000
@atexit.register
def cleanup_kprobes():
if tracefile:
tracefile.close()
+
+def _check_probe_quota(num_new_probes):
+ if len(open_kprobes) + len(open_uprobes) + num_new_probes > _kprobe_limit:
+ raise Exception("Number of open probes would exceed quota")
+
+
class BPF(object):
SOCKET_FILTER = 1
KPROBE = 2
lines = p.communicate()[0].decode().split()
with open("%s/../kprobes/blacklist" % TRACEFS) as f:
blacklist = [line.split()[1] for line in f.readlines()]
- return [line.rstrip() for line in lines if
+ fns = [line.rstrip() for line in lines if
(line != "\n" and line not in blacklist)]
+ _check_probe_quota(len(fns))
+ return fns
def attach_kprobe(self, event="", fn_name="", event_re="",
pid=-1, cpu=0, group_fd=-1):
pass
return
+ _check_probe_quota(1)
fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "p_" + event.replace("+", "_").replace(".", "_")
desc = "p:kprobes/%s %s" % (ev_name, event)
pass
return
+ _check_probe_quota(1)
fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "r_" + event.replace("+", "_").replace(".", "_")
desc = "r:kprobes/%s %s" % (ev_name, event)
(path, addr) = BPF._check_path_symbol(name, sym, addr)
+ _check_probe_quota(1)
fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "p_%s_0x%x" % (self._probe_repl.sub("_", path), addr)
desc = "p:uprobes/%s %s:0x%x" % (ev_name, path, addr)
(path, addr) = BPF._check_path_symbol(name, sym, addr)
+ _check_probe_quota(1)
fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "r_%s_0x%x" % (self._probe_repl.sub("_", path), addr)
desc = "r:uprobes/%s %s:0x%x" % (ev_name, path, addr)
open_cnt = self.b.num_open_kprobes()
self.assertEqual(actual_cnt, open_cnt)
+
+class TestProbeQuota(TestCase):
+ def setUp(self):
+ self.b = BPF(text="""int count(void *ctx) { return 0; }""")
+
+ def test_probe_quota(self):
+ with self.assertRaises(Exception):
+ self.b.attach_kprobe(event_re=".*", fn_name="count")
+
+
if __name__ == "__main__":
main()