Add benchmark for peer-to-peer dbus communication using libglib 60/267160/7
authorAdam Michalski <a.michalski2@partner.samsung.com>
Fri, 26 Nov 2021 10:50:18 +0000 (11:50 +0100)
committerAdam Michalski <a.michalski2@partner.samsung.com>
Mon, 13 Dec 2021 16:46:03 +0000 (17:46 +0100)
This commit adds new set of DBus-Glib benchmarks. The difference from
previous benchmarks is that clients communicate directly
(using libglib private socket) and not through dbus-daemon.

The benchmark should allow to measure latency/throughput of
socket-based communication with dbus data format. Compared to
communication to dbus-daemon the following is not a part of
the transfer and needs to be kept in mind while comparing
performance that:

- policy is not checked
- only 1-to-1 communication is allowed (no broadcasts).

Change-Id: I84d090a0f3e0d70f3ceb20de36dac41e19e38ae7

Makefile
benchmark/common.c
benchmark/p2p-gdbus.c [new file with mode: 0644]
packaging/dbus-tools.spec

index 11d38e93546d753bf5adfe776b953f30a3e0f3e7..457b555a2e0193d0ea5bccad8c798e254b6a0df5 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,7 @@ all:
        gcc $(CFLAGS) -o benchmark/socket $(DFTSRC) benchmark/socket.c $(LDFLAGS)
        gcc $(CFLAGS) -o benchmark/sharedmem $(DFTSRC) benchmark/sharedmem.c $(LDFLAGS)
        gcc $(CFLAGS) -o benchmark/gdbus $(DFTSRC) benchmark/gdbus.c $(LDFLAGS)
+       gcc $(CFLAGS) -o benchmark/p2p-gdbus $(DFTSRC) benchmark/p2p-gdbus.c $(LDFLAGS)
        gcc $(CFLAGS) -o benchmark/libdbus $(DFTSRC) benchmark/libdbus.c $(LDFLAGS)
 
 
@@ -20,4 +21,5 @@ install:
        cp benchmark/socket $(DESTDIR)/usr/bin/socket
        cp benchmark/sharedmem $(DESTDIR)/usr/bin/sharedmem
        cp benchmark/gdbus $(DESTDIR)/usr/bin/gdbus
+       cp benchmark/p2p-gdbus $(DESTDIR)/usr/bin/p2p-gdbus
        cp benchmark/libdbus $(DESTDIR)/usr/bin/libdbus
index d8d5cee651c7648d7fa4326f60f45832e7ebffae..d916711018b721ef74e0ee22ad0f5720075a6d49 100644 (file)
@@ -22,5 +22,9 @@ void print_help(char **argv)
        printf("-p: the number of process (default: the number of CPU)\n");
        printf("-b: measure bandwidth\n");
        printf("-l: measure latency\n");
+       printf("-t: number of real tries (default: 1000)\n");
+       printf("-c: specifies whether client & server processes should be pinned "
+               "to the same or different CPU cores (default: different)\n");
+       printf("-v: verbose (both receiving & sending parties)\n");
        printf("-h: print help page\n");
 }
diff --git a/benchmark/p2p-gdbus.c b/benchmark/p2p-gdbus.c
new file mode 100644 (file)
index 0000000..8a38e93
--- /dev/null
@@ -0,0 +1,630 @@
+/*
+ * p2pgdbus.c - gdbus peer to peer IPC latency & bandwidth
+ */
+
+#include <gio/gio.h>
+#include "common.h"
+
+#define SVC_NAME                "/tmp/my-test-service"
+#define DBUS_TEST_SVC_NAME      "unix:path=" SVC_NAME
+
+int nprocs;                         /* number of processes (for bandwidth) */
+int msize;                          /* message size */
+int n_real_tries;                   /* number of real tries */
+int lt_cnt;
+double min, max, total;             /* minimum, maximum and total time (for latency) */
+unsigned long long bw;              /* bandwidth (partial result) */
+bool verbose;                       /* displays details of communication between peers */
+bool cpu_pin_to_same_core;          /* specifies, whether client & server processes are pinned
+                                       to the same core */
+bool lt_on,                         /* latency test */
+     bw_on;                         /* bandwidth test */
+
+static GDBusNodeInfo *introspection_data = NULL;
+
+typedef struct _state {
+       int iters;                  /* number of test iterations */
+       char *buf;                  /* message buffer */
+       bool is_lt;                 /* true for latency test, false for bandwidth */
+       GMainLoop *main_loop;       /* reference to the main GLib event loop */
+       const char *path;           /* object path (for on_new_connection routine) */
+       bool verbose;               /* for debug purposes */
+} State;
+
+static unsigned long long get_current_time(void)
+{
+       struct timespec clock;
+       clock_gettime (CLOCK_REALTIME, &clock);
+       return (unsigned long long)clock.tv_sec * NS
+               + (unsigned long long)clock.tv_nsec;
+}
+
+static void handle_method_call(GDBusConnection *connection,
+                               const gchar *sender,
+                               const gchar *object_path,
+                               const gchar *interface_name,
+                               const gchar *method_name,
+                               GVariant *parameters,
+                               GDBusMethodInvocation *invocation,
+                               gpointer user_data)
+{
+       static int cnt = 0;
+       static int bw_cnt = 0;
+       static unsigned long long start;
+       unsigned long long end;
+       unsigned long long temp;
+       State *state = (State *)user_data;
+
+       if (g_strcmp0(method_name, "Perf") != 0)
+               return;
+
+       if (state->is_lt)
+               g_variant_get(parameters, "(ts)", &start, &state->buf);
+       else
+               g_variant_get (parameters, "(ts)", &temp, &state->buf);
+
+       g_object_unref(invocation);
+
+       if (++cnt == state->iters) {
+               cnt = 0;
+               g_main_loop_quit(state->main_loop);
+       }
+
+       if (state->buf[0] != 'R')       /* skip warm-up tries */
+               return;
+
+       if (state->is_lt) {             /* we're measuring latency */
+               end = get_current_time();
+               double lt = (double)(end - start) / 1000;
+               if (state->verbose)
+                       g_print("latency: %0.4f\n", lt);
+               if (lt > 0) {           /* latency */
+                       lt_cnt++;
+                       total += lt;
+                       if (lt > max)
+                               max = lt;
+                       if (lt < min)
+                               min = lt;
+               }
+       }
+       else {                          /* we're measuring bandwidth */
+               bw_cnt++;
+               if (state->verbose)
+                       printf("-> bw_cnt = %d (from: %s)\n", bw_cnt, interface_name);
+               if (bw_cnt == 1) {
+                       start = get_current_time();
+               }
+
+               if (bw_cnt == n_real_tries) {
+                       end = get_current_time();
+                       bw = ((unsigned long long)NS * bw_cnt)/(end - start);
+                       printf("bandwidth: %llu (/s)\n", bw);
+               }
+       }
+
+       if (state->verbose) {
+               unsigned long long ts = (state->is_lt ? start : temp);
+               char timestamp[30] = "";
+               if (ts > 0)
+                       sprintf(timestamp, "[%llu]", ts);
+               if (msize > 64) {
+                       state->buf[64] = '\0';  /* truncated the msg deliberately for display purposes */
+                       g_print("Client said: %s (TRUNCATED) %s\n", state->buf, timestamp);
+               }
+               else
+                       g_print("Client said: %s %s\n", state->buf, timestamp);
+       }
+}
+
+static const GDBusInterfaceVTable interface_vtable = {
+       handle_method_call,
+       NULL,
+       NULL,
+};
+
+static gboolean allow_mechanism_cb(GDBusAuthObserver *observer,
+                               const gchar *mechanism,
+                               G_GNUC_UNUSED gpointer user_data)
+{
+       State *state = (State *)user_data;
+       if (state->verbose)
+               g_print ("Considering whether to accept %s authentication...\n", mechanism);
+       return TRUE;
+}
+
+static gboolean authorize_authenticated_peer_cb(GDBusAuthObserver *observer,
+                               G_GNUC_UNUSED GIOStream *stream,
+                               GCredentials *credentials,
+                               G_GNUC_UNUSED gpointer user_data)
+{
+       State *state = (State *)user_data;
+       gboolean authorized = FALSE;
+
+       if (state->verbose)
+               g_print("Considering whether to authorize authenticated peer...\n");
+
+       if (credentials != NULL) {
+               GCredentials *own_credentials;
+               gchar *credentials_string = NULL;
+               // client's credentials
+               credentials_string = g_credentials_to_string(credentials);
+               if (state->verbose)
+                       g_print("Peer's credentials: %s\n", credentials_string);
+               g_free(credentials_string);
+               // server's credentials
+               own_credentials = g_credentials_new();
+               credentials_string = g_credentials_to_string(own_credentials);
+               if (state->verbose)
+                       g_print("Server's credentials: %s\n", credentials_string);
+               g_free(credentials_string);
+
+               if (g_credentials_is_same_user(credentials, own_credentials, NULL))
+                       authorized = TRUE;
+
+               g_object_unref(own_credentials);
+       }
+
+       if (!authorized) {
+               if (state->verbose) {
+                       g_print("A server would often not want to authorize this identity\n");
+                       g_print("Authorizing it anyway for demonstration purposes\n");
+               }
+               authorized = TRUE;
+       }
+
+       return authorized;
+}
+
+static void connection_closed(GDBusConnection *connection,
+                       gboolean remote_peer_vanished,
+                       GError *Error,
+                       gpointer user_data)
+{
+       State *state = (State *)user_data;
+       if (state->verbose)
+               g_print("Client disconnected.\n");
+       g_object_unref(connection);
+}
+
+static gboolean on_new_connection(GDBusServer *server,
+                               GDBusConnection *connection,
+                               gpointer user_data)
+{
+       guint registration_id;
+       GCredentials *credentials;
+       gchar *s;
+       State *state = (State *)user_data;
+
+       credentials = g_dbus_connection_get_peer_credentials(connection);
+       if (credentials == NULL)
+               s = g_strdup("(no credentials received)");
+       else
+               s = g_credentials_to_string(credentials);
+
+       if (state->verbose)
+               g_print("Client connected.\n"
+                               "Peer credentials: %s\n"
+                               "Negotiated capabilities: unix-fd-passing=%d\n",
+                               s,
+                               g_dbus_connection_get_capabilities(connection) & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING);
+
+       g_object_ref(connection);
+       g_signal_connect(connection, "closed", G_CALLBACK(connection_closed), (gpointer)state);
+       registration_id = g_dbus_connection_register_object(connection,
+                                               state->path,
+                                               introspection_data->interfaces[0],
+                                               &interface_vtable,
+                                               (gpointer)state,
+                                               NULL,   /* user_data_free_func */
+                                               NULL);  /* GError** */
+       g_assert(registration_id > 0);
+
+       g_free(s);
+       return TRUE;
+}
+
+int Receive(int size, const char *name, const char *path, const char *number, bool is_lt, bool verbose)
+{
+       gint ret = 1;
+       GError *error = NULL;
+       GMainLoop *loop;
+
+       GDBusAuthObserver *observer;
+       GDBusServerFlags server_flags = G_DBUS_SERVER_FLAGS_NONE | G_DBUS_SERVER_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS;
+       GDBusServer *server;
+       gchar *guid;
+
+       char *buf = malloc(size);
+
+       loop = g_main_loop_new(NULL, FALSE);
+
+       static gchar introspection_xml[5000];
+       sprintf(introspection_xml,
+               "<node>"
+               "       <interface name='%s'>"
+               "               <method name='Perf'>"
+               "                       <arg type='t' name='Start' direction='in'/>"
+               "                       <arg type='s' name='Msg' direction='in'/>"
+               "               </method>"
+               "       </interface>"
+               "</node>", name);
+       introspection_data = g_dbus_node_info_new_for_xml(introspection_xml, NULL);
+
+       State state;
+       state.iters = WARMUP_TRY + n_real_tries;
+       state.is_lt = is_lt;
+       state.buf = buf;
+       state.main_loop = loop;
+       state.verbose = verbose;
+       state.path = path;
+
+       guid = g_dbus_generate_guid ();
+
+       char svc_name[strlen(SVC_NAME) + strlen(number) + 1];
+       strncpy(&svc_name[0], SVC_NAME, strlen(SVC_NAME)+1);
+       strcpy(&svc_name[strlen(SVC_NAME)], number);
+
+       unlink(svc_name);
+
+       char dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME) + strlen(number) + 1];
+       strncpy(&dbus_test_svc_name[0], DBUS_TEST_SVC_NAME, strlen(DBUS_TEST_SVC_NAME)+1);
+       strcpy(&dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME)], number);
+
+       observer = g_dbus_auth_observer_new();
+       if (observer == NULL) {
+               g_printerr("Error creating authenticating observer object.");
+               g_error_free(error);
+               goto out;
+       }
+       g_signal_connect(observer,
+                        "allow-mechanism",
+                        G_CALLBACK(allow_mechanism_cb),
+                        (gpointer)&state);
+       g_signal_connect(observer,
+                       "authorize-authenticated-peer",
+                       G_CALLBACK(authorize_authenticated_peer_cb),
+                       (gpointer)&state);
+
+       server = g_dbus_server_new_sync (dbus_test_svc_name,
+                               server_flags,
+                               guid,
+                               observer,
+                               NULL, /* GCancellable */
+                               &error);
+
+       if (server == NULL)     {
+               g_printerr("Error creating server at address %s: %s\n", dbus_test_svc_name, error->message);
+               g_error_free(error);
+               goto out;
+       }
+
+       g_dbus_server_start(server);
+
+       g_object_unref(observer);
+       g_free (guid);
+
+       if (verbose)
+               g_print("Server is listening at: %s\n", g_dbus_server_get_client_address(server));
+
+       g_signal_connect(server,
+                                        "new-connection",
+                                        G_CALLBACK(on_new_connection),
+                                        (gpointer)&state);
+
+       // Run main GLib loop
+       g_main_loop_run(loop);
+
+       g_object_unref(server);
+       g_main_loop_unref(loop);
+
+       g_dbus_node_info_unref(introspection_data);
+
+       free(buf);
+       ret = 0;
+
+out:
+       return ret;
+}
+
+void Send(int size, const char *name, const char *path, const char *number, bool is_lt, bool verbose)
+{
+       GDBusConnection *connection;
+       GError *error = NULL;
+       register char *cptr = malloc(size);
+       unsigned long long start;
+       int i, j;
+
+       char dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME) + strlen(number) + 1];
+       strncpy(&dbus_test_svc_name[0], DBUS_TEST_SVC_NAME, strlen(DBUS_TEST_SVC_NAME)+1);
+       strcpy(&dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME)], number);
+
+       connection = g_dbus_connection_new_for_address_sync(dbus_test_svc_name,
+                               G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT,
+                               NULL,      /* GDBusAuthObserver */
+                               NULL,      /* GCancellable */
+                               &error);
+       if (connection == NULL)
+       {
+               g_printerr("Error connecting to D-Bus address %s: %s\n", dbus_test_svc_name, error->message);
+               g_error_free(error);
+               free(cptr);
+               return;
+       }
+
+       if (verbose) {
+               g_print("Client connected.\n"
+                       "Negotiated capabilities: unix-fd-passing=%d\n",
+                       g_dbus_connection_get_capabilities(connection) & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING);
+       }
+
+       sleep(2);
+       for (i = 0; i < WARMUP_TRY + n_real_tries; ++i) {
+               for (j = 0; j < size-1; ++j)
+                       cptr[j] = '!' + (j + i) % ('~' - '!' + 1);
+               cptr[size-1] = '\0';
+
+               if (i >= WARMUP_TRY)
+                       cptr[0] = 'R';
+               else
+                       cptr[0] = 'W';
+
+               if (is_lt)
+                       start = get_current_time();
+               else
+                       start = 0;
+
+               g_dbus_connection_call(connection,
+                               NULL,           /* bus_name = NULL - peer to peer connection */
+                               path,           /* object_path */
+                               name,           /* interface_name */
+                               "Perf",         /* method_name */
+                               g_variant_new("(ts)", start, cptr),
+                               NULL,
+                               G_DBUS_CALL_FLAGS_NONE,
+                               -1,
+                               NULL,
+                               NULL,
+                               NULL);
+               if (is_lt) {
+                       if (size <= ONE_PAGE_SIZE)
+                               usleep(5000);
+                       else if (size <= 10 * ONE_PAGE_SIZE)
+                               usleep(10000);
+                       else if (size <= 100 * ONE_PAGE_SIZE)
+                               usleep(20000);
+                       else
+                               usleep(40000);
+               }
+       }
+
+       if (!is_lt)
+               sleep(5);
+       g_dbus_connection_close(connection, NULL, NULL, NULL);
+       g_object_unref(connection);
+       free(cptr);
+}
+
+void Measure_latency(bool verbose)
+{
+       if (!cpu_pin(0, getpid())) {
+               perror("CPU pinning for parent");
+               exit(1);
+       }
+
+       int pid = fork();
+       switch (pid) {
+       case -1:                /* fork error */
+               perror("fork");
+               return;
+       case 0:                 /* child process */
+               if (!cpu_pin(!cpu_pin_to_same_core, getpid())) {
+                       perror("CPU pinning for child");
+                       exit(1);
+               }
+               sleep(1);
+               Send(msize, "org.gtk.GDBus.TestPeerInterface", "/org/gtk/GDBus/TestObject", "0", true, verbose);
+               exit(0);
+       default:                /* parent process */
+               Receive(msize, "org.gtk.GDBus.TestPeerInterface", "/org/gtk/GDBus/TestObject", "0", true, verbose);
+               g_print("min: %0.4f(us), max: %0.4f(us), avg: %0.4f(us)\n", min, max, total/lt_cnt);
+               waitpid(pid, NULL, 0);    /* wait for the child */
+       }
+}
+
+void Measure_bandwidth(bool verbose)
+{
+       int ret;
+       int returnp[nprocs/2][2];
+       int pids[nprocs];
+       int ready_fd;
+       volatile char *ready_ptr;
+
+       ready_fd = shm_open("isready", O_CREAT | O_TRUNC | O_RDWR, 0666);
+       if (ready_fd == -1)
+       {
+               perror("Open");
+               return;
+       }
+       if (ftruncate(ready_fd, sizeof(char))) {
+               perror("Ftruncate");
+               return;
+       }
+       ready_ptr = mmap(0, sizeof(char), PROT_READ | PROT_WRITE,
+                       MAP_SHARED, ready_fd, 0);
+       if (ready_ptr == MAP_FAILED) {
+               perror("Mmap");
+               return;
+       }
+       close(ready_fd);
+       ready_ptr[0] = 'n';
+
+       for (int i = 0; i < nprocs/2; i++) {
+               if (pipe(returnp[i]) == -1) {
+                       perror("pipe");
+                       exit(1);
+               }
+       }
+
+       if (!cpu_pin(0, getpid())) {
+               perror("CPU pinning for parent");
+               exit(1);
+       }
+
+       for (int child = 1; child < nprocs; child++) {
+               char name[30];
+               char path[30];
+               char number[11];
+               int pinning_no;
+
+               sprintf(name, "org.gdbus.server%d", child/2);
+               sprintf(path, "/org/gdbus/server%d", child/2);
+               sprintf(number, "%d", child/2);
+               int pid = fork();
+               switch (pid) {
+               //child
+               case 0:
+                       pinning_no = (cpu_pin_to_same_core ? child/2 : child);
+                       if (!cpu_pin(pinning_no, getpid())) {
+                               perror("CPU pinning for child");
+                               exit(1);
+                       }
+
+                       // Wait for parent
+                       while (ready_ptr[0] == 'n') {
+                       }
+
+                       if (child % 2 == 0) {
+                               Receive(msize, (const char *)name, (const char *)path, (const char *)number, false, verbose);
+
+                               //send result(bandwidth) to parent
+                               {
+                                       int wres;
+                                       close(returnp[child/2][0]);
+                                       wres = write(returnp[child/2][1], (void *)&bw, sizeof(unsigned long long));
+                                       if (wres != sizeof(unsigned long long)) {
+                                               perror("(write return) read/write on pipe sender");
+                                               exit(1);
+                                       }
+                                       close(returnp[child/2][1]);
+                               }
+                       }
+                       else {
+                               sleep(1);
+                               Send(msize, (const char *)name, (const char *)path, (const char *)number, false, verbose);
+                       }
+
+                       exit(0);
+               case -1:
+                       perror("fork");
+                       return;
+               default:
+                       pids[child] = pid;
+                       break;
+               }
+       }
+
+       sleep(1);
+       ready_ptr[0] = 'y';
+
+       Receive(msize, "org.gdbus.server0", "/org/gdbus/server0", "0", false, verbose);
+       for (int child = 1; child < nprocs; child++) {
+               //receive result(bandwidth) from child(s)
+               if (child % 2 == 0) {
+                       int rres;
+                       unsigned long long child_bw;
+
+                       close(returnp[child/2][1]);
+                       rres = read(returnp[child/2][0], (void *)&child_bw, sizeof(unsigned long long));
+                       if (rres != sizeof(unsigned long long)) {
+                               perror("(write return) read/write on pipe receiver");
+                               exit(1);
+                       }
+                       close(returnp[child/2][0]);
+                       bw += child_bw;
+               }
+
+               waitpid(pids[child], NULL, 0);
+       }
+       ret = munmap((void*)ready_ptr, sizeof(bool));
+       if (ret != 0) {
+               perror("munmap");
+               exit(1);
+       }
+
+       ret = shm_unlink("isready");
+       if (ret != 0) {
+               perror("shm unlink");
+               //exit(1);
+       }
+       printf("Total bandwidth: %llu (/s)\n", bw);
+}
+
+int main(int argc, char *argv[])
+{
+       int opt;
+       msize = 3;
+       lt_on = bw_on = false;
+       verbose = false;
+
+       nprocs = get_nprocs();
+       n_real_tries = REAL_TRY;
+       cpu_pin_to_same_core = false;
+
+       while ((opt = getopt(argc, argv, "m:p:blht:cv")) != -1) {
+               switch(opt) {
+               case 'm':
+                       msize = atoi(optarg);
+                       break;
+               case 'p':
+                       nprocs = atoi(optarg);
+                       if (nprocs <= 0) {
+                               printf("The number of process should be larger than 0\n");
+                               exit(0);
+                       }
+                       if (nprocs % 2 != 0) {
+                               printf("The number of process should be even\n");
+                               exit(0);
+                       }
+                       break;
+               case 'b':
+                       bw_on = true;           /* bandwidth */
+                       break;
+               case 'l':
+                       lt_on = true;           /* latency */
+                       break;
+               case 't':
+                       n_real_tries = atoi(optarg);
+                       if (n_real_tries <= 0) {
+                               printf("Number of real tries should be larger than 0\n");
+                               exit(0);
+                       }
+                       break;
+               case 'c':
+                       cpu_pin_to_same_core = true;
+                       break;
+               case 'v':
+                       verbose = true;
+                       break;
+               case 'h':
+                       print_help(argv);
+                       exit(0);
+               }
+       }
+
+       if (!lt_on && !bw_on) {
+               exit(0);
+       }
+
+       bw = 0;
+       lt_cnt = 0;
+       max = total = 0.0;
+       min = 1000000.0;
+
+       if (lt_on)
+               Measure_latency(verbose);
+       if (bw_on)
+               Measure_bandwidth(verbose);
+
+       return 0;
+}
+
index d8c341216499373aeb068b60718dcfe588764fa4..d632658a6a6991f10deb10383c724cb6622be6a4 100644 (file)
@@ -56,3 +56,4 @@ make
 %{_bindir}/sharedmem
 %{_bindir}/gdbus
 %{_bindir}/libdbus
+%{_bindir}/p2p-gdbus