From a863b68f5fdd9c4254b05c3d69a796557f8eec7b Mon Sep 17 00:00:00 2001 From: Adam Michalski Date: Fri, 26 Nov 2021 11:50:18 +0100 Subject: [PATCH] Add benchmark for peer-to-peer dbus communication using libglib This commit adds new set of DBus-Glib benchmarks. The difference from previous benchmarks is that clients communicate directly (using libglib private socket) and not through dbus-daemon. The benchmark should allow to measure latency/throughput of socket-based communication with dbus data format. Compared to communication to dbus-daemon the following is not a part of the transfer and needs to be kept in mind while comparing performance that: - policy is not checked - only 1-to-1 communication is allowed (no broadcasts). Change-Id: I84d090a0f3e0d70f3ceb20de36dac41e19e38ae7 --- Makefile | 2 + benchmark/common.c | 4 + benchmark/p2p-gdbus.c | 630 ++++++++++++++++++++++++++++++++++++++ packaging/dbus-tools.spec | 1 + 4 files changed, 637 insertions(+) create mode 100644 benchmark/p2p-gdbus.c diff --git a/Makefile b/Makefile index 11d38e9..457b555 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ all: gcc $(CFLAGS) -o benchmark/socket $(DFTSRC) benchmark/socket.c $(LDFLAGS) gcc $(CFLAGS) -o benchmark/sharedmem $(DFTSRC) benchmark/sharedmem.c $(LDFLAGS) gcc $(CFLAGS) -o benchmark/gdbus $(DFTSRC) benchmark/gdbus.c $(LDFLAGS) + gcc $(CFLAGS) -o benchmark/p2p-gdbus $(DFTSRC) benchmark/p2p-gdbus.c $(LDFLAGS) gcc $(CFLAGS) -o benchmark/libdbus $(DFTSRC) benchmark/libdbus.c $(LDFLAGS) @@ -20,4 +21,5 @@ install: cp benchmark/socket $(DESTDIR)/usr/bin/socket cp benchmark/sharedmem $(DESTDIR)/usr/bin/sharedmem cp benchmark/gdbus $(DESTDIR)/usr/bin/gdbus + cp benchmark/p2p-gdbus $(DESTDIR)/usr/bin/p2p-gdbus cp benchmark/libdbus $(DESTDIR)/usr/bin/libdbus diff --git a/benchmark/common.c b/benchmark/common.c index d8d5cee..d916711 100644 --- a/benchmark/common.c +++ b/benchmark/common.c @@ -22,5 +22,9 @@ void print_help(char **argv) printf("-p: the number of process (default: the number of CPU)\n"); printf("-b: measure bandwidth\n"); printf("-l: measure latency\n"); + printf("-t: number of real tries (default: 1000)\n"); + printf("-c: specifies whether client & server processes should be pinned " + "to the same or different CPU cores (default: different)\n"); + printf("-v: verbose (both receiving & sending parties)\n"); printf("-h: print help page\n"); } diff --git a/benchmark/p2p-gdbus.c b/benchmark/p2p-gdbus.c new file mode 100644 index 0000000..8a38e93 --- /dev/null +++ b/benchmark/p2p-gdbus.c @@ -0,0 +1,630 @@ +/* + * p2pgdbus.c - gdbus peer to peer IPC latency & bandwidth + */ + +#include +#include "common.h" + +#define SVC_NAME "/tmp/my-test-service" +#define DBUS_TEST_SVC_NAME "unix:path=" SVC_NAME + +int nprocs; /* number of processes (for bandwidth) */ +int msize; /* message size */ +int n_real_tries; /* number of real tries */ +int lt_cnt; +double min, max, total; /* minimum, maximum and total time (for latency) */ +unsigned long long bw; /* bandwidth (partial result) */ +bool verbose; /* displays details of communication between peers */ +bool cpu_pin_to_same_core; /* specifies, whether client & server processes are pinned + to the same core */ +bool lt_on, /* latency test */ + bw_on; /* bandwidth test */ + +static GDBusNodeInfo *introspection_data = NULL; + +typedef struct _state { + int iters; /* number of test iterations */ + char *buf; /* message buffer */ + bool is_lt; /* true for latency test, false for bandwidth */ + GMainLoop *main_loop; /* reference to the main GLib event loop */ + const char *path; /* object path (for on_new_connection routine) */ + bool verbose; /* for debug purposes */ +} State; + +static unsigned long long get_current_time(void) +{ + struct timespec clock; + clock_gettime (CLOCK_REALTIME, &clock); + return (unsigned long long)clock.tv_sec * NS + + (unsigned long long)clock.tv_nsec; +} + +static void handle_method_call(GDBusConnection *connection, + const gchar *sender, + const gchar *object_path, + const gchar *interface_name, + const gchar *method_name, + GVariant *parameters, + GDBusMethodInvocation *invocation, + gpointer user_data) +{ + static int cnt = 0; + static int bw_cnt = 0; + static unsigned long long start; + unsigned long long end; + unsigned long long temp; + State *state = (State *)user_data; + + if (g_strcmp0(method_name, "Perf") != 0) + return; + + if (state->is_lt) + g_variant_get(parameters, "(ts)", &start, &state->buf); + else + g_variant_get (parameters, "(ts)", &temp, &state->buf); + + g_object_unref(invocation); + + if (++cnt == state->iters) { + cnt = 0; + g_main_loop_quit(state->main_loop); + } + + if (state->buf[0] != 'R') /* skip warm-up tries */ + return; + + if (state->is_lt) { /* we're measuring latency */ + end = get_current_time(); + double lt = (double)(end - start) / 1000; + if (state->verbose) + g_print("latency: %0.4f\n", lt); + if (lt > 0) { /* latency */ + lt_cnt++; + total += lt; + if (lt > max) + max = lt; + if (lt < min) + min = lt; + } + } + else { /* we're measuring bandwidth */ + bw_cnt++; + if (state->verbose) + printf("-> bw_cnt = %d (from: %s)\n", bw_cnt, interface_name); + if (bw_cnt == 1) { + start = get_current_time(); + } + + if (bw_cnt == n_real_tries) { + end = get_current_time(); + bw = ((unsigned long long)NS * bw_cnt)/(end - start); + printf("bandwidth: %llu (/s)\n", bw); + } + } + + if (state->verbose) { + unsigned long long ts = (state->is_lt ? start : temp); + char timestamp[30] = ""; + if (ts > 0) + sprintf(timestamp, "[%llu]", ts); + if (msize > 64) { + state->buf[64] = '\0'; /* truncated the msg deliberately for display purposes */ + g_print("Client said: %s (TRUNCATED) %s\n", state->buf, timestamp); + } + else + g_print("Client said: %s %s\n", state->buf, timestamp); + } +} + +static const GDBusInterfaceVTable interface_vtable = { + handle_method_call, + NULL, + NULL, +}; + +static gboolean allow_mechanism_cb(GDBusAuthObserver *observer, + const gchar *mechanism, + G_GNUC_UNUSED gpointer user_data) +{ + State *state = (State *)user_data; + if (state->verbose) + g_print ("Considering whether to accept %s authentication...\n", mechanism); + return TRUE; +} + +static gboolean authorize_authenticated_peer_cb(GDBusAuthObserver *observer, + G_GNUC_UNUSED GIOStream *stream, + GCredentials *credentials, + G_GNUC_UNUSED gpointer user_data) +{ + State *state = (State *)user_data; + gboolean authorized = FALSE; + + if (state->verbose) + g_print("Considering whether to authorize authenticated peer...\n"); + + if (credentials != NULL) { + GCredentials *own_credentials; + gchar *credentials_string = NULL; + // client's credentials + credentials_string = g_credentials_to_string(credentials); + if (state->verbose) + g_print("Peer's credentials: %s\n", credentials_string); + g_free(credentials_string); + // server's credentials + own_credentials = g_credentials_new(); + credentials_string = g_credentials_to_string(own_credentials); + if (state->verbose) + g_print("Server's credentials: %s\n", credentials_string); + g_free(credentials_string); + + if (g_credentials_is_same_user(credentials, own_credentials, NULL)) + authorized = TRUE; + + g_object_unref(own_credentials); + } + + if (!authorized) { + if (state->verbose) { + g_print("A server would often not want to authorize this identity\n"); + g_print("Authorizing it anyway for demonstration purposes\n"); + } + authorized = TRUE; + } + + return authorized; +} + +static void connection_closed(GDBusConnection *connection, + gboolean remote_peer_vanished, + GError *Error, + gpointer user_data) +{ + State *state = (State *)user_data; + if (state->verbose) + g_print("Client disconnected.\n"); + g_object_unref(connection); +} + +static gboolean on_new_connection(GDBusServer *server, + GDBusConnection *connection, + gpointer user_data) +{ + guint registration_id; + GCredentials *credentials; + gchar *s; + State *state = (State *)user_data; + + credentials = g_dbus_connection_get_peer_credentials(connection); + if (credentials == NULL) + s = g_strdup("(no credentials received)"); + else + s = g_credentials_to_string(credentials); + + if (state->verbose) + g_print("Client connected.\n" + "Peer credentials: %s\n" + "Negotiated capabilities: unix-fd-passing=%d\n", + s, + g_dbus_connection_get_capabilities(connection) & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING); + + g_object_ref(connection); + g_signal_connect(connection, "closed", G_CALLBACK(connection_closed), (gpointer)state); + registration_id = g_dbus_connection_register_object(connection, + state->path, + introspection_data->interfaces[0], + &interface_vtable, + (gpointer)state, + NULL, /* user_data_free_func */ + NULL); /* GError** */ + g_assert(registration_id > 0); + + g_free(s); + return TRUE; +} + +int Receive(int size, const char *name, const char *path, const char *number, bool is_lt, bool verbose) +{ + gint ret = 1; + GError *error = NULL; + GMainLoop *loop; + + GDBusAuthObserver *observer; + GDBusServerFlags server_flags = G_DBUS_SERVER_FLAGS_NONE | G_DBUS_SERVER_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS; + GDBusServer *server; + gchar *guid; + + char *buf = malloc(size); + + loop = g_main_loop_new(NULL, FALSE); + + static gchar introspection_xml[5000]; + sprintf(introspection_xml, + "" + " " + " " + " " + " " + " " + " " + "", name); + introspection_data = g_dbus_node_info_new_for_xml(introspection_xml, NULL); + + State state; + state.iters = WARMUP_TRY + n_real_tries; + state.is_lt = is_lt; + state.buf = buf; + state.main_loop = loop; + state.verbose = verbose; + state.path = path; + + guid = g_dbus_generate_guid (); + + char svc_name[strlen(SVC_NAME) + strlen(number) + 1]; + strncpy(&svc_name[0], SVC_NAME, strlen(SVC_NAME)+1); + strcpy(&svc_name[strlen(SVC_NAME)], number); + + unlink(svc_name); + + char dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME) + strlen(number) + 1]; + strncpy(&dbus_test_svc_name[0], DBUS_TEST_SVC_NAME, strlen(DBUS_TEST_SVC_NAME)+1); + strcpy(&dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME)], number); + + observer = g_dbus_auth_observer_new(); + if (observer == NULL) { + g_printerr("Error creating authenticating observer object."); + g_error_free(error); + goto out; + } + g_signal_connect(observer, + "allow-mechanism", + G_CALLBACK(allow_mechanism_cb), + (gpointer)&state); + g_signal_connect(observer, + "authorize-authenticated-peer", + G_CALLBACK(authorize_authenticated_peer_cb), + (gpointer)&state); + + server = g_dbus_server_new_sync (dbus_test_svc_name, + server_flags, + guid, + observer, + NULL, /* GCancellable */ + &error); + + if (server == NULL) { + g_printerr("Error creating server at address %s: %s\n", dbus_test_svc_name, error->message); + g_error_free(error); + goto out; + } + + g_dbus_server_start(server); + + g_object_unref(observer); + g_free (guid); + + if (verbose) + g_print("Server is listening at: %s\n", g_dbus_server_get_client_address(server)); + + g_signal_connect(server, + "new-connection", + G_CALLBACK(on_new_connection), + (gpointer)&state); + + // Run main GLib loop + g_main_loop_run(loop); + + g_object_unref(server); + g_main_loop_unref(loop); + + g_dbus_node_info_unref(introspection_data); + + free(buf); + ret = 0; + +out: + return ret; +} + +void Send(int size, const char *name, const char *path, const char *number, bool is_lt, bool verbose) +{ + GDBusConnection *connection; + GError *error = NULL; + register char *cptr = malloc(size); + unsigned long long start; + int i, j; + + char dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME) + strlen(number) + 1]; + strncpy(&dbus_test_svc_name[0], DBUS_TEST_SVC_NAME, strlen(DBUS_TEST_SVC_NAME)+1); + strcpy(&dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME)], number); + + connection = g_dbus_connection_new_for_address_sync(dbus_test_svc_name, + G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT, + NULL, /* GDBusAuthObserver */ + NULL, /* GCancellable */ + &error); + if (connection == NULL) + { + g_printerr("Error connecting to D-Bus address %s: %s\n", dbus_test_svc_name, error->message); + g_error_free(error); + free(cptr); + return; + } + + if (verbose) { + g_print("Client connected.\n" + "Negotiated capabilities: unix-fd-passing=%d\n", + g_dbus_connection_get_capabilities(connection) & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING); + } + + sleep(2); + for (i = 0; i < WARMUP_TRY + n_real_tries; ++i) { + for (j = 0; j < size-1; ++j) + cptr[j] = '!' + (j + i) % ('~' - '!' + 1); + cptr[size-1] = '\0'; + + if (i >= WARMUP_TRY) + cptr[0] = 'R'; + else + cptr[0] = 'W'; + + if (is_lt) + start = get_current_time(); + else + start = 0; + + g_dbus_connection_call(connection, + NULL, /* bus_name = NULL - peer to peer connection */ + path, /* object_path */ + name, /* interface_name */ + "Perf", /* method_name */ + g_variant_new("(ts)", start, cptr), + NULL, + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, + NULL, + NULL); + if (is_lt) { + if (size <= ONE_PAGE_SIZE) + usleep(5000); + else if (size <= 10 * ONE_PAGE_SIZE) + usleep(10000); + else if (size <= 100 * ONE_PAGE_SIZE) + usleep(20000); + else + usleep(40000); + } + } + + if (!is_lt) + sleep(5); + g_dbus_connection_close(connection, NULL, NULL, NULL); + g_object_unref(connection); + free(cptr); +} + +void Measure_latency(bool verbose) +{ + if (!cpu_pin(0, getpid())) { + perror("CPU pinning for parent"); + exit(1); + } + + int pid = fork(); + switch (pid) { + case -1: /* fork error */ + perror("fork"); + return; + case 0: /* child process */ + if (!cpu_pin(!cpu_pin_to_same_core, getpid())) { + perror("CPU pinning for child"); + exit(1); + } + sleep(1); + Send(msize, "org.gtk.GDBus.TestPeerInterface", "/org/gtk/GDBus/TestObject", "0", true, verbose); + exit(0); + default: /* parent process */ + Receive(msize, "org.gtk.GDBus.TestPeerInterface", "/org/gtk/GDBus/TestObject", "0", true, verbose); + g_print("min: %0.4f(us), max: %0.4f(us), avg: %0.4f(us)\n", min, max, total/lt_cnt); + waitpid(pid, NULL, 0); /* wait for the child */ + } +} + +void Measure_bandwidth(bool verbose) +{ + int ret; + int returnp[nprocs/2][2]; + int pids[nprocs]; + int ready_fd; + volatile char *ready_ptr; + + ready_fd = shm_open("isready", O_CREAT | O_TRUNC | O_RDWR, 0666); + if (ready_fd == -1) + { + perror("Open"); + return; + } + if (ftruncate(ready_fd, sizeof(char))) { + perror("Ftruncate"); + return; + } + ready_ptr = mmap(0, sizeof(char), PROT_READ | PROT_WRITE, + MAP_SHARED, ready_fd, 0); + if (ready_ptr == MAP_FAILED) { + perror("Mmap"); + return; + } + close(ready_fd); + ready_ptr[0] = 'n'; + + for (int i = 0; i < nprocs/2; i++) { + if (pipe(returnp[i]) == -1) { + perror("pipe"); + exit(1); + } + } + + if (!cpu_pin(0, getpid())) { + perror("CPU pinning for parent"); + exit(1); + } + + for (int child = 1; child < nprocs; child++) { + char name[30]; + char path[30]; + char number[11]; + int pinning_no; + + sprintf(name, "org.gdbus.server%d", child/2); + sprintf(path, "/org/gdbus/server%d", child/2); + sprintf(number, "%d", child/2); + int pid = fork(); + switch (pid) { + //child + case 0: + pinning_no = (cpu_pin_to_same_core ? child/2 : child); + if (!cpu_pin(pinning_no, getpid())) { + perror("CPU pinning for child"); + exit(1); + } + + // Wait for parent + while (ready_ptr[0] == 'n') { + } + + if (child % 2 == 0) { + Receive(msize, (const char *)name, (const char *)path, (const char *)number, false, verbose); + + //send result(bandwidth) to parent + { + int wres; + close(returnp[child/2][0]); + wres = write(returnp[child/2][1], (void *)&bw, sizeof(unsigned long long)); + if (wres != sizeof(unsigned long long)) { + perror("(write return) read/write on pipe sender"); + exit(1); + } + close(returnp[child/2][1]); + } + } + else { + sleep(1); + Send(msize, (const char *)name, (const char *)path, (const char *)number, false, verbose); + } + + exit(0); + case -1: + perror("fork"); + return; + default: + pids[child] = pid; + break; + } + } + + sleep(1); + ready_ptr[0] = 'y'; + + Receive(msize, "org.gdbus.server0", "/org/gdbus/server0", "0", false, verbose); + for (int child = 1; child < nprocs; child++) { + //receive result(bandwidth) from child(s) + if (child % 2 == 0) { + int rres; + unsigned long long child_bw; + + close(returnp[child/2][1]); + rres = read(returnp[child/2][0], (void *)&child_bw, sizeof(unsigned long long)); + if (rres != sizeof(unsigned long long)) { + perror("(write return) read/write on pipe receiver"); + exit(1); + } + close(returnp[child/2][0]); + bw += child_bw; + } + + waitpid(pids[child], NULL, 0); + } + ret = munmap((void*)ready_ptr, sizeof(bool)); + if (ret != 0) { + perror("munmap"); + exit(1); + } + + ret = shm_unlink("isready"); + if (ret != 0) { + perror("shm unlink"); + //exit(1); + } + printf("Total bandwidth: %llu (/s)\n", bw); +} + +int main(int argc, char *argv[]) +{ + int opt; + msize = 3; + lt_on = bw_on = false; + verbose = false; + + nprocs = get_nprocs(); + n_real_tries = REAL_TRY; + cpu_pin_to_same_core = false; + + while ((opt = getopt(argc, argv, "m:p:blht:cv")) != -1) { + switch(opt) { + case 'm': + msize = atoi(optarg); + break; + case 'p': + nprocs = atoi(optarg); + if (nprocs <= 0) { + printf("The number of process should be larger than 0\n"); + exit(0); + } + if (nprocs % 2 != 0) { + printf("The number of process should be even\n"); + exit(0); + } + break; + case 'b': + bw_on = true; /* bandwidth */ + break; + case 'l': + lt_on = true; /* latency */ + break; + case 't': + n_real_tries = atoi(optarg); + if (n_real_tries <= 0) { + printf("Number of real tries should be larger than 0\n"); + exit(0); + } + break; + case 'c': + cpu_pin_to_same_core = true; + break; + case 'v': + verbose = true; + break; + case 'h': + print_help(argv); + exit(0); + } + } + + if (!lt_on && !bw_on) { + exit(0); + } + + bw = 0; + lt_cnt = 0; + max = total = 0.0; + min = 1000000.0; + + if (lt_on) + Measure_latency(verbose); + if (bw_on) + Measure_bandwidth(verbose); + + return 0; +} + diff --git a/packaging/dbus-tools.spec b/packaging/dbus-tools.spec index d8c3412..d632658 100644 --- a/packaging/dbus-tools.spec +++ b/packaging/dbus-tools.spec @@ -56,3 +56,4 @@ make %{_bindir}/sharedmem %{_bindir}/gdbus %{_bindir}/libdbus +%{_bindir}/p2p-gdbus -- 2.34.1