from unittest import main, TestCase
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
+import ctypes as ct
import os
import signal
int main() {
char s[100];
- int i, a = 20, b = 40;
+ int i, a = 200, b = 40;
for (i = 0; i < 100; i++) s[i] = (i & 7) + (i & 6);
while (1) {
FOLLY_SDT(test, probe_point_1, s[7], b);
FOLLY_SDT(test, probe_point_3, a, b);
- sleep(3);
- a++; b++;
FOLLY_SDT(test, probe_point_1, s[4], a);
FOLLY_SDT(test, probe_point_2, 5, s[10]);
FOLLY_SDT(test, probe_point_3, s[4], s[7]);
+ sleep(1);
}
return 1;
}
# BPF program
self.bpf_text = """
#include <uapi/linux/ptrace.h>
+
+struct probe_result_t1 {
+ char v1;
+ int v2;
+};
+
+struct probe_result_t2 {
+ int v1;
+ char v2;
+};
+
+struct probe_result_t3 {
+ int v1;
+ int v2;
+};
+
+BPF_PERF_OUTPUT(event1);
+BPF_PERF_OUTPUT(event2);
+BPF_PERF_OUTPUT(event3);
int do_trace(struct pt_regs *ctx) {
+ struct probe_result_t1 result = {};
+ bpf_usdt_readarg(1, ctx, &result.v1);
+ bpf_usdt_readarg(2, ctx, &result.v2);
+ event1.perf_submit(ctx, &result, sizeof(result));
return 0;
};
int do_trace2(struct pt_regs *ctx) {
+ struct probe_result_t2 result = {};
+ bpf_usdt_readarg(1, ctx, &result.v1);
+ bpf_usdt_readarg(2, ctx, &result.v2);
+ event2.perf_submit(ctx, &result, sizeof(result));
return 0;
}
int do_trace3(struct pt_regs *ctx) {
+ struct probe_result_t3 result = {};
+ bpf_usdt_readarg(1, ctx, &result.v1);
+ bpf_usdt_readarg(2, ctx, &result.v2);
+ event3.perf_submit(ctx, &result, sizeof(result));
return 0;
}
"""
u.enable_probe(probe="probe_point_3", fn_name="do_trace3")
b = BPF(text=self.bpf_text, usdt_contexts=[u])
+ # Event states for each event:
+ # 0 - probe not caught, 1 - probe caught with correct value,
+ # 2 - probe caught with incorrect value
+ self.evt_st_1 = 0
+ self.evt_st_2 = 0
+ self.evt_st_3 = 0
+
+ # define output data structure in Python
+ class Data1(ct.Structure):
+ _fields_ = [("v1", ct.c_char),
+ ("v2", ct.c_int)]
+
+ class Data2(ct.Structure):
+ _fields_ = [("v1", ct.c_int),
+ ("v2", ct.c_char)]
+
+ class Data3(ct.Structure):
+ _fields_ = [("v1", ct.c_int),
+ ("v2", ct.c_int)]
+
+ def check_event_val(event, event_state, v1, v2, v3, v4):
+ if ((event.v1 == v1 and event.v2 == v2) or (event.v1 == v3 and event.v2 == v4)):
+ if (event_state == 0 or event_state == 1):
+ return 1
+ return 2
+
+ def print_event1(cpu, data, size):
+ event = ct.cast(data, ct.POINTER(Data1)).contents
+ self.evt_st_1 = check_event_val(event, self.evt_st_1, b'\x0d', 40, b'\x08', 200)
+
+ def print_event2(cpu, data, size):
+ event = ct.cast(data, ct.POINTER(Data2)).contents
+ # pretend we have two identical probe points to simplify the code
+ self.evt_st_2 = check_event_val(event, self.evt_st_2, 5, b'\x04', 5, b'\x04')
+
+ def print_event3(cpu, data, size):
+ event = ct.cast(data, ct.POINTER(Data3)).contents
+ self.evt_st_3 = check_event_val(event, self.evt_st_3, 200, 40, 8, 13)
+
+ # loop with callback to print_event
+ b["event1"].open_perf_buffer(print_event1)
+ b["event2"].open_perf_buffer(print_event2)
+ b["event3"].open_perf_buffer(print_event3)
+
+ # three iterations to make sure we get some probes and have time to process them
+ for i in range(3):
+ b.kprobe_poll()
+ self.assertTrue(self.evt_st_1 == 1 and self.evt_st_2 == 1 and self.evt_st_3 == 1)
+
def tearDown(self):
# kill the subprocess, clean the environment
self.app.kill()