introduce-multiple-client-ping.patch
authorAndy Green <andy@warmcat.com>
Sun, 30 Jan 2011 12:15:25 +0000 (12:15 +0000)
committerAndy Green <andy@warmcat.com>
Sun, 30 Jan 2011 12:15:25 +0000 (12:15 +0000)
Signed-off-by: Andy Green <andy@warmcat.com>
test-server/test-ping.c

index 83febbe..7b901ec 100644 (file)
@@ -43,6 +43,8 @@
  */
 #define MAX_PING_PAYLOAD 125
 #define MAX_MIRROR_PAYLOAD 4096
+#define MAX_PING_CLIENTS 256
+#define PING_RINGBUFFER_SIZE 256
 
 static unsigned int interval_us = 1000000;
 static unsigned int size = 64;
@@ -50,16 +52,18 @@ static int flood;
 static const char *address;
 static unsigned char pingbuf[LWS_SEND_BUFFER_PRE_PADDING + MAX_MIRROR_PAYLOAD +
                                                  LWS_SEND_BUFFER_POST_PADDING];
-static unsigned long ping_index = 1;
 static char *hname = "(unknown)";
-static unsigned long rx_count;
 static unsigned long started;
+static int screen_width = 80;
+static int use_mirror;
 
 static unsigned long rtt_min = 100000000;
 static unsigned long rtt_max;
 static unsigned long rtt_avg;
-static int screen_width = 80;
-static int use_mirror;
+static unsigned long global_rx_count;
+static unsigned long global_tx_count;
+static int clients = 1;
+static unsigned long interrupted_time;
 
 struct ping {
        unsigned long issue_timestamp;
@@ -67,11 +71,15 @@ struct ping {
        unsigned int seen;
 };
 
-#define PING_RINGBUFFER_SIZE 256
+struct per_session_data__ping {
+       unsigned long ping_index;
+
+       struct ping ringbuffer[PING_RINGBUFFER_SIZE];
+       int ringbuffer_head;
+       int ringbuffer_tail;
 
-struct ping ringbuffer[PING_RINGBUFFER_SIZE];
-int ringbuffer_head;
-int ringbuffer_tail;
+       unsigned long rx_count;
+};
 
 /*
  * uses the ping pong protocol features to provide an equivalent for the
@@ -99,10 +107,16 @@ callback_lws_mirror(struct libwebsocket *wsi,
        unsigned long iv;
        int n;
        int match = 0;
+       struct per_session_data__ping *psd = user;
 
        switch (reason) {
        case LWS_CALLBACK_CLIENT_ESTABLISHED:
 
+               psd->rx_count = 0;
+               psd->ping_index = 1;
+               psd->ringbuffer_head = 0;
+               psd->ringbuffer_tail = 0;
+
                /*
                 * start the ball rolling,
                 * LWS_CALLBACK_CLIENT_WRITEABLE will come next service
@@ -116,7 +130,7 @@ callback_lws_mirror(struct libwebsocket *wsi,
                gettimeofday(&tv, NULL);
                iv = (tv.tv_sec * 1000000) + tv.tv_usec;
 
-               rx_count++;
+               psd->rx_count++;
 
                shift = 56;
                p = in;
@@ -128,16 +142,16 @@ callback_lws_mirror(struct libwebsocket *wsi,
                }
 
                /* find it in the ringbuffer, look backwards from head */
-               n = ringbuffer_head;
+               n = psd->ringbuffer_head;
                while (!match) {
 
-                       if (ringbuffer[n].index == l) {
-                               ringbuffer[n].seen++;
+                       if (psd->ringbuffer[n].index == l) {
+                               psd->ringbuffer[n].seen++;
                                match = 1;
                                continue;
                        }
 
-                       if (n == ringbuffer_tail) {
+                       if (n == psd->ringbuffer_tail) {
                                match = -1;
                                continue;
                        }
@@ -159,23 +173,23 @@ callback_lws_mirror(struct libwebsocket *wsi,
                        break;
                }
 
-               if (ringbuffer[n].seen > 1)
+               if (psd->ringbuffer[n].seen > 1)
                        fprintf(stderr, "DUP! ");
 
-               if ((iv - ringbuffer[n].issue_timestamp) < rtt_min)
-                       rtt_min = iv - ringbuffer[n].issue_timestamp;
-
-               if ((iv - ringbuffer[n].issue_timestamp) > rtt_max)
-                       rtt_max = iv - ringbuffer[n].issue_timestamp;
+               if ((iv - psd->ringbuffer[n].issue_timestamp) < rtt_min)
+                       rtt_min = iv - psd->ringbuffer[n].issue_timestamp;
 
-               rtt_avg += iv - ringbuffer[n].issue_timestamp;
+               if ((iv - psd->ringbuffer[n].issue_timestamp) > rtt_max)
+                       rtt_max = iv - psd->ringbuffer[n].issue_timestamp;
 
+               rtt_avg += iv - psd->ringbuffer[n].issue_timestamp;
+               global_rx_count++;
 
                if (!flood)
                        fprintf(stderr, "%d bytes from %s: req=%ld "
                                "time=%lu.%lums\n", (int)len, address, l,
-                               (iv - ringbuffer[n].issue_timestamp) / 1000,
-                            ((iv - ringbuffer[n].issue_timestamp) / 100) % 10);
+                              (iv - psd->ringbuffer[n].issue_timestamp) / 1000,
+                       ((iv - psd->ringbuffer[n].issue_timestamp) / 100) % 10);
                else
                        fprintf(stderr, "\b \b");
                break;
@@ -185,32 +199,36 @@ callback_lws_mirror(struct libwebsocket *wsi,
                shift = 56;
                p = &pingbuf[LWS_SEND_BUFFER_PRE_PADDING];
 
+               /* 64-bit ping index in network byte order */
+
                while (shift >= 0) {
-                       *p++ = ping_index >> shift;
+                       *p++ = psd->ping_index >> shift;
                        shift -= 8;
                }
 
                gettimeofday(&tv, NULL);
 
-               ringbuffer[ringbuffer_head].issue_timestamp =
+               psd->ringbuffer[psd->ringbuffer_head].issue_timestamp =
                                             (tv.tv_sec * 1000000) + tv.tv_usec;
-               ringbuffer[ringbuffer_head].index = ping_index++;
-               ringbuffer[ringbuffer_head].seen = 0;
+               psd->ringbuffer[psd->ringbuffer_head].index = psd->ping_index++;
+               psd->ringbuffer[psd->ringbuffer_head].seen = 0;
 
-               if (ringbuffer_head == PING_RINGBUFFER_SIZE - 1)
-                       ringbuffer_head = 0;
+               if (psd->ringbuffer_head == PING_RINGBUFFER_SIZE - 1)
+                       psd->ringbuffer_head = 0;
                else
-                       ringbuffer_head++;
+                       psd->ringbuffer_head++;
 
-               /* snip any re-used tail so we keep the whole buffer length */
+               /* snip any re-used tail so we keep to the ring length */
 
-               if (ringbuffer_tail == ringbuffer_head) {
-                       if (ringbuffer_tail == PING_RINGBUFFER_SIZE - 1)
-                               ringbuffer_tail = 0;
+               if (psd->ringbuffer_tail == psd->ringbuffer_head) {
+                       if (psd->ringbuffer_tail == PING_RINGBUFFER_SIZE - 1)
+                               psd->ringbuffer_tail = 0;
                        else
-                               ringbuffer_tail++;
+                               psd->ringbuffer_tail++;
                }
 
+               global_tx_count++;
+
                if (use_mirror)
                        libwebsocket_write(wsi,
                                &pingbuf[LWS_SEND_BUFFER_PRE_PADDING],
@@ -220,7 +238,8 @@ callback_lws_mirror(struct libwebsocket *wsi,
                                &pingbuf[LWS_SEND_BUFFER_PRE_PADDING],
                                                          size, LWS_WRITE_PING);
 
-               if (flood && (ping_index - rx_count) < (screen_width - 1))
+               if (flood &&
+                        (psd->ping_index - psd->rx_count) < (screen_width - 1))
                        fprintf(stderr, ".");
                break;
 
@@ -239,6 +258,7 @@ static struct libwebsocket_protocols protocols[] = {
        [PROTOCOL_LWS_MIRROR] = {
                .name = "lws-mirror-protocol",
                .callback = callback_lws_mirror,
+               .per_session_data_size = sizeof (struct per_session_data__ping),
        },
        [DEMO_PROTOCOL_COUNT] = {  /* end of list */
                .callback = NULL
@@ -254,6 +274,7 @@ static struct option options[] = {
        { "protocol",   required_argument,      NULL, 'n' },
        { "flood",      no_argument,            NULL, 'f' },
        { "mirror",     no_argument,            NULL, 'm' },
+       { "replicate",  required_argument,      NULL, 'r' },
        { NULL, 0, 0, 0 }
 };
 
@@ -262,24 +283,9 @@ static void
 signal_handler(int sig, siginfo_t *si, void *v)
 {
        struct timeval tv;
-       unsigned long l;
 
        gettimeofday(&tv, NULL);
-       l = (tv.tv_sec * 1000000) + tv.tv_usec;
-
-       fprintf(stderr, "\n--- %s websocket ping statistics ---\n"
-               "%lu packets transmitted, %lu received, "
-               "%lu%% packet loss, time %ldms\n"
-               "rtt min/avg/max = %0.3f/%0.3f/%0.3f ms\n",
-               hname, ping_index - 1, rx_count,
-               (((ping_index - 1) - rx_count) * 100) / (ping_index - 1),
-               (l - started) / 1000,
-               ((double)rtt_min) / 1000.0,
-               ((double)rtt_avg / rx_count) / 1000.0,
-               ((double)rtt_max) / 1000.0
-       );
-
-       exit(0);
+       interrupted_time = (tv.tv_sec * 1000000) + tv.tv_usec;
 }
 
 
@@ -289,7 +295,7 @@ int main(int argc, char **argv)
        int port = 7681;
        int use_ssl = 0;
        struct libwebsocket_context *context;
-       struct libwebsocket *wsi_mirror;
+       struct libwebsocket *wsi[MAX_PING_CLIENTS];
        char protocol_name[256];
        unsigned int len;
        struct sockaddr_in sin;
@@ -301,6 +307,7 @@ int main(int argc, char **argv)
        struct timeval tv;
        struct winsize w;
        unsigned long oldus = 0;
+       unsigned long l;
 
        if (argc < 2)
                goto usage;
@@ -309,7 +316,7 @@ int main(int argc, char **argv)
        optind++;
 
        while (n >= 0) {
-               n = getopt_long(argc, argv, "hmfts:n:i:p:", options, NULL);
+               n = getopt_long(argc, argv, "r:hmfts:n:i:p:", options, NULL);
                if (n < 0)
                        continue;
                switch (n) {
@@ -336,6 +343,14 @@ int main(int argc, char **argv)
                case 'f':
                        flood = 1;
                        break;
+               case 'r':
+                       clients = atoi(optarg);
+                       if (clients > MAX_PING_CLIENTS || clients < 1) {
+                               fprintf(stderr, "Max clients supportd = %d\n",
+                                                             MAX_PING_CLIENTS);
+                               return 1;
+                       }
+                       break;
                case 'h':
                        goto usage;
                }
@@ -368,20 +383,22 @@ int main(int argc, char **argv)
                return 1;
        }
 
-       /* create a client websocket using dumb increment protocol */
+       /* create client websockets using dumb increment protocol */
 
-       wsi_mirror = libwebsocket_client_connect(context, address, port,
+       for (n = 0; n < clients; n++) {
+               wsi[n] = libwebsocket_client_connect(context, address, port,
                        use_ssl, "/", libwebsocket_canonical_hostname(context),
                                 "origin", protocols[PROTOCOL_LWS_MIRROR].name);
-
-       if (wsi_mirror == NULL) {
-               fprintf(stderr, "libwebsocket connect failed\n");
-               return -1;
+               if (wsi[n] == NULL) {
+                       fprintf(stderr, "client connnection %d failed to "
+                                                               "connect\n", n);
+                       return 1;
+               }
        }
 
        strcpy(ip, "(unknown)");
        len = sizeof sin;
-       if (getpeername(libwebsocket_get_socket_fd(wsi_mirror),
+       if (getpeername(libwebsocket_get_socket_fd(wsi[0]),
                                            (struct sockaddr *) &sin, &len) < 0)
                perror("getpeername");
        else {
@@ -429,15 +446,25 @@ int main(int argc, char **argv)
 
        n = 0;
        while (n >= 0) {
-               unsigned long l;
 
                gettimeofday(&tv, NULL);
-
                l = (tv.tv_sec * 1000000) + tv.tv_usec;
-               if ((l - oldus) > interval_us) {
-                       libwebsocket_callback_on_writable(wsi_mirror);
-                       oldus = l;
-               }
+
+
+               if (!interrupted_time) {
+                       if ((l - oldus) > interval_us) {
+                               for (n = 0; n < clients; n++)
+                                       libwebsocket_callback_on_writable(wsi[n]);
+                               oldus = l;
+                       }
+               } else
+
+                       /* allow time for in-flight pongs to come */
+               
+                       if ((l - interrupted_time) > 250000) {
+                               n = -1;
+                               continue;
+                       }
 
                if (!interval_us)
                        n = libwebsocket_service(context, 0);
@@ -445,7 +472,29 @@ int main(int argc, char **argv)
                        n = libwebsocket_service(context, 1);
        }
 
-       libwebsocket_client_close(wsi_mirror);
+       /* stats */
+
+       fprintf(stderr, "\n--- %s websocket ping statistics "
+               "using %d connections ---\n"
+               "%lu packets transmitted, %lu received, "
+               "%lu%% packet loss, time %ldms\n"
+               "rtt min/avg/max = %0.3f/%0.3f/%0.3f ms\n"
+               "payload bandwidth average %0.3f KiBytes/sec\n",
+               hname, clients, global_tx_count, global_rx_count,
+               ((global_tx_count - global_rx_count) * 100) / global_tx_count,
+               (l - started) / 1000,
+               ((double)rtt_min) / 1000.0,
+               ((double)rtt_avg / global_rx_count) / 1000.0,
+               ((double)rtt_max) / 1000.0,
+               ((double)global_rx_count * (double)size) /
+                                 ((double)(l - started) / 1000000.0) / 1024.0);
+
+//     return 0;
+fprintf(stderr, "a\n");
+       for (n = 0; n < clients; n++)
+               libwebsocket_client_close(wsi[n]);
+
+fprintf(stderr, "b\n");
        libwebsocket_context_destroy(context);
 
        return 0;
@@ -457,6 +506,7 @@ usage:
                                             "[--size=<bytes>] "
                                             "[--protocol=<protocolname>] "
                                             "[--mirror] "
+                                            "[--replicate=clients>]"
                                             "\n");
        return 1;
 }