if _num_open_probes + num_new_probes > _probe_limit:
raise Exception("Number of open probes would exceed global quota")
- def _add_kprobe_fd(self, name, fd):
+ def _add_kprobe_fd(self, ev_name, fn_name, fd):
global _num_open_probes
- self.kprobe_fds[name] = fd
+ if ev_name not in self.kprobe_fds:
+ self.kprobe_fds[ev_name] = {}
+ self.kprobe_fds[ev_name][fn_name] = fd
_num_open_probes += 1
- def _del_kprobe_fd(self, name):
+ def _del_kprobe_fd(self, ev_name, fn_name):
global _num_open_probes
- del self.kprobe_fds[name]
+ del self.kprobe_fds[ev_name][fn_name]
_num_open_probes -= 1
def _add_uprobe_fd(self, name, fd):
if fd < 0:
raise Exception("Failed to attach BPF program %s to kprobe %s" %
(fn_name, event))
- self._add_kprobe_fd(ev_name, fd)
+ self._add_kprobe_fd(ev_name, fn_name, fd)
return self
def attach_kretprobe(self, event=b"", fn_name=b"", event_re=b"", maxactive=0):
if fd < 0:
raise Exception("Failed to attach BPF program %s to kretprobe %s" %
(fn_name, event))
- self._add_kprobe_fd(ev_name, fd)
+ self._add_kprobe_fd(ev_name, fn_name, fd)
return self
def detach_kprobe_event(self, ev_name):
+ ev_name = _assert_is_bytes(ev_name)
+ fn_names = list(self.kprobe_fds[ev_name].keys())
+ for fn_name in fn_names:
+ self.detach_kprobe_event_by_fn(ev_name, fn_name)
+
+ def detach_kprobe_event_by_fn(self, ev_name, fn_name):
+ ev_name = _assert_is_bytes(ev_name)
+ fn_name = _assert_is_bytes(fn_name)
if ev_name not in self.kprobe_fds:
raise Exception("Kprobe %s is not attached" % ev_name)
- res = lib.bpf_close_perf_event_fd(self.kprobe_fds[ev_name])
+ res = lib.bpf_close_perf_event_fd(self.kprobe_fds[ev_name][fn_name])
if res < 0:
raise Exception("Failed to close kprobe FD")
- res = lib.bpf_detach_kprobe(ev_name)
- if res < 0:
- raise Exception("Failed to detach BPF from kprobe")
- self._del_kprobe_fd(ev_name)
+ self._del_kprobe_fd(ev_name, fn_name)
+ if len(self.kprobe_fds[ev_name]) == 0:
+ res = lib.bpf_detach_kprobe(ev_name)
+ if res < 0:
+ raise Exception("Failed to detach BPF from kprobe")
- def detach_kprobe(self, event):
+ def detach_kprobe(self, event, fn_name=None):
event = _assert_is_bytes(event)
ev_name = b"p_" + event.replace(b"+", b"_").replace(b".", b"_")
- self.detach_kprobe_event(ev_name)
+ if fn_name:
+ fn_name = _assert_is_bytes(fn_name)
+ self.detach_kprobe_event_by_fn(ev_name, fn_name)
+ else:
+ self.detach_kprobe_event(ev_name)
+
- def detach_kretprobe(self, event):
+ def detach_kretprobe(self, event, fn_name=None):
event = _assert_is_bytes(event)
ev_name = b"r_" + event.replace(b"+", b"_").replace(b".", b"_")
- self.detach_kprobe_event(ev_name)
+ if fn_name:
+ fn_name = _assert_is_bytes(fn_name)
+ self.detach_kprobe_event_by_fn(ev_name, fn_name)
+ else:
+ self.detach_kprobe_event(ev_name)
@staticmethod
def attach_xdp(dev, fn, flags=0):