import fcntl
import json
import os
+from subprocess import Popen, PIPE
import sys
basestring = (unicode if sys.version_info[0] < 3 else str)
% (dev, errstr))
fn.sock = sock
- def attach_kprobe(self, event="", fn_name="", pid=0, cpu=-1, group_fd=-1):
+ @staticmethod
+ def _get_kprobe_functions(event_re):
+ p = Popen(["awk", "$1 ~ /%s/ { print $1 }" % event_re,
+ "%s/available_filter_functions" % TRACEFS], stdout=PIPE)
+ lines = p.communicate()[0].decode().split()
+ return [line.rstrip() for line in lines if line != "\n"]
+
+ def attach_kprobe(self, event="", fn_name="", event_re="",
+ pid=0, cpu=-1, group_fd=-1):
+
+ # allow the caller to glob multiple functions together
+ if event_re:
+ for line in BPF._get_kprobe_functions(event_re):
+ self.attach_kprobe(event=line, fn_name=fn_name, pid=pid,
+ cpu=cpu, group_fd=group_fd)
+ return
+
fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "p_" + event.replace("+", "_")
desc = "p:kprobes/%s %s" % (ev_name, event)
raise Exception("Failed to detach BPF from kprobe")
del open_kprobes[ev_name]
- def attach_kretprobe(self, event="", fn_name="", pid=-1, cpu=0, group_fd=-1):
+ def attach_kretprobe(self, event="", fn_name="", event_re="",
+ pid=-1, cpu=0, group_fd=-1):
+
+ # allow the caller to glob multiple functions together
+ if event_re:
+ for line in BPF._get_kprobe_functions(event_re):
+ self.attach_kretprobe(event=line, fn_name=fn_name, pid=pid,
+ cpu=cpu, group_fd=group_fd)
+ return
+
fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "r_" + event.replace("+", "_")
desc = "r:kprobes/%s %s" % (ev_name, event)
"""
line = BPF.trace_readline(nonblocking)
if line:
+ # don't print messages related to lost events
+ if line.startswith("CPU:"): return
task = line[:16].lstrip()
line = line[17:]
ts_end = line.find(":")
while True:
if fmt:
fields = BPF.trace_readline_fields(nonblocking=False)
+ if not fields: continue
line = fmt.format(*fields)
else:
line = BPF.trace_readline(nonblocking=False)
--- /dev/null
+#!/usr/bin/env python
+# Copyright (c) PLUMgrid, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License")
+
+from bpf import BPF
+import os
+from socket import socket, AF_INET, SOCK_DGRAM
+import sys
+from unittest import main, TestCase
+
+class TestKprobeRgx(TestCase):
+ def setUp(self):
+ self.b = BPF(text="""
+ typedef struct { int idx; } Key;
+ typedef struct { u64 val; } Val;
+ BPF_TABLE("array", Key, Val, stats, 3);
+ int hello(void *ctx) {
+ stats.lookup_or_init(&(Key){1}, &(Val){0})->val++;
+ return 0;
+ }
+ int goodbye(void *ctx) {
+ stats.lookup_or_init(&(Key){2}, &(Val){0})->val++;
+ return 0;
+ }
+ """)
+ self.b.attach_kprobe(event_re="^SyS_send.*", fn_name="hello",
+ pid=0, cpu=-1)
+ self.b.attach_kretprobe(event_re="^SyS_send.*", fn_name="goodbye",
+ pid=1, cpu=-1)
+
+ def test_send1(self):
+ udp = socket(AF_INET, SOCK_DGRAM)
+ udp.sendto(b"a" * 10, ("127.0.0.1", 5000))
+ udp.close()
+ k1 = self.b["stats"].Key(1)
+ k2 = self.b["stats"].Key(2)
+ self.assertEqual(self.b["stats"][k1].val, self.b["stats"][k2].val)
+
+if __name__ == "__main__":
+ main()