Imported Upstream version 2.1.10
[platform/upstream/libevent.git] / bufferevent_async.c
1 /*
2  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
3  *
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. The name of the author may not be used to endorse or promote products
15  *    derived from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28
29 #include "event2/event-config.h"
30 #include "evconfig-private.h"
31
32 #ifdef EVENT__HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35
36 #include <errno.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #ifdef EVENT__HAVE_STDARG_H
41 #include <stdarg.h>
42 #endif
43 #ifdef EVENT__HAVE_UNISTD_H
44 #include <unistd.h>
45 #endif
46
47 #ifdef _WIN32
48 #include <winsock2.h>
49 #include <ws2tcpip.h>
50 #endif
51
52 #include <sys/queue.h>
53
54 #include "event2/util.h"
55 #include "event2/bufferevent.h"
56 #include "event2/buffer.h"
57 #include "event2/bufferevent_struct.h"
58 #include "event2/event.h"
59 #include "event2/util.h"
60 #include "event-internal.h"
61 #include "log-internal.h"
62 #include "mm-internal.h"
63 #include "bufferevent-internal.h"
64 #include "util-internal.h"
65 #include "iocp-internal.h"
66
67 #ifndef SO_UPDATE_CONNECT_CONTEXT
68 /* Mingw is sometimes missing this */
69 #define SO_UPDATE_CONNECT_CONTEXT 0x7010
70 #endif
71
72 /* prototypes */
73 static int be_async_enable(struct bufferevent *, short);
74 static int be_async_disable(struct bufferevent *, short);
75 static void be_async_destruct(struct bufferevent *);
76 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
77 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
78
79 struct bufferevent_async {
80         struct bufferevent_private bev;
81         struct event_overlapped connect_overlapped;
82         struct event_overlapped read_overlapped;
83         struct event_overlapped write_overlapped;
84         size_t read_in_progress;
85         size_t write_in_progress;
86         unsigned ok : 1;
87         unsigned read_added : 1;
88         unsigned write_added : 1;
89 };
90
91 const struct bufferevent_ops bufferevent_ops_async = {
92         "socket_async",
93         evutil_offsetof(struct bufferevent_async, bev.bev),
94         be_async_enable,
95         be_async_disable,
96         NULL, /* Unlink */
97         be_async_destruct,
98         bufferevent_generic_adj_timeouts_,
99         be_async_flush,
100         be_async_ctrl,
101 };
102
103 static inline void
104 be_async_run_eventcb(struct bufferevent *bev, short what, int options)
105 { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
106
107 static inline void
108 be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
109 { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
110
111 static inline int
112 fatal_error(int err)
113 {
114         switch (err) {
115                 /* We may have already associated this fd with a port.
116                  * Let's hope it's this port, and that the error code
117                  * for doing this neer changes. */
118                 case ERROR_INVALID_PARAMETER:
119                         return 0;
120         }
121         return 1;
122 }
123
124 static inline struct bufferevent_async *
125 upcast(struct bufferevent *bev)
126 {
127         struct bufferevent_async *bev_a;
128         if (!BEV_IS_ASYNC(bev))
129                 return NULL;
130         bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
131         return bev_a;
132 }
133
134 static inline struct bufferevent_async *
135 upcast_connect(struct event_overlapped *eo)
136 {
137         struct bufferevent_async *bev_a;
138         bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
139         EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
140         return bev_a;
141 }
142
143 static inline struct bufferevent_async *
144 upcast_read(struct event_overlapped *eo)
145 {
146         struct bufferevent_async *bev_a;
147         bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
148         EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
149         return bev_a;
150 }
151
152 static inline struct bufferevent_async *
153 upcast_write(struct event_overlapped *eo)
154 {
155         struct bufferevent_async *bev_a;
156         bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
157         EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
158         return bev_a;
159 }
160
161 static void
162 bev_async_del_write(struct bufferevent_async *beva)
163 {
164         struct bufferevent *bev = &beva->bev.bev;
165
166         if (beva->write_added) {
167                 beva->write_added = 0;
168                 event_base_del_virtual_(bev->ev_base);
169         }
170 }
171
172 static void
173 bev_async_del_read(struct bufferevent_async *beva)
174 {
175         struct bufferevent *bev = &beva->bev.bev;
176
177         if (beva->read_added) {
178                 beva->read_added = 0;
179                 event_base_del_virtual_(bev->ev_base);
180         }
181 }
182
183 static void
184 bev_async_add_write(struct bufferevent_async *beva)
185 {
186         struct bufferevent *bev = &beva->bev.bev;
187
188         if (!beva->write_added) {
189                 beva->write_added = 1;
190                 event_base_add_virtual_(bev->ev_base);
191         }
192 }
193
194 static void
195 bev_async_add_read(struct bufferevent_async *beva)
196 {
197         struct bufferevent *bev = &beva->bev.bev;
198
199         if (!beva->read_added) {
200                 beva->read_added = 1;
201                 event_base_add_virtual_(bev->ev_base);
202         }
203 }
204
205 static void
206 bev_async_consider_writing(struct bufferevent_async *beva)
207 {
208         size_t at_most;
209         int limit;
210         struct bufferevent *bev = &beva->bev.bev;
211
212         /* Don't write if there's a write in progress, or we do not
213          * want to write, or when there's nothing left to write. */
214         if (beva->write_in_progress || beva->bev.connecting)
215                 return;
216         if (!beva->ok || !(bev->enabled&EV_WRITE) ||
217             !evbuffer_get_length(bev->output)) {
218                 bev_async_del_write(beva);
219                 return;
220         }
221
222         at_most = evbuffer_get_length(bev->output);
223
224         /* This is safe so long as bufferevent_get_write_max never returns
225          * more than INT_MAX.  That's true for now. XXXX */
226         limit = (int)bufferevent_get_write_max_(&beva->bev);
227         if (at_most >= (size_t)limit && limit >= 0)
228                 at_most = limit;
229
230         if (beva->bev.write_suspended) {
231                 bev_async_del_write(beva);
232                 return;
233         }
234
235         /*  XXXX doesn't respect low-water mark very well. */
236         bufferevent_incref_(bev);
237         if (evbuffer_launch_write_(bev->output, at_most,
238             &beva->write_overlapped)) {
239                 bufferevent_decref_(bev);
240                 beva->ok = 0;
241                 be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
242         } else {
243                 beva->write_in_progress = at_most;
244                 bufferevent_decrement_write_buckets_(&beva->bev, at_most);
245                 bev_async_add_write(beva);
246         }
247 }
248
249 static void
250 bev_async_consider_reading(struct bufferevent_async *beva)
251 {
252         size_t cur_size;
253         size_t read_high;
254         size_t at_most;
255         int limit;
256         struct bufferevent *bev = &beva->bev.bev;
257
258         /* Don't read if there is a read in progress, or we do not
259          * want to read. */
260         if (beva->read_in_progress || beva->bev.connecting)
261                 return;
262         if (!beva->ok || !(bev->enabled&EV_READ)) {
263                 bev_async_del_read(beva);
264                 return;
265         }
266
267         /* Don't read if we're full */
268         cur_size = evbuffer_get_length(bev->input);
269         read_high = bev->wm_read.high;
270         if (read_high) {
271                 if (cur_size >= read_high) {
272                         bev_async_del_read(beva);
273                         return;
274                 }
275                 at_most = read_high - cur_size;
276         } else {
277                 at_most = 16384; /* FIXME totally magic. */
278         }
279
280         /* XXXX This over-commits. */
281         /* XXXX see also not above on cast on bufferevent_get_write_max_() */
282         limit = (int)bufferevent_get_read_max_(&beva->bev);
283         if (at_most >= (size_t)limit && limit >= 0)
284                 at_most = limit;
285
286         if (beva->bev.read_suspended) {
287                 bev_async_del_read(beva);
288                 return;
289         }
290
291         bufferevent_incref_(bev);
292         if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
293                 beva->ok = 0;
294                 be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
295                 bufferevent_decref_(bev);
296         } else {
297                 beva->read_in_progress = at_most;
298                 bufferevent_decrement_read_buckets_(&beva->bev, at_most);
299                 bev_async_add_read(beva);
300         }
301
302         return;
303 }
304
305 static void
306 be_async_outbuf_callback(struct evbuffer *buf,
307     const struct evbuffer_cb_info *cbinfo,
308     void *arg)
309 {
310         struct bufferevent *bev = arg;
311         struct bufferevent_async *bev_async = upcast(bev);
312
313         /* If we added data to the outbuf and were not writing before,
314          * we may want to write now. */
315
316         bufferevent_incref_and_lock_(bev);
317
318         if (cbinfo->n_added)
319                 bev_async_consider_writing(bev_async);
320
321         bufferevent_decref_and_unlock_(bev);
322 }
323
324 static void
325 be_async_inbuf_callback(struct evbuffer *buf,
326     const struct evbuffer_cb_info *cbinfo,
327     void *arg)
328 {
329         struct bufferevent *bev = arg;
330         struct bufferevent_async *bev_async = upcast(bev);
331
332         /* If we drained data from the inbuf and were not reading before,
333          * we may want to read now */
334
335         bufferevent_incref_and_lock_(bev);
336
337         if (cbinfo->n_deleted)
338                 bev_async_consider_reading(bev_async);
339
340         bufferevent_decref_and_unlock_(bev);
341 }
342
343 static int
344 be_async_enable(struct bufferevent *buf, short what)
345 {
346         struct bufferevent_async *bev_async = upcast(buf);
347
348         if (!bev_async->ok)
349                 return -1;
350
351         if (bev_async->bev.connecting) {
352                 /* Don't launch anything during connection attempts. */
353                 return 0;
354         }
355
356         if (what & EV_READ)
357                 BEV_RESET_GENERIC_READ_TIMEOUT(buf);
358         if (what & EV_WRITE)
359                 BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
360
361         /* If we newly enable reading or writing, and we aren't reading or
362            writing already, consider launching a new read or write. */
363
364         if (what & EV_READ)
365                 bev_async_consider_reading(bev_async);
366         if (what & EV_WRITE)
367                 bev_async_consider_writing(bev_async);
368         return 0;
369 }
370
371 static int
372 be_async_disable(struct bufferevent *bev, short what)
373 {
374         struct bufferevent_async *bev_async = upcast(bev);
375         /* XXXX If we disable reading or writing, we may want to consider
376          * canceling any in-progress read or write operation, though it might
377          * not work. */
378
379         if (what & EV_READ) {
380                 BEV_DEL_GENERIC_READ_TIMEOUT(bev);
381                 bev_async_del_read(bev_async);
382         }
383         if (what & EV_WRITE) {
384                 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
385                 bev_async_del_write(bev_async);
386         }
387
388         return 0;
389 }
390
391 static void
392 be_async_destruct(struct bufferevent *bev)
393 {
394         struct bufferevent_async *bev_async = upcast(bev);
395         struct bufferevent_private *bev_p = BEV_UPCAST(bev);
396         evutil_socket_t fd;
397
398         EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
399                         !upcast(bev)->read_in_progress);
400
401         bev_async_del_read(bev_async);
402         bev_async_del_write(bev_async);
403
404         fd = evbuffer_overlapped_get_fd_(bev->input);
405         if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
406                 (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
407                 evutil_closesocket(fd);
408                 evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
409         }
410 }
411
412 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
413  * we use WSAGetOverlappedResult to translate. */
414 static void
415 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
416 {
417         DWORD bytes, flags;
418         evutil_socket_t fd;
419
420         fd = evbuffer_overlapped_get_fd_(bev->input);
421         WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
422 }
423
424 static int
425 be_async_flush(struct bufferevent *bev, short what,
426     enum bufferevent_flush_mode mode)
427 {
428         return 0;
429 }
430
431 static void
432 connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
433     ev_ssize_t nbytes, int ok)
434 {
435         struct bufferevent_async *bev_a = upcast_connect(eo);
436         struct bufferevent *bev = &bev_a->bev.bev;
437         evutil_socket_t sock;
438
439         BEV_LOCK(bev);
440
441         EVUTIL_ASSERT(bev_a->bev.connecting);
442         bev_a->bev.connecting = 0;
443         sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
444         /* XXXX Handle error? */
445         setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
446
447         if (ok)
448                 bufferevent_async_set_connected_(bev);
449         else
450                 bev_async_set_wsa_error(bev, eo);
451
452         be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
453
454         event_base_del_virtual_(bev->ev_base);
455
456         bufferevent_decref_and_unlock_(bev);
457 }
458
459 static void
460 read_complete(struct event_overlapped *eo, ev_uintptr_t key,
461     ev_ssize_t nbytes, int ok)
462 {
463         struct bufferevent_async *bev_a = upcast_read(eo);
464         struct bufferevent *bev = &bev_a->bev.bev;
465         short what = BEV_EVENT_READING;
466         ev_ssize_t amount_unread;
467         BEV_LOCK(bev);
468         EVUTIL_ASSERT(bev_a->read_in_progress);
469
470         amount_unread = bev_a->read_in_progress - nbytes;
471         evbuffer_commit_read_(bev->input, nbytes);
472         bev_a->read_in_progress = 0;
473         if (amount_unread)
474                 bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
475
476         if (!ok)
477                 bev_async_set_wsa_error(bev, eo);
478
479         if (bev_a->ok) {
480                 if (ok && nbytes) {
481                         BEV_RESET_GENERIC_READ_TIMEOUT(bev);
482                         be_async_trigger_nolock(bev, EV_READ, 0);
483                         bev_async_consider_reading(bev_a);
484                 } else if (!ok) {
485                         what |= BEV_EVENT_ERROR;
486                         bev_a->ok = 0;
487                         be_async_run_eventcb(bev, what, 0);
488                 } else if (!nbytes) {
489                         what |= BEV_EVENT_EOF;
490                         bev_a->ok = 0;
491                         be_async_run_eventcb(bev, what, 0);
492                 }
493         }
494
495         bufferevent_decref_and_unlock_(bev);
496 }
497
498 static void
499 write_complete(struct event_overlapped *eo, ev_uintptr_t key,
500     ev_ssize_t nbytes, int ok)
501 {
502         struct bufferevent_async *bev_a = upcast_write(eo);
503         struct bufferevent *bev = &bev_a->bev.bev;
504         short what = BEV_EVENT_WRITING;
505         ev_ssize_t amount_unwritten;
506
507         BEV_LOCK(bev);
508         EVUTIL_ASSERT(bev_a->write_in_progress);
509
510         amount_unwritten = bev_a->write_in_progress - nbytes;
511         evbuffer_commit_write_(bev->output, nbytes);
512         bev_a->write_in_progress = 0;
513
514         if (amount_unwritten)
515                 bufferevent_decrement_write_buckets_(&bev_a->bev,
516                                                      -amount_unwritten);
517
518
519         if (!ok)
520                 bev_async_set_wsa_error(bev, eo);
521
522         if (bev_a->ok) {
523                 if (ok && nbytes) {
524                         BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
525                         be_async_trigger_nolock(bev, EV_WRITE, 0);
526                         bev_async_consider_writing(bev_a);
527                 } else if (!ok) {
528                         what |= BEV_EVENT_ERROR;
529                         bev_a->ok = 0;
530                         be_async_run_eventcb(bev, what, 0);
531                 } else if (!nbytes) {
532                         what |= BEV_EVENT_EOF;
533                         bev_a->ok = 0;
534                         be_async_run_eventcb(bev, what, 0);
535                 }
536         }
537
538         bufferevent_decref_and_unlock_(bev);
539 }
540
541 struct bufferevent *
542 bufferevent_async_new_(struct event_base *base,
543     evutil_socket_t fd, int options)
544 {
545         struct bufferevent_async *bev_a;
546         struct bufferevent *bev;
547         struct event_iocp_port *iocp;
548
549         options |= BEV_OPT_THREADSAFE;
550
551         if (!(iocp = event_base_get_iocp_(base)))
552                 return NULL;
553
554         if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
555                 if (fatal_error(GetLastError()))
556                         return NULL;
557         }
558
559         if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
560                 return NULL;
561
562         bev = &bev_a->bev.bev;
563         if (!(bev->input = evbuffer_overlapped_new_(fd))) {
564                 mm_free(bev_a);
565                 return NULL;
566         }
567         if (!(bev->output = evbuffer_overlapped_new_(fd))) {
568                 evbuffer_free(bev->input);
569                 mm_free(bev_a);
570                 return NULL;
571         }
572
573         if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
574                 options)<0)
575                 goto err;
576
577         evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
578         evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
579
580         event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
581         event_overlapped_init_(&bev_a->read_overlapped, read_complete);
582         event_overlapped_init_(&bev_a->write_overlapped, write_complete);
583
584         bufferevent_init_generic_timeout_cbs_(bev);
585
586         bev_a->ok = fd >= 0;
587
588         return bev;
589 err:
590         bufferevent_free(&bev_a->bev.bev);
591         return NULL;
592 }
593
594 void
595 bufferevent_async_set_connected_(struct bufferevent *bev)
596 {
597         struct bufferevent_async *bev_async = upcast(bev);
598         bev_async->ok = 1;
599         /* Now's a good time to consider reading/writing */
600         be_async_enable(bev, bev->enabled);
601 }
602
603 int
604 bufferevent_async_can_connect_(struct bufferevent *bev)
605 {
606         const struct win32_extension_fns *ext =
607             event_get_win32_extension_fns_();
608
609         if (BEV_IS_ASYNC(bev) &&
610             event_base_get_iocp_(bev->ev_base) &&
611             ext && ext->ConnectEx)
612                 return 1;
613
614         return 0;
615 }
616
617 int
618 bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
619         const struct sockaddr *sa, int socklen)
620 {
621         BOOL rc;
622         struct bufferevent_async *bev_async = upcast(bev);
623         struct sockaddr_storage ss;
624         const struct win32_extension_fns *ext =
625             event_get_win32_extension_fns_();
626
627         EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
628
629         /* ConnectEx() requires that the socket be bound to an address
630          * with bind() before using, otherwise it will fail. We attempt
631          * to issue a bind() here, taking into account that the error
632          * code is set to WSAEINVAL when the socket is already bound. */
633         memset(&ss, 0, sizeof(ss));
634         if (sa->sa_family == AF_INET) {
635                 struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
636                 sin->sin_family = AF_INET;
637                 sin->sin_addr.s_addr = INADDR_ANY;
638         } else if (sa->sa_family == AF_INET6) {
639                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
640                 sin6->sin6_family = AF_INET6;
641                 sin6->sin6_addr = in6addr_any;
642         } else {
643                 /* Well, the user will have to bind() */
644                 return -1;
645         }
646         if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
647             WSAGetLastError() != WSAEINVAL)
648                 return -1;
649
650         event_base_add_virtual_(bev->ev_base);
651         bufferevent_incref_(bev);
652         rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
653                             &bev_async->connect_overlapped.overlapped);
654         if (rc || WSAGetLastError() == ERROR_IO_PENDING)
655                 return 0;
656
657         event_base_del_virtual_(bev->ev_base);
658         bufferevent_decref_(bev);
659
660         return -1;
661 }
662
663 static int
664 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
665     union bufferevent_ctrl_data *data)
666 {
667         switch (op) {
668         case BEV_CTRL_GET_FD:
669                 data->fd = evbuffer_overlapped_get_fd_(bev->input);
670                 return 0;
671         case BEV_CTRL_SET_FD: {
672                 struct bufferevent_async *bev_a = upcast(bev);
673                 struct event_iocp_port *iocp;
674
675                 if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
676                         return 0;
677                 if (!(iocp = event_base_get_iocp_(bev->ev_base)))
678                         return -1;
679                 if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
680                         if (fatal_error(GetLastError()))
681                                 return -1;
682                 }
683                 evbuffer_overlapped_set_fd_(bev->input, data->fd);
684                 evbuffer_overlapped_set_fd_(bev->output, data->fd);
685                 bev_a->ok = data->fd >= 0;
686                 return 0;
687         }
688         case BEV_CTRL_CANCEL_ALL: {
689                 struct bufferevent_async *bev_a = upcast(bev);
690                 evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
691                 if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
692                     (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
693                         closesocket(fd);
694                         evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
695                 }
696                 bev_a->ok = 0;
697                 return 0;
698         }
699         case BEV_CTRL_GET_UNDERLYING:
700         default:
701                 return -1;
702         }
703 }
704
705