# load BPF program
b = BPF(src_file = "bitehist.c")
-BPF.attach_kprobe(b.load_func("do_request", BPF.KPROBE), "blk_start_request")
+b.attach_kprobe(event="blk_start_request", fn_name="do_request")
dist = b.get_table("dist")
dist_max = 64
# load BPF program
b = BPF(src_file="disksnoop.c")
-BPF.attach_kprobe(b.load_func("do_request", BPF.KPROBE), "blk_start_request")
-BPF.attach_kprobe(b.load_func("do_completion", BPF.KPROBE), "blk_update_request")
+b.attach_kprobe(event="blk_start_request", fn_name="do_request")
+b.attach_kprobe(event="blk_update_request", fn_name="do_completion")
# header
print("%-18s %-2s %-7s %8s" % ("TIME(s)", "T", "BYTES", "LAT(ms)"))
};
"""
b = BPF(text=prog)
-fn = b.load_func("hello", BPF.KPROBE)
-BPF.attach_kprobe(fn, "sys_clone")
+b.attach_kprobe(event="sys_clone", fn_name="hello")
try:
call(["cat", "/sys/kernel/debug/tracing/trace_pipe"])
except KeyboardInterrupt:
from time import sleep
b = BPF(src_file="task_switch.c")
-fn = b.load_func("count_sched", BPF.KPROBE)
stats = b.get_table("stats")
-BPF.attach_kprobe(fn, "finish_task_switch")
+b.attach_kprobe(event="finish_task_switch", fn_name="count_sched")
# generate many schedule events
for i in range(0, 100): sleep(0.01)
# load BPF program
b = BPF(src_file = "vfsreadlat.c")
-BPF.attach_kprobe(b.load_func("do_entry", BPF.KPROBE), "vfs_read")
-BPF.attach_kretprobe(b.load_func("do_return", BPF.KPROBE), "vfs_read")
+b.attach_kprobe(event="vfs_read", fn_name="do_entry")
+b.attach_kretprobe(event="vfs_read", fn_name="do_return")
dist = b.get_table("dist")
dist_max = 64
% (dev, errstr))
fn.sock = sock
- @staticmethod
- def attach_kprobe(fn, event, pid=0, cpu=-1, group_fd=-1):
- if not isinstance(fn, BPF.Function):
- raise Exception("arg 1 must be of type BPF.Function")
+ def attach_kprobe(self, event="", fn_name="", pid=0, cpu=-1, group_fd=-1):
+ fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "p_" + event.replace("+", "_")
desc = "p:kprobes/%s %s" % (ev_name, event)
res = lib.bpf_attach_kprobe(fn.fd, ev_name.encode("ascii"),
raise Exception("Failed to detach BPF from kprobe")
del open_kprobes[ev_name]
- @staticmethod
- def attach_kretprobe(fn, event, pid=-1, cpu=0, group_fd=-1):
- if not isinstance(fn, BPF.Function):
- raise Exception("arg 1 must be of type BPF.Function")
+ def attach_kretprobe(self, event="", fn_name="", pid=-1, cpu=0, group_fd=-1):
+ fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "r_" + event.replace("+", "_")
desc = "r:kprobes/%s %s" % (ev_name, event)
res = lib.bpf_attach_kprobe(fn.fd, ev_name.encode("ascii"),
class TestKprobe(TestCase):
def setUp(self):
b = BPF(arg1, arg2, debug=0)
- fn1 = b.load_func("sys_wr", BPF.KPROBE)
- fn2 = b.load_func("sys_rd", BPF.KPROBE)
- fn3 = b.load_func("sys_bpf", BPF.KPROBE)
self.stats = b.get_table("stats", Key, Leaf)
- BPF.attach_kprobe(fn1, "sys_write", 0, -1)
- BPF.attach_kprobe(fn2, "sys_read", 0, -1)
- BPF.attach_kprobe(fn2, "htab_map_get_next_key", 0, -1)
+ b.attach_kprobe(event="sys_write", fn_name="sys_wr", pid=0, cpu=-1)
+ b.attach_kprobe(event="sys_read", fn_name="sys_rd", pid=0, cpu=-1)
+ b.attach_kprobe(event="htab_map_get_next_key", fn_name="sys_rd", pid=0, cpu=-1)
def test_trace1(self):
with open("/dev/null", "a") as f:
class TestTracingEvent(TestCase):
def setUp(self):
b = BPF(text=text, debug=0)
- fn = b.load_func("count_sched", BPF.KPROBE)
self.stats = b.get_table("stats")
- BPF.attach_kprobe(fn, "schedule+50", 0, -1)
+ b.attach_kprobe(event="schedule+50", fn_name="count_sched", pid=0, cpu=-1)
def test_sched1(self):
for i in range(0, 100):
class TestBlkRequest(TestCase):
def setUp(self):
b = BPF(arg1, arg2, debug=0)
- fn1 = b.load_func("probe_blk_start_request", BPF.KPROBE)
- fn2 = b.load_func("probe_blk_update_request", BPF.KPROBE)
self.latency = b.get_table("latency", c_uint, c_ulong)
- BPF.attach_kprobe(fn1, "blk_start_request", -1, 0)
- BPF.attach_kprobe(fn2, "blk_update_request", -1, 0)
+ b.attach_kprobe(event="blk_start_request",
+ fn_name="probe_blk_start_request", pid=-1, cpu=0)
+ b.attach_kprobe(event="blk_update_request",
+ fn_name="probe_blk_update_request", pid=-1, cpu=0)
def test_blk1(self):
import subprocess
# load BPF program
b = BPF(src_file = "pidpersec.c")
-BPF.attach_kprobe(b.load_func("do_count", BPF.KPROBE), "sched_fork")
+b.attach_kprobe(event="sched_fork", fn_name="do_count")
stats = b.get_table("stats")
# stat indexes
return 0;
};
""")
-BPF.attach_kprobe(b.load_func("do_sync", BPF.KPROBE), "sys_sync")
+b.attach_kprobe(event="sys_sync")
# header
print("%-18s %s" % ("TIME(s)", "CALL"))
# load BPF program
b = BPF(src_file = "vfscount.c")
-fn = b.load_func("do_count", BPF.KPROBE)
-BPF.attach_kprobe(fn, "vfs_read")
-BPF.attach_kprobe(fn, "vfs_write")
-BPF.attach_kprobe(fn, "vfs_fsync")
-BPF.attach_kprobe(fn, "vfs_open")
-BPF.attach_kprobe(fn, "vfs_create")
+b.attach_kprobe(event="vfs_read", fn_name="do_count")
+b.attach_kprobe(event="vfs_write", fn_name="do_count")
+b.attach_kprobe(event="vfs_fsync", fn_name="do_count")
+b.attach_kprobe(event="vfs_open", fn_name="do_count")
+b.attach_kprobe(event="vfs_create", fn_name="do_count")
counts = b.get_table("counts")
# header
# load BPF program
b = BPF(src_file = "vfsstat.c")
-BPF.attach_kprobe(b.load_func("do_read", BPF.KPROBE), "vfs_read")
-BPF.attach_kprobe(b.load_func("do_write", BPF.KPROBE), "vfs_write")
-BPF.attach_kprobe(b.load_func("do_fsync", BPF.KPROBE), "vfs_fsync")
-BPF.attach_kprobe(b.load_func("do_open", BPF.KPROBE), "vfs_open")
-BPF.attach_kprobe(b.load_func("do_create", BPF.KPROBE), "vfs_create")
+b.attach_kprobe(event="vfs_read", fn_name="do_read")
+b.attach_kprobe(event="vfs_write", fn_name="do_write")
+b.attach_kprobe(event="vfs_fsync", fn_name="do_fsync")
+b.attach_kprobe(event="vfs_open", fn_name="do_open")
+b.attach_kprobe(event="vfs_create", fn_name="do_create")
stats = b.get_table("stats")
# stat column labels and indexes