--- /dev/null
+/*
+ * p2pgdbus.c - gdbus peer to peer IPC latency & bandwidth
+ */
+
+#include <gio/gio.h>
+#include "common.h"
+
+#define SVC_NAME "/tmp/my-test-service"
+#define DBUS_TEST_SVC_NAME "unix:path=" SVC_NAME
+
+int nprocs; /* number of processes (for bandwidth) */
+int msize; /* message size */
+int n_real_tries; /* number of real tries */
+int lt_cnt;
+double min, max, total; /* minimum, maximum and total time (for latency) */
+unsigned long long bw; /* bandwidth (partial result) */
+bool verbose; /* displays details of communication between peers */
+bool cpu_pin_to_same_core; /* specifies, whether client & server processes are pinned
+ to the same core */
+bool lt_on, /* latency test */
+ bw_on; /* bandwidth test */
+
+static GDBusNodeInfo *introspection_data = NULL;
+
+typedef struct _state {
+ int iters; /* number of test iterations */
+ char *buf; /* message buffer */
+ bool is_lt; /* true for latency test, false for bandwidth */
+ GMainLoop *main_loop; /* reference to the main GLib event loop */
+ const char *path; /* object path (for on_new_connection routine) */
+ bool verbose; /* for debug purposes */
+} State;
+
+static unsigned long long get_current_time(void)
+{
+ struct timespec clock;
+ clock_gettime (CLOCK_REALTIME, &clock);
+ return (unsigned long long)clock.tv_sec * NS
+ + (unsigned long long)clock.tv_nsec;
+}
+
+static void handle_method_call(GDBusConnection *connection,
+ const gchar *sender,
+ const gchar *object_path,
+ const gchar *interface_name,
+ const gchar *method_name,
+ GVariant *parameters,
+ GDBusMethodInvocation *invocation,
+ gpointer user_data)
+{
+ static int cnt = 0;
+ static int bw_cnt = 0;
+ static unsigned long long start;
+ unsigned long long end;
+ unsigned long long temp;
+ State *state = (State *)user_data;
+
+ if (g_strcmp0(method_name, "Perf") != 0)
+ return;
+
+ if (state->is_lt)
+ g_variant_get(parameters, "(ts)", &start, &state->buf);
+ else
+ g_variant_get (parameters, "(ts)", &temp, &state->buf);
+
+ g_object_unref(invocation);
+
+ if (++cnt == state->iters) {
+ cnt = 0;
+ g_main_loop_quit(state->main_loop);
+ }
+
+ if (state->buf[0] != 'R') /* skip warm-up tries */
+ return;
+
+ if (state->is_lt) { /* we're measuring latency */
+ end = get_current_time();
+ double lt = (double)(end - start) / 1000;
+ if (state->verbose)
+ g_print("latency: %0.4f\n", lt);
+ if (lt > 0) { /* latency */
+ lt_cnt++;
+ total += lt;
+ if (lt > max)
+ max = lt;
+ if (lt < min)
+ min = lt;
+ }
+ }
+ else { /* we're measuring bandwidth */
+ bw_cnt++;
+ if (state->verbose)
+ printf("-> bw_cnt = %d (from: %s)\n", bw_cnt, interface_name);
+ if (bw_cnt == 1) {
+ start = get_current_time();
+ }
+
+ if (bw_cnt == n_real_tries) {
+ end = get_current_time();
+ bw = ((unsigned long long)NS * bw_cnt)/(end - start);
+ printf("bandwidth: %llu (/s)\n", bw);
+ }
+ }
+
+ if (state->verbose) {
+ unsigned long long ts = (state->is_lt ? start : temp);
+ char timestamp[30] = "";
+ if (ts > 0)
+ sprintf(timestamp, "[%llu]", ts);
+ if (msize > 64) {
+ state->buf[64] = '\0'; /* truncated the msg deliberately for display purposes */
+ g_print("Client said: %s (TRUNCATED) %s\n", state->buf, timestamp);
+ }
+ else
+ g_print("Client said: %s %s\n", state->buf, timestamp);
+ }
+}
+
+static const GDBusInterfaceVTable interface_vtable = {
+ handle_method_call,
+ NULL,
+ NULL,
+};
+
+static gboolean allow_mechanism_cb(GDBusAuthObserver *observer,
+ const gchar *mechanism,
+ G_GNUC_UNUSED gpointer user_data)
+{
+ State *state = (State *)user_data;
+ if (state->verbose)
+ g_print ("Considering whether to accept %s authentication...\n", mechanism);
+ return TRUE;
+}
+
+static gboolean authorize_authenticated_peer_cb(GDBusAuthObserver *observer,
+ G_GNUC_UNUSED GIOStream *stream,
+ GCredentials *credentials,
+ G_GNUC_UNUSED gpointer user_data)
+{
+ State *state = (State *)user_data;
+ gboolean authorized = FALSE;
+
+ if (state->verbose)
+ g_print("Considering whether to authorize authenticated peer...\n");
+
+ if (credentials != NULL) {
+ GCredentials *own_credentials;
+ gchar *credentials_string = NULL;
+ // client's credentials
+ credentials_string = g_credentials_to_string(credentials);
+ if (state->verbose)
+ g_print("Peer's credentials: %s\n", credentials_string);
+ g_free(credentials_string);
+ // server's credentials
+ own_credentials = g_credentials_new();
+ credentials_string = g_credentials_to_string(own_credentials);
+ if (state->verbose)
+ g_print("Server's credentials: %s\n", credentials_string);
+ g_free(credentials_string);
+
+ if (g_credentials_is_same_user(credentials, own_credentials, NULL))
+ authorized = TRUE;
+
+ g_object_unref(own_credentials);
+ }
+
+ if (!authorized) {
+ if (state->verbose) {
+ g_print("A server would often not want to authorize this identity\n");
+ g_print("Authorizing it anyway for demonstration purposes\n");
+ }
+ authorized = TRUE;
+ }
+
+ return authorized;
+}
+
+static void connection_closed(GDBusConnection *connection,
+ gboolean remote_peer_vanished,
+ GError *Error,
+ gpointer user_data)
+{
+ State *state = (State *)user_data;
+ if (state->verbose)
+ g_print("Client disconnected.\n");
+ g_object_unref(connection);
+}
+
+static gboolean on_new_connection(GDBusServer *server,
+ GDBusConnection *connection,
+ gpointer user_data)
+{
+ guint registration_id;
+ GCredentials *credentials;
+ gchar *s;
+ State *state = (State *)user_data;
+
+ credentials = g_dbus_connection_get_peer_credentials(connection);
+ if (credentials == NULL)
+ s = g_strdup("(no credentials received)");
+ else
+ s = g_credentials_to_string(credentials);
+
+ if (state->verbose)
+ g_print("Client connected.\n"
+ "Peer credentials: %s\n"
+ "Negotiated capabilities: unix-fd-passing=%d\n",
+ s,
+ g_dbus_connection_get_capabilities(connection) & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING);
+
+ g_object_ref(connection);
+ g_signal_connect(connection, "closed", G_CALLBACK(connection_closed), (gpointer)state);
+ registration_id = g_dbus_connection_register_object(connection,
+ state->path,
+ introspection_data->interfaces[0],
+ &interface_vtable,
+ (gpointer)state,
+ NULL, /* user_data_free_func */
+ NULL); /* GError** */
+ g_assert(registration_id > 0);
+
+ g_free(s);
+ return TRUE;
+}
+
+int Receive(int size, const char *name, const char *path, const char *number, bool is_lt, bool verbose)
+{
+ gint ret = 1;
+ GError *error = NULL;
+ GMainLoop *loop;
+
+ GDBusAuthObserver *observer;
+ GDBusServerFlags server_flags = G_DBUS_SERVER_FLAGS_NONE | G_DBUS_SERVER_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS;
+ GDBusServer *server;
+ gchar *guid;
+
+ char *buf = malloc(size);
+
+ loop = g_main_loop_new(NULL, FALSE);
+
+ static gchar introspection_xml[5000];
+ sprintf(introspection_xml,
+ "<node>"
+ " <interface name='%s'>"
+ " <method name='Perf'>"
+ " <arg type='t' name='Start' direction='in'/>"
+ " <arg type='s' name='Msg' direction='in'/>"
+ " </method>"
+ " </interface>"
+ "</node>", name);
+ introspection_data = g_dbus_node_info_new_for_xml(introspection_xml, NULL);
+
+ State state;
+ state.iters = WARMUP_TRY + n_real_tries;
+ state.is_lt = is_lt;
+ state.buf = buf;
+ state.main_loop = loop;
+ state.verbose = verbose;
+ state.path = path;
+
+ guid = g_dbus_generate_guid ();
+
+ char svc_name[strlen(SVC_NAME) + strlen(number) + 1];
+ strncpy(&svc_name[0], SVC_NAME, strlen(SVC_NAME)+1);
+ strcpy(&svc_name[strlen(SVC_NAME)], number);
+
+ unlink(svc_name);
+
+ char dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME) + strlen(number) + 1];
+ strncpy(&dbus_test_svc_name[0], DBUS_TEST_SVC_NAME, strlen(DBUS_TEST_SVC_NAME)+1);
+ strcpy(&dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME)], number);
+
+ observer = g_dbus_auth_observer_new();
+ if (observer == NULL) {
+ g_printerr("Error creating authenticating observer object.");
+ g_error_free(error);
+ goto out;
+ }
+ g_signal_connect(observer,
+ "allow-mechanism",
+ G_CALLBACK(allow_mechanism_cb),
+ (gpointer)&state);
+ g_signal_connect(observer,
+ "authorize-authenticated-peer",
+ G_CALLBACK(authorize_authenticated_peer_cb),
+ (gpointer)&state);
+
+ server = g_dbus_server_new_sync (dbus_test_svc_name,
+ server_flags,
+ guid,
+ observer,
+ NULL, /* GCancellable */
+ &error);
+
+ if (server == NULL) {
+ g_printerr("Error creating server at address %s: %s\n", dbus_test_svc_name, error->message);
+ g_error_free(error);
+ goto out;
+ }
+
+ g_dbus_server_start(server);
+
+ g_object_unref(observer);
+ g_free (guid);
+
+ if (verbose)
+ g_print("Server is listening at: %s\n", g_dbus_server_get_client_address(server));
+
+ g_signal_connect(server,
+ "new-connection",
+ G_CALLBACK(on_new_connection),
+ (gpointer)&state);
+
+ // Run main GLib loop
+ g_main_loop_run(loop);
+
+ g_object_unref(server);
+ g_main_loop_unref(loop);
+
+ g_dbus_node_info_unref(introspection_data);
+
+ free(buf);
+ ret = 0;
+
+out:
+ return ret;
+}
+
+void Send(int size, const char *name, const char *path, const char *number, bool is_lt, bool verbose)
+{
+ GDBusConnection *connection;
+ GError *error = NULL;
+ register char *cptr = malloc(size);
+ unsigned long long start;
+ int i, j;
+
+ char dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME) + strlen(number) + 1];
+ strncpy(&dbus_test_svc_name[0], DBUS_TEST_SVC_NAME, strlen(DBUS_TEST_SVC_NAME)+1);
+ strcpy(&dbus_test_svc_name[strlen(DBUS_TEST_SVC_NAME)], number);
+
+ connection = g_dbus_connection_new_for_address_sync(dbus_test_svc_name,
+ G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT,
+ NULL, /* GDBusAuthObserver */
+ NULL, /* GCancellable */
+ &error);
+ if (connection == NULL)
+ {
+ g_printerr("Error connecting to D-Bus address %s: %s\n", dbus_test_svc_name, error->message);
+ g_error_free(error);
+ free(cptr);
+ return;
+ }
+
+ if (verbose) {
+ g_print("Client connected.\n"
+ "Negotiated capabilities: unix-fd-passing=%d\n",
+ g_dbus_connection_get_capabilities(connection) & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING);
+ }
+
+ sleep(2);
+ for (i = 0; i < WARMUP_TRY + n_real_tries; ++i) {
+ for (j = 0; j < size-1; ++j)
+ cptr[j] = '!' + (j + i) % ('~' - '!' + 1);
+ cptr[size-1] = '\0';
+
+ if (i >= WARMUP_TRY)
+ cptr[0] = 'R';
+ else
+ cptr[0] = 'W';
+
+ if (is_lt)
+ start = get_current_time();
+ else
+ start = 0;
+
+ g_dbus_connection_call(connection,
+ NULL, /* bus_name = NULL - peer to peer connection */
+ path, /* object_path */
+ name, /* interface_name */
+ "Perf", /* method_name */
+ g_variant_new("(ts)", start, cptr),
+ NULL,
+ G_DBUS_CALL_FLAGS_NONE,
+ -1,
+ NULL,
+ NULL,
+ NULL);
+ if (is_lt) {
+ if (size <= ONE_PAGE_SIZE)
+ usleep(5000);
+ else if (size <= 10 * ONE_PAGE_SIZE)
+ usleep(10000);
+ else if (size <= 100 * ONE_PAGE_SIZE)
+ usleep(20000);
+ else
+ usleep(40000);
+ }
+ }
+
+ if (!is_lt)
+ sleep(5);
+ g_dbus_connection_close(connection, NULL, NULL, NULL);
+ g_object_unref(connection);
+ free(cptr);
+}
+
+void Measure_latency(bool verbose)
+{
+ if (!cpu_pin(0, getpid())) {
+ perror("CPU pinning for parent");
+ exit(1);
+ }
+
+ int pid = fork();
+ switch (pid) {
+ case -1: /* fork error */
+ perror("fork");
+ return;
+ case 0: /* child process */
+ if (!cpu_pin(!cpu_pin_to_same_core, getpid())) {
+ perror("CPU pinning for child");
+ exit(1);
+ }
+ sleep(1);
+ Send(msize, "org.gtk.GDBus.TestPeerInterface", "/org/gtk/GDBus/TestObject", "0", true, verbose);
+ exit(0);
+ default: /* parent process */
+ Receive(msize, "org.gtk.GDBus.TestPeerInterface", "/org/gtk/GDBus/TestObject", "0", true, verbose);
+ g_print("min: %0.4f(us), max: %0.4f(us), avg: %0.4f(us)\n", min, max, total/lt_cnt);
+ waitpid(pid, NULL, 0); /* wait for the child */
+ }
+}
+
+void Measure_bandwidth(bool verbose)
+{
+ int ret;
+ int returnp[nprocs/2][2];
+ int pids[nprocs];
+ int ready_fd;
+ volatile char *ready_ptr;
+
+ ready_fd = shm_open("isready", O_CREAT | O_TRUNC | O_RDWR, 0666);
+ if (ready_fd == -1)
+ {
+ perror("Open");
+ return;
+ }
+ if (ftruncate(ready_fd, sizeof(char))) {
+ perror("Ftruncate");
+ return;
+ }
+ ready_ptr = mmap(0, sizeof(char), PROT_READ | PROT_WRITE,
+ MAP_SHARED, ready_fd, 0);
+ if (ready_ptr == MAP_FAILED) {
+ perror("Mmap");
+ return;
+ }
+ close(ready_fd);
+ ready_ptr[0] = 'n';
+
+ for (int i = 0; i < nprocs/2; i++) {
+ if (pipe(returnp[i]) == -1) {
+ perror("pipe");
+ exit(1);
+ }
+ }
+
+ if (!cpu_pin(0, getpid())) {
+ perror("CPU pinning for parent");
+ exit(1);
+ }
+
+ for (int child = 1; child < nprocs; child++) {
+ char name[30];
+ char path[30];
+ char number[11];
+ int pinning_no;
+
+ sprintf(name, "org.gdbus.server%d", child/2);
+ sprintf(path, "/org/gdbus/server%d", child/2);
+ sprintf(number, "%d", child/2);
+ int pid = fork();
+ switch (pid) {
+ //child
+ case 0:
+ pinning_no = (cpu_pin_to_same_core ? child/2 : child);
+ if (!cpu_pin(pinning_no, getpid())) {
+ perror("CPU pinning for child");
+ exit(1);
+ }
+
+ // Wait for parent
+ while (ready_ptr[0] == 'n') {
+ }
+
+ if (child % 2 == 0) {
+ Receive(msize, (const char *)name, (const char *)path, (const char *)number, false, verbose);
+
+ //send result(bandwidth) to parent
+ {
+ int wres;
+ close(returnp[child/2][0]);
+ wres = write(returnp[child/2][1], (void *)&bw, sizeof(unsigned long long));
+ if (wres != sizeof(unsigned long long)) {
+ perror("(write return) read/write on pipe sender");
+ exit(1);
+ }
+ close(returnp[child/2][1]);
+ }
+ }
+ else {
+ sleep(1);
+ Send(msize, (const char *)name, (const char *)path, (const char *)number, false, verbose);
+ }
+
+ exit(0);
+ case -1:
+ perror("fork");
+ return;
+ default:
+ pids[child] = pid;
+ break;
+ }
+ }
+
+ sleep(1);
+ ready_ptr[0] = 'y';
+
+ Receive(msize, "org.gdbus.server0", "/org/gdbus/server0", "0", false, verbose);
+ for (int child = 1; child < nprocs; child++) {
+ //receive result(bandwidth) from child(s)
+ if (child % 2 == 0) {
+ int rres;
+ unsigned long long child_bw;
+
+ close(returnp[child/2][1]);
+ rres = read(returnp[child/2][0], (void *)&child_bw, sizeof(unsigned long long));
+ if (rres != sizeof(unsigned long long)) {
+ perror("(write return) read/write on pipe receiver");
+ exit(1);
+ }
+ close(returnp[child/2][0]);
+ bw += child_bw;
+ }
+
+ waitpid(pids[child], NULL, 0);
+ }
+ ret = munmap((void*)ready_ptr, sizeof(bool));
+ if (ret != 0) {
+ perror("munmap");
+ exit(1);
+ }
+
+ ret = shm_unlink("isready");
+ if (ret != 0) {
+ perror("shm unlink");
+ //exit(1);
+ }
+ printf("Total bandwidth: %llu (/s)\n", bw);
+}
+
+int main(int argc, char *argv[])
+{
+ int opt;
+ msize = 3;
+ lt_on = bw_on = false;
+ verbose = false;
+
+ nprocs = get_nprocs();
+ n_real_tries = REAL_TRY;
+ cpu_pin_to_same_core = false;
+
+ while ((opt = getopt(argc, argv, "m:p:blht:cv")) != -1) {
+ switch(opt) {
+ case 'm':
+ msize = atoi(optarg);
+ break;
+ case 'p':
+ nprocs = atoi(optarg);
+ if (nprocs <= 0) {
+ printf("The number of process should be larger than 0\n");
+ exit(0);
+ }
+ if (nprocs % 2 != 0) {
+ printf("The number of process should be even\n");
+ exit(0);
+ }
+ break;
+ case 'b':
+ bw_on = true; /* bandwidth */
+ break;
+ case 'l':
+ lt_on = true; /* latency */
+ break;
+ case 't':
+ n_real_tries = atoi(optarg);
+ if (n_real_tries <= 0) {
+ printf("Number of real tries should be larger than 0\n");
+ exit(0);
+ }
+ break;
+ case 'c':
+ cpu_pin_to_same_core = true;
+ break;
+ case 'v':
+ verbose = true;
+ break;
+ case 'h':
+ print_help(argv);
+ exit(0);
+ }
+ }
+
+ if (!lt_on && !bw_on) {
+ exit(0);
+ }
+
+ bw = 0;
+ lt_cnt = 0;
+ max = total = 0.0;
+ min = 1000000.0;
+
+ if (lt_on)
+ Measure_latency(verbose);
+ if (bw_on)
+ Measure_bandwidth(verbose);
+
+ return 0;
+}
+