2 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
4 * @APPLE_APACHE_LICENSE_HEADER_START@
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * @APPLE_APACHE_LICENSE_HEADER_END@
24 * Stress test for dispatch read and write sources.
27 #include "config/config.h"
29 #include <dispatch/dispatch.h>
32 #include <CommonCrypto/CommonDigest.h>
36 #include <netinet/in.h>
39 #include <sys/param.h>
/* Return the larger of two size_t values (returns b when they are equal). */
42 static inline size_t max(size_t a, size_t b) {
43 return (a > b) ? a : b;
/* Return the smaller of two size_t values (returns b when they are equal). */
46 static inline size_t min(size_t a, size_t b) {
47 return (a < b) ? a : b;
/*
 * DEBUG(fmt, ...): printf-style trace to stderr, emitted only when the
 * `debug` flag (declared outside this macro) is non-zero.
 */
52 #define DEBUG(...) do { \
53 if (debug) fprintf(stderr, __VA_ARGS__); \
/*
 * assert_errno(str, expr): failure reporter for errno-setting calls.
 * The visible part prints "<str>: <strerror(errno)>" to stderr; the test of
 * `expr` and the action taken afterwards are on lines not shown here.
 * Because the message is built from errno, invoke it immediately after the
 * call being checked, before anything else can overwrite errno.
 */
56 #define assert_errno(str, expr) do { \
58 fprintf(stderr, "%s: %s\n", (str), strerror(errno)); \
/*
 * assert_gai_errno(str, expr): like assert_errno, but formats the message
 * with gai_strerror().
 * NOTE(review): gai_strerror() is specified to take an EAI_* code as returned
 * by getaddrinfo()/getnameinfo(); this macro passes errno instead, so the
 * printed text may not describe the real failure. The macro has no parameter
 * carrying the return code -- confirm intent before relying on its output.
 */
62 #define assert_gai_errno(str, expr) do { \
64 fprintf(stderr, "%s: %s\n", (str), gai_strerror(errno)); \
71 * Context structure used by the reader and writer queues.
73 * Writers begin by generating a random length and writing it to the descriptor.
74 * The write buffer is filled with a random byte value and written until empty
75 * or until the total length is reached. The write buffer is refilled with more
76 * random data when empty. Each write updates an MD5 digest which is written to
77 * the descriptor once the total length is reached.
79 * Readers begin by reading the total length of data. The read buffer is filled
80 * and an MD5 digest is computed on the bytes as they are received. Once the
81 * total length of data has been read, an MD5 digest is read from the descriptor
82 * and compared with the computed value.
// Digest storage: filled by CC_MD5_Final() in both writer and reader; the
// reader memcmp()s it against the digest bytes received from the peer.
97 char md5[CC_MD5_DIGEST_LENGTH];
/*
 * create_writer: attach a DISPATCH_SOURCE_TYPE_WRITE source to `wfd` and
 * drive the writer side of the LENGTH -> DATA -> CKSUM protocol from its
 * event handler.
 *
 *   wfd        - descriptor to write to; closed by the cancel handler.
 *   completion - block supplied by the caller; where it is invoked is not
 *                visible in the lines shown here.
 *
 * Each writer gets its own heap-allocated sock_context and its own queue
 * labelled "writer.fd.<wfd>".
 */
101 create_writer(int wfd, dispatch_block_t completion)
103 dispatch_source_t ds;
// calloc() gives a zero-filled context, so the fields start at 0.
104 struct sock_context *ctx = calloc(1, sizeof(struct sock_context));
// The label doubles as the queue name and as the prefix of every DEBUG line.
107 snprintf(ctx->label, sizeof(ctx->label), "writer.fd.%d", wfd);
108 dispatch_queue_t queue = dispatch_queue_create(ctx->label, 0);
110 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, wfd, 0, queue);
// The local queue reference is not used again in the visible code.
112 dispatch_release(queue);
// Random payload length: the mask keeps it in the range 0..0x7FFF.
116 len = (arc4random() & 0x7FFF);
119 CC_MD5_Init(&ctx->md5ctx);
// The first thing queued for sending is the sizeof(len) bytes of `len`.
121 ctx->buflen = sizeof(len);
123 memcpy(ctx->buf, &len, ctx->buflen);
124 DEBUG("%s: LENGTH %d\n", ctx->label, ctx->len);
// Event handler: perform one write() per invocation and advance the state.
126 dispatch_source_set_event_handler(ds, ^{
127 DEBUG("%s: available %ld\n", ctx->label, dispatch_source_get_data(ds));
// Never write more than what is left of the current phase (len) or what is
// currently staged in the buffer (buflen).
129 size_t wrsz = min(ctx->len, ctx->buflen);
130 res = write(wfd, &ctx->buf[ctx->offset], wrsz);
// NOTE(review): wrsz is a size_t but is printed with %ld; %zu is the
// matching conversion.
131 DEBUG("%s: write(%d, %p, %ld): %ld\n", ctx->label, wfd, &ctx->buf[ctx->offset], wrsz, res);
// Only payload bytes (DATA state) are folded into the running digest.
133 if (ctx->state == DATA) {
134 CC_MD5_Update(&ctx->md5ctx, &ctx->buf[ctx->offset], res);
// NOTE(review): if any of these fields has an unsigned type, the matching
// check below is always true -- the field types are not visible here.
139 assert(ctx->offset >= 0);
140 assert(ctx->len >= 0);
141 assert(ctx->buflen >= 0);
// Staged bytes are exhausted, or the current phase has been fully written.
142 if (ctx->buflen == 0 || ctx->len == 0) {
143 if (ctx->state == LENGTH) {
144 // finished writing length, move on to data.
// Stage a full buffer consisting of one random byte value.
146 ctx->buflen = sizeof(ctx->buf);
147 char pattern = arc4random() & 0xFF;
148 memset(ctx->buf, pattern, ctx->buflen);
149 } else if (ctx->state == DATA && ctx->len == 0) {
150 // finished writing data, move on to cksum.
// Finalize the digest and stage it as the next bytes to send.
152 ctx->len = sizeof(ctx->md5);
153 ctx->buflen = sizeof(ctx->md5);
154 CC_MD5_Final(ctx->md5, &ctx->md5ctx);
155 memcpy(ctx->buf, ctx->md5, ctx->buflen);
156 } else if (ctx->state == DATA) {
// More payload remains but the buffer is drained: refill it with a new
// random byte value.
157 ctx->buflen = sizeof(ctx->buf);
158 char pattern = arc4random() & 0xFF;
159 memset(ctx->buf, pattern, ctx->buflen);
160 } else if (ctx->state == CKSUM) {
// Digest has been sent; cancelling triggers the cancel handler below.
162 dispatch_source_cancel(ds);
// A zero-byte write is only expected once the writer has reached DONE.
168 } else if (res == 0) {
169 assert(ctx->state == DONE);
// EAGAIN is logged here; no other action for it is visible in this excerpt.
171 } else if (res == -1 && errno == EAGAIN) {
172 DEBUG("%s: EAGAIN\n", ctx->label);
// Remaining failures are reported through assert_errno with the label "write".
174 assert_errno("write", res >= 0);
// Cancel handler: close the descriptor and release the source.
177 dispatch_source_set_cancel_handler(ds, ^{
178 DEBUG("%s: close(%d)\n", ctx->label, wfd);
179 int res = close(wfd);
180 assert_errno("close", res == 0);
182 dispatch_release(ds);
/*
 * create_reader: attach a DISPATCH_SOURCE_TYPE_READ source to `rfd` and
 * drive the reader side of the LENGTH -> DATA -> CKSUM protocol from its
 * event handler.
 *
 *   rfd        - descriptor to read from; closed by the cancel handler.
 *   completion - block supplied by the caller; where it is invoked is not
 *                visible in the lines shown here.
 *
 * The result is a dispatch_source_t (main() assigns the call's result to a
 * variable of that type). Each reader gets its own heap-allocated
 * sock_context and its own queue labelled "reader.fd.<rfd>".
 */
190 create_reader(int rfd, dispatch_block_t completion)
192 dispatch_source_t ds;
// calloc() gives a zero-filled context, so the fields start at 0.
193 struct sock_context *ctx = calloc(1, sizeof(struct sock_context));
// The label doubles as the queue name and as the prefix of every DEBUG line.
196 snprintf(ctx->label, sizeof(ctx->label), "reader.fd.%d", rfd);
197 dispatch_queue_t queue = dispatch_queue_create(ctx->label, 0);
199 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, rfd, 0, queue);
// The local queue reference is not used again in the visible code.
201 dispatch_release(queue);
// Start by expecting exactly the length header: sizeof(ctx->len) bytes.
204 ctx->len = sizeof(ctx->len);
205 ctx->buflen = sizeof(ctx->len);
206 CC_MD5_Init(&ctx->md5ctx);
// Event handler: perform one read() per invocation and advance the state.
208 dispatch_source_set_event_handler(ds, ^{
209 DEBUG("%s: available %ld\n", ctx->label, dispatch_source_get_data(ds));
// Never read more than what is left of the current phase (len) or the
// space currently allotted in the buffer (buflen).
211 size_t rdsz = min(ctx->len, ctx->buflen);
212 res = read(rfd, &ctx->buf[ctx->offset], rdsz);
// NOTE(review): rdsz is a size_t but is printed with %ld here and in the
// diagnostic below; %zu is the matching conversion.
213 DEBUG("%s: read(%d,%p,%ld): %ld\n", ctx->label, rfd, &ctx->buf[ctx->offset], rdsz, res);
215 // log unexpected data lengths...
// `expected` is the byte estimate reported by the dispatch source.
216 long expected = dispatch_source_get_data(ds);
// Report only non-negative counts that match neither the source's estimate
// nor the size that was requested.
218 if (actual >= 0 && (actual != expected && actual != rdsz)) {
219 fprintf(stderr, "%s: expected %ld, actual %ld (rdsz = %ld)\n", ctx->label, expected, actual, rdsz);
// Only payload bytes (DATA state) are folded into the running digest.
223 if (ctx->state == DATA) {
224 CC_MD5_Update(&ctx->md5ctx, &ctx->buf[ctx->offset], res);
// Allotted space is used up, or the current phase has been fully received.
229 if (ctx->buflen == 0 || ctx->len == 0) {
230 if (ctx->state == LENGTH) {
231 // buffer is full, interpret as uint32_t
// Decode the header from network byte order; it becomes the payload length.
232 memcpy(&ctx->len, ctx->buf, sizeof(ctx->len));
233 ctx->len = ntohl(ctx->len);
234 ctx->buflen = sizeof(ctx->buf);
236 } else if (ctx->state == DATA && ctx->len == 0) {
// Payload complete: finalize the local digest, then expect the peer's
// digest (CC_MD5_DIGEST_LENGTH bytes) next.
237 CC_MD5_Final(ctx->md5, &ctx->md5ctx);
239 ctx->len = CC_MD5_DIGEST_LENGTH;
240 ctx->buflen = ctx->len;
241 } else if (ctx->state == DATA) {
// More payload remains: make the whole buffer available again.
242 ctx->buflen = sizeof(ctx->buf);
243 } else if (ctx->state == CKSUM) {
// Compare the received digest bytes with the locally computed digest.
245 res = memcmp(ctx->buf, ctx->md5, sizeof(ctx->md5));
247 DEBUG("%s: MD5 FAILURE\n", ctx->label);
// read() returned 0 (end of file): only expected once the reader is DONE.
253 } else if (res == 0) {
254 assert(ctx->state == DONE);
255 DEBUG("%s: EOF\n", ctx->label);
256 dispatch_source_cancel(ds);
// Failures are reported through assert_errno with the label "read".
258 assert_errno("read", res >= 0);
// Cancel handler: close the descriptor and release the source.
261 dispatch_source_set_cancel_handler(ds, ^{
262 DEBUG("%s: close(%d)\n", ctx->label, rfd);
263 int res = close(rfd);
264 assert_errno("close", res == 0);
266 dispatch_release(ds);
// Fetch the descriptor's current file status flags.
// NOTE(review): no check of the F_GETFL result is visible in this excerpt.
277 flags = fcntl(fd, F_GETFL);
// Write `flags` back. Any modification of `flags` between the two calls is on
// a line not shown; the assertion label suggests O_NONBLOCK is added -- confirm.
280 res = fcntl(fd, F_SETFL, flags);
281 assert_errno("fcntl(F_SETFL,O_NONBLOCK)", res == 0);
/*
 * create_fifo: create a FIFO and return non-blocking descriptors for both
 * ends through *rfd (read end) and *wfd (write end).
 */
285 create_fifo(int *rfd, int *wfd)
// Template for the FIFO's path; how `name` is derived from it is on lines
// not shown here.
290 char path[MAXPATHLEN];
291 strlcpy(path, "/tmp/fd_stress.fifo.XXXXXX", sizeof(path));
// Mode 0700: accessible to the owner only.
296 res = mkfifo(name, 0700);
297 assert_errno(name, res == 0);
// Order matters: the read end is opened first. Per POSIX open(), opening a
// FIFO with O_WRONLY|O_NONBLOCK fails with ENXIO when no process has it open
// for reading, whereas O_RDONLY|O_NONBLOCK returns without waiting.
299 *rfd = open(name, O_RDONLY | O_NONBLOCK);
300 assert_errno(name, *rfd >= 0);
302 *wfd = open(name, O_WRONLY | O_NONBLOCK);
303 assert_errno(name, *wfd >= 0);
/*
 * create_pipe: hand back a descriptor pair through *rfd and *wfd (main()
 * passes rfd to create_reader and wfd to create_writer). The call that
 * creates the pair is on lines not shown here; its failure is reported via
 * assert_errno with the label "pipe".
 */
307 create_pipe(int *rfd, int *wfd)
313 assert_errno("pipe", res == 0);
/*
 * create_server_socket: create a listening TCP socket on the IPv4 loopback
 * address.
 *
 *   rfd - receives the listening socket descriptor.
 *   sa  - filled in with the address that was bound, so the caller can hand
 *         it to create_client_socket().
 */
323 create_server_socket(int *rfd, struct sockaddr_in *sa)
327 socklen_t salen = sizeof(*sa);
// Start from an all-zero address structure.
// NOTE(review): create_client_socket() passes sa->sin_len to connect(); no
// assignment to sin_len is visible in this excerpt -- confirm it is set.
329 memset(sa, 0, salen);
331 sa->sin_family = AF_INET;
// NOTE(review): the port is fixed at 12345, so a second concurrent run of
// this test would target the same port.
332 sa->sin_port = htons(12345);
333 sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
335 *rfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
336 assert_errno("socket", *rfd >= 0);
// Enable address and port reuse before binding.
339 res = setsockopt(*rfd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
340 assert_errno("setsockopt(SO_REUSEADDR)", res == 0);
343 res = setsockopt(*rfd, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
344 assert_errno("setsockopt(SO_REUSEPORT)", res == 0);
346 res = bind(*rfd, (const struct sockaddr *)sa, salen);
347 assert_errno("bind", res == 0);
// Listen with a backlog of 128 pending connections.
349 res = listen(*rfd, 128);
350 assert_errno("listen", res == 0);
/*
 * create_client_socket: create a TCP socket and start connecting it to the
 * address in `sa`.
 *
 *   wfd - receives the client socket descriptor.
 *   sa  - destination address; its sin_len field is used as the address
 *         length passed to connect().
 */
354 create_client_socket(int *wfd, const struct sockaddr_in *sa)
358 *wfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
359 assert_errno("socket", *wfd >= 0);
// EINPROGRESS is tolerated: that is what connect() reports for a
// non-blocking socket whose connection cannot be completed immediately.
363 res = connect(*wfd, (const struct sockaddr *)sa, sa->sin_len);
364 assert_errno("connect", res == 0 || errno == EINPROGRESS);
// Command-line synopsis: optional -d flag, then the iteration count and width.
372 fprintf(stderr, "usage: fd_stress [-d] iterations width\n");
/*
 * main: run the stress test. Per outer-loop pass it starts `width` rounds,
 * each creating a pipe reader/writer pair, a TCP client writer (whose reader
 * is created when the server source accepts the connection) and a FIFO
 * reader/writer pair, and at the end waits on a dispatch group for the
 * readers' completion blocks.
 */
377 main(int argc, char* argv[])
// The listening socket is set up before the command line is parsed.
380 struct sockaddr_in sa;
381 create_server_socket(&serverfd, &sa);
// The only option letter accepted is -d.
385 while ((ch = getopt(argc, argv, "d")) != -1) {
// NOTE(review): strtol() results are stored in size_t without an
// endptr/errno check, so a negative argument becomes a very large count.
// Indexing argv[0]/argv[1] presumably follows an argv shift past the options
// on lines not shown -- confirm.
403 size_t iterations = strtol(argv[0], NULL, 10);
404 size_t width = strtol(argv[1], NULL, 10);
// A zero count is singled out here; what happens next is not in this excerpt.
406 if (iterations == 0 || width == 0) {
410 fprintf(stdout, "pid %d\n", getpid());
// The group tracks outstanding readers: entered before each transfer is
// started, left from the reader completion blocks.
412 dispatch_group_t group;
413 group = dispatch_group_create();
// Server side: a read source on the listening socket fires when a
// connection is pending.
417 dispatch_queue_t queue = dispatch_queue_create("server", NULL);
419 dispatch_source_t ds;
420 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, serverfd, 0, queue);
422 dispatch_source_set_event_handler(ds, ^{
425 struct sockaddr peer;
428 fd = accept(serverfd, &peer, &peerlen);
429 assert_errno("accept", fd >= 0);
433 char host[NI_MAXHOST], serv[NI_MAXSERV];
// Numeric host/port lookup, used only for the trace line below.
// NOTE(review): no check of `res` is visible before host/serv are printed.
436 res = getnameinfo(&peer, peerlen, host, sizeof(host), serv, sizeof(serv), NI_NUMERICHOST|NI_NUMERICSERV);
437 DEBUG("ACCEPTED %d (%s:%s)\n", fd, host, serv);
// The reader for an accepted connection is handed the group-leave block,
// pairing with the dispatch_group_enter() done before create_client_socket().
439 create_reader(fd, ^{ dispatch_group_leave(group); });
// NOTE(review): the loop starts at 1 with `i < iterations`, so the body runs
// iterations-1 times (not at all for iterations == 1) -- confirm intent.
445 for (i = 1; i < iterations; ++i) {
446 fprintf(stderr, "iteration %ld\n", i);
449 for (j = 0; j < width; ++j) {
// Pipe transfer: the reader's completion leaves the group; the writer's
// completion block is empty.
451 dispatch_group_enter(group);
452 create_pipe(&rfd, &wfd);
453 DEBUG("PIPE %d %d\n", rfd, wfd);
454 dispatch_source_t reader;
455 reader = create_reader(rfd, ^{ dispatch_group_leave(group); });
456 create_writer(wfd, ^{});
// Socket transfer: only the writer is created here; the matching reader is
// created by the server source's event handler above.
461 dispatch_group_enter(group);
462 create_client_socket(&clientfd, &sa);
463 DEBUG("CLIENT %d\n", clientfd);
464 create_writer(clientfd, ^{});
// FIFO transfer: same pairing as the pipe case.
466 dispatch_group_enter(group);
467 create_fifo(&rfd, &wfd);
468 DEBUG("FIFO %d %d\n", rfd, wfd);
469 create_writer(wfd, ^{});
470 create_reader(rfd, ^{ dispatch_group_leave(group); });
// Block until every group entry has been matched by a leave.
473 dispatch_group_wait(group, DISPATCH_TIME_FOREVER);
475 fprintf(stdout, "pid %d\n", getpid());