Add per-cpu parameters to buffered perf output
authorBrenden Blanco <bblanco@plumgrid.com>
Fri, 6 Nov 2015 18:43:05 +0000 (10:43 -0800)
committerBrenden Blanco <bblanco@plumgrid.com>
Fri, 6 Nov 2015 19:52:38 +0000 (11:52 -0800)
Signed-off-by: Brenden Blanco <bblanco@plumgrid.com>
examples/tracing/trace_perf_output.py [changed mode: 0644->0755]
src/cc/libbpf.c
src/libbpf.h
src/python/bcc/__init__.py

old mode 100644 (file)
new mode 100755 (executable)
index 3e28d0f..3d56e9e
@@ -9,29 +9,33 @@
 import atexit
 from bcc import BPF
 import ctypes
+import multiprocessing
 
 counter = 0
-def cb(foo, data, size):
+def cb(cookie, data, size):
     global counter
     counter += 1
 
 prog = """
-BPF_PERF_ARRAY(events, 2);
+BPF_PERF_ARRAY(events, NUMCPU);
 BPF_TABLE("array", int, u64, counters, 10);
 int kprobe__sys_write(void *ctx) {
   struct {
     u64 ts;
   } data = {bpf_ktime_get_ns()};
-  if (events.perf_output(ctx, 0, &data, sizeof(data)) < 0)
-    bpf_trace_printk("perf_output failed\\n");
+  int rc;
+  if ((rc = events.perf_output(ctx, bpf_get_smp_processor_id(), &data, sizeof(data))) < 0)
+    bpf_trace_printk("perf_output failed: %d\\n", rc);
   int zero = 0;
   u64 *val = counters.lookup(&zero);
   if (val) lock_xadd(val, 1);
   return 0;
 }
 """
+numcpu = multiprocessing.cpu_count()
+prog = prog.replace("NUMCPU", str(numcpu))
 b = BPF(text=prog)
-b["events"].open_perf_buffer(0, cb, None)
+b["events"].open_perf_buffers(cb, None)
 
 @atexit.register
 def print_counter():
@@ -39,5 +43,7 @@ def print_counter():
     global b
     print("counter = %d vs %d" % (counter, b["counters"][ctypes.c_int(0)].value))
 
+print("Tracing sys_write, try `dd if=/dev/zero of=/dev/null`")
+print("Tracing... Hit Ctrl-C to end.")
 while 1:
     b.kprobe_poll()
index c7da5fe..fefa294 100644 (file)
@@ -178,7 +178,7 @@ int bpf_attach_socket(int sock, int prog) {
 
 static int bpf_attach_tracing_event(int progfd, const char *event_path,
     struct perf_reader *reader, int pid, int cpu, int group_fd) {
-  int efd = -1, rc = -1, pfd;
+  int efd = -1, pfd;
   ssize_t bytes;
   char buf[256];
   struct perf_event_attr attr = {};
@@ -187,13 +187,13 @@ static int bpf_attach_tracing_event(int progfd, const char *event_path,
   efd = open(buf, O_RDONLY, 0);
   if (efd < 0) {
     fprintf(stderr, "open(%s): %s\n", buf, strerror(errno));
-    goto cleanup;
+    goto error;
   }
 
   bytes = read(efd, buf, sizeof(buf));
   if (bytes <= 0 || bytes >= sizeof(buf)) {
     fprintf(stderr, "read(%s): %s\n", buf, strerror(errno));
-    goto cleanup;
+    goto error;
   }
   buf[bytes] = '\0';
   attr.config = strtol(buf, NULL, 0);
@@ -204,126 +204,128 @@ static int bpf_attach_tracing_event(int progfd, const char *event_path,
   pfd = syscall(__NR_perf_event_open, &attr, pid, cpu, group_fd, PERF_FLAG_FD_CLOEXEC);
   if (pfd < 0) {
     perror("perf_event_open");
-    goto cleanup;
+    goto error;
   }
   perf_reader_set_fd(reader, pfd);
 
   if (perf_reader_mmap(reader, attr.type, attr.sample_type) < 0)
-    goto cleanup;
+    goto error;
 
   if (ioctl(pfd, PERF_EVENT_IOC_SET_BPF, progfd) < 0) {
     perror("ioctl(PERF_EVENT_IOC_SET_BPF)");
-    goto cleanup;
+    goto error;
   }
   if (ioctl(pfd, PERF_EVENT_IOC_ENABLE, 0) < 0) {
     perror("ioctl(PERF_EVENT_IOC_ENABLE)");
-    goto cleanup;
+    goto error;
   }
 
-  rc = 0;
+  return 0;
 
-cleanup:
+error:
   if (efd >= 0)
     close(efd);
 
-  return rc;
+  return -1;
 }
 
 void * bpf_attach_kprobe(int progfd, const char *event,
                          const char *event_desc, pid_t pid,
                          int cpu, int group_fd, perf_reader_cb cb,
                          void *cb_cookie) {
-  int rc = -1, kfd = -1;
+  int kfd = -1;
   char buf[256];
   struct perf_reader *reader = NULL;
 
   reader = perf_reader_new(cb, NULL, cb_cookie);
   if (!reader)
-    goto cleanup;
+    goto error;
 
   kfd = open("/sys/kernel/debug/tracing/kprobe_events", O_WRONLY | O_APPEND, 0);
   if (kfd < 0) {
     perror("open(kprobe_events)");
-    goto cleanup;
+    goto error;
   }
 
   if (write(kfd, event_desc, strlen(event_desc)) < 0) {
     fprintf(stderr, "write of \"%s\" into kprobe_events failed: %s\n", event_desc, strerror(errno));
     if (errno == EINVAL)
       fprintf(stderr, "check dmesg output for possible cause\n");
-    goto cleanup;
+    goto error;
   }
 
   snprintf(buf, sizeof(buf), "/sys/kernel/debug/tracing/events/kprobes/%s", event);
-  rc = bpf_attach_tracing_event(progfd, buf, reader, pid, cpu, group_fd);
+  if (bpf_attach_tracing_event(progfd, buf, reader, pid, cpu, group_fd) < 0)
+    goto error;
 
-cleanup:
+  return reader;
+
+error:
   if (kfd >= 0)
     close(kfd);
-  if (reader && rc < 0) {
+  if (reader)
     perf_reader_free(reader);
-    reader = NULL;
-  }
 
-  return reader;
+  return NULL;
 }
 
 int bpf_detach_kprobe(const char *event_desc) {
-  int rc = -1, kfd = -1;
+  int kfd = -1;
 
   kfd = open("/sys/kernel/debug/tracing/kprobe_events", O_WRONLY | O_APPEND, 0);
   if (kfd < 0) {
     perror("open(kprobe_events)");
-    goto cleanup;
+    goto error;
   }
 
   if (write(kfd, event_desc, strlen(event_desc)) < 0) {
     perror("write(kprobe_events)");
-    goto cleanup;
+    goto error;
   }
-  rc = 0;
 
-cleanup:
+  return 0;
+
+error:
   if (kfd >= 0)
     close(kfd);
 
-  return rc;
+  return -1;
 }
 
-void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie) {
-  int rc = -1, pfd;
+void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid, int cpu) {
+  int pfd;
   struct perf_event_attr attr = {};
+  struct perf_reader *reader = NULL;
 
-  struct perf_reader *reader = perf_reader_new(NULL, raw_cb, cb_cookie);
-
+  reader = perf_reader_new(NULL, raw_cb, cb_cookie);
   if (!reader)
-    goto cleanup;
+    goto error;
 
   attr.config = PERF_COUNT_SW_BPF_OUTPUT;
   attr.type = PERF_TYPE_SOFTWARE;
   attr.sample_type = PERF_SAMPLE_RAW;
-  pfd = syscall(__NR_perf_event_open, &attr, -1, 0, -1, PERF_FLAG_FD_CLOEXEC);
+  attr.sample_period = 1;
+  attr.wakeup_events = 1;
+  pfd = syscall(__NR_perf_event_open, &attr, pid, cpu, -1, PERF_FLAG_FD_CLOEXEC);
   if (pfd < 0) {
     perror("perf_event_open");
-    goto cleanup;
+    goto error;
   }
   perf_reader_set_fd(reader, pfd);
 
   if (perf_reader_mmap(reader, attr.type, attr.sample_type) < 0)
-    goto cleanup;
+    goto error;
 
   if (ioctl(pfd, PERF_EVENT_IOC_ENABLE, 0) < 0) {
     perror("ioctl(PERF_EVENT_IOC_ENABLE)");
-    goto cleanup;
+    goto error;
   }
 
-  rc = 0;
+  return reader;
 
-cleanup:
-  if (reader && rc < 0) {
+error:
+  if (reader)
     perf_reader_free(reader);
-    reader = NULL;
-  }
 
-  return reader;
+  return NULL;
 }
index 0204799..1b8e622 100644 (file)
@@ -48,6 +48,7 @@ void * bpf_attach_kprobe(int progfd, const char *event, const char *event_desc,
                          int pid, int cpu, int group_fd, perf_reader_cb cb,
                          void *cb_cookie);
 int bpf_detach_kprobe(const char *event_desc);
+void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid, int cpu);
 
 #define LOG_BUF_SIZE 65536
 extern char bpf_log_buf[LOG_BUF_SIZE];
index a3c7597..5ab826b 100644 (file)
@@ -18,6 +18,7 @@ from collections import MutableMapping
 import ctypes as ct
 import fcntl
 import json
+import multiprocessing
 import os
 from subprocess import Popen, PIPE
 import sys
@@ -95,7 +96,7 @@ lib.bpf_attach_kprobe.argtypes = [ct.c_int, ct.c_char_p, ct.c_char_p, ct.c_int,
 lib.bpf_detach_kprobe.restype = ct.c_int
 lib.bpf_detach_kprobe.argtypes = [ct.c_char_p]
 lib.bpf_open_perf_buffer.restype = ct.c_void_p
-lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, ct.py_object]
+lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, ct.py_object, ct.c_int, ct.c_int]
 lib.perf_reader_poll.restype = ct.c_int
 lib.perf_reader_poll.argtypes = [ct.c_int, ct.POINTER(ct.c_void_p), ct.c_int]
 lib.perf_reader_free.restype = None
@@ -148,6 +149,7 @@ class BPF(object):
             self.Key = keytype
             self.Leaf = leaftype
             self.ttype = lib.bpf_table_type_id(self.bpf.module, self.map_id)
+            self._cbs = {}
 
         def key_sprintf(self, key):
             key_p = ct.pointer(key)
@@ -185,20 +187,42 @@ class BPF(object):
                 raise Exception("Could not scanf leaf")
             return leaf
 
-        def open_perf_buffer(self, key, cb, cookie):
-            reader = lib.bpf_open_perf_buffer(_RAW_CB_TYPE(cb),
-                    ct.cast(id(cookie), ct.py_object))
+        def open_perf_buffers(self, cb, cookie):
+            """open_perf_buffers(cb, cookie)
+
+            Opens ring buffers, one for each cpu, to receive custom perf event
+            data from the bpf program. The program is expected to use the cpu-id
+            as the key of the perf_output call.
+            """
+
+            for i in range(0, multiprocessing.cpu_count()):
+                self.open_perf_buffer(i, cb, cookie, cpu=i)
+
+        def open_perf_buffer(self, key, cb, cookie, pid=-1, cpu=0):
+            """open_perf_buffer(key, cb, cookie, pid=-1, cpu=0)
+
+            Open a ring buffer to receive custom perf event data from the bpf
+            program. The callback cb is invoked for each event submitted, which
+            can be up to millions of events per second. The signature of cb
+            should be cb(cookie, data, data_size).
+            """
+
+            fn = _RAW_CB_TYPE(lambda x, data, size: cb(cookie, data, size))
+            reader = lib.bpf_open_perf_buffer(fn, None, pid, cpu)
             if not reader:
                 raise Exception("Could not open perf buffer")
             fd = lib.perf_reader_fd(reader)
             self[self.Key(key)] = self.Leaf(fd)
             open_kprobes[(id(self), key)] = reader
+            # keep a refcnt
+            self._cbs[key] = (fn, cookie)
 
         def close_perf_buffer(self, key):
             reader = open_kprobes.get((id(self), key))
             if reader:
                 lib.perf_reader_free(reader)
                 del(open_kprobes[(id(self), key)])
+            del self._cbs[key]
 
         def __getitem__(self, key):
             key_p = ct.pointer(key)
@@ -810,10 +834,10 @@ class BPF(object):
         Poll from the ring buffers for all of the open kprobes, calling the
         cb() that was given in the BPF constructor for each entry.
         """
-        readers = (ct.c_void_p * len(open_kprobes))()
-        for i, v in enumerate(open_kprobes.values()):
-            readers[i] = v
         try:
+            readers = (ct.c_void_p * len(open_kprobes))()
+            for i, v in enumerate(open_kprobes.values()):
+                readers[i] = v
             lib.perf_reader_poll(len(open_kprobes), readers, timeout)
         except KeyboardInterrupt:
             exit()