Tizen 2.1 base
[platform/upstream/gcd.git] / dispatch-1.0 / testing / fd_stress.c
1 /*
2  * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
3  *
4  * @APPLE_APACHE_LICENSE_HEADER_START@
5  * 
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  * 
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  * 
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * 
18  * @APPLE_APACHE_LICENSE_HEADER_END@
19  */
20
21 /*
22  * fd_stress.c
23  *
24  * Stress test for dispatch read and write sources.
25  */
26
27 #include "config/config.h"
28
29 #include <dispatch/dispatch.h>
30
31 #include <assert.h>
32 #include <CommonCrypto/CommonDigest.h>
33 #include <errno.h>
34 #include <fcntl.h>
35 #include <netdb.h>
36 #include <netinet/in.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <sys/param.h>
40 #include <unistd.h>
41
42 static inline size_t max(size_t a, size_t b) {
43         return (a > b) ? a : b;
44 }
45
46 static inline size_t min(size_t a, size_t b) {
47         return (a < b) ? a : b;
48 }
49
50 int debug = 0;
51
52 #define DEBUG(...) do { \
53                 if (debug) fprintf(stderr, __VA_ARGS__); \
54         } while(0);
55
56 #define assert_errno(str, expr) do { \
57         if (!(expr)) { \
58                 fprintf(stderr, "%s: %s\n", (str), strerror(errno)); \
59                 exit(1); \
60         } } while(0);
61
62 #define assert_gai_errno(str, expr) do { \
63         if (!(expr)) { \
64                 fprintf(stderr, "%s: %s\n", (str), gai_strerror(errno)); \
65                 exit(1); \
66         } } while(0);
67
68
69 /* sock_context
70  *
71  * Context structure used by the reader and writer queues.
72  *
73  * Writers begin by generating a random length and writing it to the descriptor.
74  * The write buffer is filled with a random byte value and written until empty
75  * or until the total length is reached. The write buffer is refilled with more
76  * random data when empty. Each write updates an MD5 digest which is written to
77  * the descriptor once the total length is reached.
78  *
79  * Readers begin by reading the total length of data. The read buffer is filled
80  * and an MD5 digest is computed on the bytes as they are received. Once the
81  * total length of data has be read, an MD5 digest is read from the descriptor
82  * and compared with the computed value.
83  */ 
84 struct sock_context {
85         enum {
86                 LENGTH,
87                 DATA,
88                 CKSUM,
89                 DONE,
90         } state;
91         char label[64];
92         uint32_t len;
93         off_t offset;
94         char buf[8192];
95         size_t buflen;
96         CC_MD5_CTX md5ctx;
97         char md5[CC_MD5_DIGEST_LENGTH];
98 };
99
100 dispatch_source_t
101 create_writer(int wfd, dispatch_block_t completion)
102 {
103         dispatch_source_t ds;
104         struct sock_context *ctx = calloc(1, sizeof(struct sock_context));
105         assert(ctx);
106
107         snprintf(ctx->label, sizeof(ctx->label), "writer.fd.%d", wfd);
108         dispatch_queue_t queue = dispatch_queue_create(ctx->label, 0);
109         
110         ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, wfd, 0, queue);
111         assert(ds);
112         dispatch_release(queue);
113         
114         uint32_t len;
115         do {
116                 len = (arc4random() & 0x7FFF);
117         } while (len == 0);
118         ctx->state = LENGTH;
119         CC_MD5_Init(&ctx->md5ctx);
120         ctx->len = len;
121         ctx->buflen = sizeof(len);
122         len = htonl(len);
123         memcpy(ctx->buf, &len, ctx->buflen);
124         DEBUG("%s: LENGTH %d\n", ctx->label, ctx->len);
125         
126         dispatch_source_set_event_handler(ds, ^{
127                 DEBUG("%s: available %ld\n", ctx->label, dispatch_source_get_data(ds));
128                 ssize_t res;
129                 size_t wrsz = min(ctx->len, ctx->buflen);
130                 res = write(wfd, &ctx->buf[ctx->offset], wrsz);
131                 DEBUG("%s: write(%d, %p, %ld): %ld\n", ctx->label, wfd, &ctx->buf[ctx->offset], wrsz, res);
132                 if (res > 0) {
133                         if (ctx->state == DATA) {
134                                 CC_MD5_Update(&ctx->md5ctx, &ctx->buf[ctx->offset], res);
135                                 ctx->len -= res;
136                         }
137                         ctx->offset += res;
138                         ctx->buflen -= res;
139                         assert(ctx->offset >= 0);
140                         assert(ctx->len >= 0);
141                         assert(ctx->buflen >= 0);
142                         if (ctx->buflen == 0 || ctx->len == 0) {
143                                 if (ctx->state == LENGTH) {
144                                         // finished writing length, move on to data.
145                                         ctx->state = DATA;
146                                         ctx->buflen = sizeof(ctx->buf);
147                                         char pattern = arc4random() & 0xFF;
148                                         memset(ctx->buf, pattern, ctx->buflen);
149                                 } else if (ctx->state == DATA && ctx->len == 0) {
150                                         // finished writing data, move on to cksum.
151                                         ctx->state = CKSUM;
152                                         ctx->len = sizeof(ctx->md5);
153                                         ctx->buflen = sizeof(ctx->md5);
154                                         CC_MD5_Final(ctx->md5, &ctx->md5ctx);
155                                         memcpy(ctx->buf, ctx->md5, ctx->buflen);
156                                 } else if (ctx->state == DATA) {
157                                         ctx->buflen = sizeof(ctx->buf);
158                                         char pattern = arc4random() & 0xFF;
159                                         memset(ctx->buf, pattern, ctx->buflen);
160                                 } else if (ctx->state == CKSUM) {
161                                         ctx->state = DONE;
162                                         dispatch_source_cancel(ds);
163                                 } else {
164                                         assert(0);
165                                 }
166                                 ctx->offset = 0;
167                         }
168                 } else if (res == 0) {
169                         assert(ctx->state == DONE);
170                         assert(0);
171                 } else if (res == -1 && errno == EAGAIN) {
172                         DEBUG("%s: EAGAIN\n", ctx->label);
173                 } else {
174                         assert_errno("write", res >= 0);
175                 }
176         });
177         dispatch_source_set_cancel_handler(ds, ^{
178                 DEBUG("%s: close(%d)\n", ctx->label, wfd);
179                 int res = close(wfd);
180                 assert_errno("close", res == 0);
181                 completion();
182                 dispatch_release(ds);
183                 free(ctx);
184         });
185         dispatch_resume(ds);
186         return ds;
187 }
188
189 dispatch_source_t
190 create_reader(int rfd, dispatch_block_t completion)
191 {
192         dispatch_source_t ds;
193         struct sock_context *ctx = calloc(1, sizeof(struct sock_context));
194         assert(ctx);
195         
196         snprintf(ctx->label, sizeof(ctx->label), "reader.fd.%d", rfd);
197         dispatch_queue_t queue = dispatch_queue_create(ctx->label, 0);
198         
199         ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, rfd, 0, queue);
200         assert(ds);
201         dispatch_release(queue);
202         
203         ctx->state = LENGTH;
204         ctx->len = sizeof(ctx->len);
205         ctx->buflen = sizeof(ctx->len);
206         CC_MD5_Init(&ctx->md5ctx);
207         
208         dispatch_source_set_event_handler(ds, ^{
209                 DEBUG("%s: available %ld\n", ctx->label, dispatch_source_get_data(ds));
210                 ssize_t res;
211                 size_t rdsz = min(ctx->len, ctx->buflen);
212                 res = read(rfd, &ctx->buf[ctx->offset], rdsz);
213                 DEBUG("%s: read(%d,%p,%ld): %ld\n", ctx->label, rfd, &ctx->buf[ctx->offset], rdsz, res);
214
215                 // log unexpected data lengths...
216                 long expected = dispatch_source_get_data(ds);
217                 long actual = res;
218                 if (actual >= 0 && (actual != expected && actual != rdsz)) {
219                         fprintf(stderr, "%s: expected %ld, actual %ld (rdsz = %ld)\n", ctx->label, expected, actual, rdsz);
220                 }
221
222                 if (res > 0) {
223                         if (ctx->state == DATA) {
224                                 CC_MD5_Update(&ctx->md5ctx, &ctx->buf[ctx->offset], res);
225                                 ctx->len -= res;
226                         }
227                         ctx->offset += res;
228                         ctx->buflen -= res;
229                         if (ctx->buflen == 0 || ctx->len == 0) {
230                                 if (ctx->state == LENGTH) {
231                                         // buffer is full, interpret as uint32_t
232                                         memcpy(&ctx->len, ctx->buf, sizeof(ctx->len));
233                                         ctx->len = ntohl(ctx->len);
234                                         ctx->buflen = sizeof(ctx->buf);
235                                         ctx->state = DATA;
236                                 } else if (ctx->state == DATA && ctx->len == 0) {
237                                         CC_MD5_Final(ctx->md5, &ctx->md5ctx);
238                                         ctx->state = CKSUM;
239                                         ctx->len = CC_MD5_DIGEST_LENGTH;
240                                         ctx->buflen = ctx->len;
241                                 } else if (ctx->state == DATA) {
242                                         ctx->buflen = sizeof(ctx->buf);
243                                 } else if (ctx->state == CKSUM) {
244                                         ctx->state = DONE;
245                                         res = memcmp(ctx->buf, ctx->md5, sizeof(ctx->md5));
246                                         if (res != 0) {
247                                                 DEBUG("%s: MD5 FAILURE\n", ctx->label);
248                                         }
249                                         assert(res == 0);
250                                 }
251                                 ctx->offset = 0;
252                         }
253                 } else if (res == 0) {
254                         assert(ctx->state == DONE);
255                         DEBUG("%s: EOF\n", ctx->label);
256                         dispatch_source_cancel(ds);
257                 } else {
258                         assert_errno("read", res >= 0);
259                 }
260         });
261         dispatch_source_set_cancel_handler(ds, ^{
262                 DEBUG("%s: close(%d)\n", ctx->label, rfd);
263                 int res = close(rfd);
264                 assert_errno("close", res == 0);
265                 completion();
266                 dispatch_release(ds);
267                 free(ctx);
268         });
269         dispatch_resume(ds);
270         return ds;
271 }
272
273 void
274 set_nonblock(int fd)
275 {
276         int res, flags;
277         flags = fcntl(fd, F_GETFL);
278
279         flags |= O_NONBLOCK;
280         res = fcntl(fd, F_SETFL, flags);
281         assert_errno("fcntl(F_SETFL,O_NONBLOCK)", res == 0);
282 }
283
284 void
285 create_fifo(int *rfd, int *wfd)
286 {
287         int res;
288         char *name;
289         
290         char path[MAXPATHLEN];
291         strlcpy(path, "/tmp/fd_stress.fifo.XXXXXX", sizeof(path));
292         name = mktemp(path);
293         
294         res = unlink(name);
295
296         res = mkfifo(name, 0700);
297         assert_errno(name, res == 0);
298         
299         *rfd = open(name, O_RDONLY | O_NONBLOCK);
300         assert_errno(name, *rfd >= 0);
301         
302         *wfd = open(name, O_WRONLY | O_NONBLOCK);
303         assert_errno(name, *wfd >= 0);
304 }
305
306 void
307 create_pipe(int *rfd, int *wfd)
308 {
309         int res;
310         int fildes[2];
311         
312         res = pipe(fildes);
313         assert_errno("pipe", res == 0);
314         
315         *rfd = fildes[0];
316         *wfd = fildes[1];
317
318         set_nonblock(*rfd);
319         set_nonblock(*wfd);
320 }
321
322 void
323 create_server_socket(int *rfd, struct sockaddr_in *sa)
324 {
325         int res;
326         int value;
327         socklen_t salen = sizeof(*sa);
328
329         memset(sa, 0, salen);
330         sa->sin_len = salen;
331         sa->sin_family = AF_INET;
332         sa->sin_port = htons(12345);
333         sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
334
335         *rfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
336         assert_errno("socket", *rfd >= 0);
337         
338         value = 1;
339         res = setsockopt(*rfd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
340         assert_errno("setsockopt(SO_REUSEADDR)", res == 0);
341
342         value = 1;
343         res = setsockopt(*rfd, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
344         assert_errno("setsockopt(SO_REUSEPORT)", res == 0);
345         
346         res = bind(*rfd, (const struct sockaddr *)sa, salen);
347         assert_errno("bind", res == 0);
348
349         res = listen(*rfd, 128);
350         assert_errno("listen", res == 0);
351 }
352
353 void
354 create_client_socket(int *wfd, const struct sockaddr_in *sa)
355 {
356         int res;
357
358         *wfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
359         assert_errno("socket", *wfd >= 0);
360         
361         set_nonblock(*wfd);
362
363         res = connect(*wfd, (const struct sockaddr *)sa, sa->sin_len);
364         assert_errno("connect", res == 0 || errno == EINPROGRESS);
365 }
366
367 extern int optind;
368
369 void
370 usage(void)
371 {
372         fprintf(stderr, "usage: fd_stress [-d] iterations width\n");
373         exit(1);
374 }
375
376 int
377 main(int argc, char* argv[])
378 {
379         int serverfd;
380         struct sockaddr_in sa;
381         create_server_socket(&serverfd, &sa);
382
383         int ch;
384         
385         while ((ch = getopt(argc, argv, "d")) != -1) {
386                 switch (ch) {
387                         case 'd':
388                                 debug = 1;
389                                 break;
390                         case '?':
391                         default:
392                                 usage();
393                                 break;
394                 }
395         }
396         argc -= optind;
397         argv += optind;
398         
399         if (argc != 2) {
400                 usage();
401         }
402         
403         size_t iterations = strtol(argv[0], NULL, 10);
404         size_t width = strtol(argv[1], NULL, 10);
405         
406         if (iterations == 0 || width == 0) {
407                 usage();
408         }
409
410         fprintf(stdout, "pid %d\n", getpid());
411
412         dispatch_group_t group;
413         group = dispatch_group_create();
414         assert(group);
415
416 #if 0
417         dispatch_queue_t queue = dispatch_queue_create("server", NULL);
418
419         dispatch_source_t ds;
420         ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, serverfd, 0, queue);
421         assert(ds);
422         dispatch_source_set_event_handler(ds, ^{
423                 int res;
424                 int fd;
425                 struct sockaddr peer;
426                 socklen_t peerlen;
427
428                 fd = accept(serverfd, &peer, &peerlen);
429                 assert_errno("accept", fd >= 0);
430
431                 set_nonblock(fd);
432                 
433                 char host[NI_MAXHOST], serv[NI_MAXSERV];
434                 host[0] = 0;
435                 serv[0] = 0;
436                 res = getnameinfo(&peer, peerlen, host, sizeof(host), serv, sizeof(serv), NI_NUMERICHOST|NI_NUMERICSERV);
437                 DEBUG("ACCEPTED %d (%s:%s)\n", fd, host, serv);
438                 
439                 create_reader(fd, ^{ dispatch_group_leave(group); });
440         });
441         dispatch_resume(ds);
442 #endif
443
444         size_t i;
445         for (i = 1; i < iterations; ++i) {
446                 fprintf(stderr, "iteration %ld\n", i);
447
448                 size_t j;
449                 for (j = 0; j < width; ++j) {
450                         int rfd, wfd;
451                         dispatch_group_enter(group);
452                         create_pipe(&rfd, &wfd);
453                         DEBUG("PIPE %d %d\n", rfd, wfd);
454                         dispatch_source_t reader;
455                         reader = create_reader(rfd, ^{ dispatch_group_leave(group); });
456                         create_writer(wfd, ^{});
457                 }
458                 
459 #if 0
460                 int clientfd;
461                 dispatch_group_enter(group);
462                 create_client_socket(&clientfd, &sa);
463                 DEBUG("CLIENT %d\n", clientfd);
464                 create_writer(clientfd, ^{});
465
466                 dispatch_group_enter(group);
467                 create_fifo(&rfd, &wfd);
468                 DEBUG("FIFO %d %d\n", rfd, wfd);
469                 create_writer(wfd, ^{});
470                 create_reader(rfd, ^{ dispatch_group_leave(group); });
471 #endif
472
473                 dispatch_group_wait(group, DISPATCH_TIME_FOREVER);
474         }
475         fprintf(stdout, "pid %d\n", getpid());
476         dispatch_main();
477
478         return 0;
479 }