raise Exception("Failed to detach BPF from kprobe")
del open_kprobes[ev_name]
- @staticmethod
- def trace_open(nonblocking=False):
+ def _trace_autoload(self):
+ # Cater to one-liner case where attach_kprobe is omitted and C function
+ # name matches that of the kprobe.
+ if len(open_kprobes) == 0:
+ fns = self.load_funcs(BPF.KPROBE)
+ for fn in fns:
+ if fn.name.startswith("kprobe__"):
+ self.attach_kprobe(event=fn.name[8:], fn_name=fn.name)
+ elif fn.name.startswith("kretprobe__"):
+ self.attach_kprobe(event=fn.name[11:], fn_name=fn.name)
+
+ def trace_open(self, nonblocking=False):
"""trace_open(nonblocking=False)
Open the trace_pipe if not already open
"""
-
+ self._trace_autoload()
global tracefile
if not tracefile:
tracefile = open("%s/trace_pipe" % TRACEFS)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return tracefile
- @staticmethod
- def trace_fields(nonblocking=False):
+ def trace_fields(self, nonblocking=False):
"""trace_fields(nonblocking=False)
Read from the kernel debug trace pipe and return a tuple of the
line was read (nonblocking=True)
"""
while True:
- line = BPF.trace_readline(nonblocking)
+ line = self.trace_readline(nonblocking)
if not line and nonblocking: return (None,) * 6
# don't print messages related to lost events
if line.startswith("CPU:"): continue
msg = line[ts_end + 4:]
return (task, int(pid), int(cpu), flags, float(ts), msg)
- @staticmethod
- def trace_readline(nonblocking=False):
+ def trace_readline(self, nonblocking=False):
"""trace_readline(nonblocking=False)
Read from the kernel debug trace pipe and return one line
If nonblocking is False, this will block until ctrl-C is pressed.
"""
- trace = BPF.trace_open(nonblocking)
+ trace = self.trace_open(nonblocking)
line = None
try:
example: trace_print(fmt="pid {1}, msg = {5}")
"""
- # Cater to one-liner case where attach_kprobe is omitted and C function
- # name matches that of the kprobe.
- if len(open_kprobes) == 0:
- fns = self.load_funcs(BPF.KPROBE)
- for fn in fns:
- if fn.name.startswith("kprobe__"):
- self.attach_kprobe(event=fn.name[8:], fn_name=fn.name)
- elif fn.name.startswith("kretprobe__"):
- self.attach_kprobe(event=fn.name[11:], fn_name=fn.name)
-
while True:
if fmt:
fields = self.trace_fields(nonblocking=False)