1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
20 // CancelIo Windows API cancels all outstanding IO for a particular
21 // socket on current thread. To overcome that limitation, we run
22 // special goroutine, locked to OS single thread, that both starts
23 // and cancels IO. It means, there are 2 unavoidable thread switches
25 // Some newer versions of Windows has new CancelIoEx API, that does
26 // not have that limitation and can be used from any thread. This
27 // package uses CancelIoEx API, if present, otherwise it fallback
30 var canCancelIO bool // determines if CancelIoEx API is present
34 e := syscall.WSAStartup(uint32(0x202), &d)
36 initErr = os.NewSyscallError("WSAStartup", e)
38 canCancelIO = syscall.LoadCancelIoEx() == nil
39 if syscall.LoadGetAddrInfo() == nil {
40 lookupIP = newLookupIP
44 func closesocket(s syscall.Handle) error {
45 return syscall.Closesocket(s)
48 func canUseConnectEx(net string) bool {
49 if net == "udp" || net == "udp4" || net == "udp6" {
50 // ConnectEx windows API does not support connectionless sockets.
53 return syscall.LoadConnectEx() == nil
56 func dialTimeout(net, addr string, timeout time.Duration) (Conn, error) {
57 if !canUseConnectEx(net) {
58 // Use the relatively inefficient goroutine-racing
59 // implementation of DialTimeout.
60 return dialTimeoutRace(net, addr, timeout)
62 deadline := time.Now().Add(timeout)
63 _, addri, err := resolveNetAddr("dial", net, addr, deadline)
67 return dialAddr(net, addr, addri, deadline)
70 // Interface for all IO operations.
71 type anOpIface interface {
77 // IO completion result parameters.
78 type ioResult struct {
83 // anOp implements functionality common to all IO operations.
85 // Used by IOCP interface, it must be first field
86 // of the struct, as our code rely on it.
94 func (o *anOp) Init(fd *netFD, mode int) {
102 if fd.resultc[i] == nil {
103 fd.resultc[i] = make(chan ioResult, 1)
105 o.resultc = fd.resultc[i]
106 if fd.errnoc[i] == nil {
107 fd.errnoc[i] = make(chan error)
109 o.errnoc = fd.errnoc[i]
112 func (o *anOp) Op() *anOp {
116 // bufOp is used by IO operations that read / write
117 // data from / to client buffer.
123 func (o *bufOp) Init(fd *netFD, buf []byte, mode int) {
124 o.anOp.Init(fd, mode)
125 o.buf.Len = uint32(len(buf))
129 o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
133 // resultSrv will retrieve all IO completion results from
134 // iocp and send them to the correspondent waiting client
135 // goroutine via channel supplied in the request.
136 type resultSrv struct {
140 func (s *resultSrv) Run() {
141 var o *syscall.Overlapped
145 r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, syscall.INFINITE)
148 // Dequeued successfully completed IO packet.
149 case r.err == syscall.Errno(syscall.WAIT_TIMEOUT) && o == nil:
150 // Wait has timed out (should not happen now, but might be used in the future).
151 panic("GetQueuedCompletionStatus timed out")
153 // Failed to dequeue anything -> report the error.
154 panic("GetQueuedCompletionStatus failed " + r.err.Error())
156 // Dequeued failed IO packet.
158 (*anOp)(unsafe.Pointer(o)).resultc <- r
162 // ioSrv executes net IO requests.
164 submchan chan anOpIface // submit IO requests
165 canchan chan anOpIface // cancel IO requests
168 // ProcessRemoteIO will execute submit IO requests on behalf
169 // of other goroutines, all on a single os thread, so it can
170 // cancel them later. Results of all operations will be sent
171 // back to their requesters via channel supplied in request.
172 // It is used only when the CancelIoEx API is unavailable.
173 func (s *ioSrv) ProcessRemoteIO() {
174 runtime.LockOSThread()
175 defer runtime.UnlockOSThread()
178 case o := <-s.submchan:
179 o.Op().errnoc <- o.Submit()
180 case o := <-s.canchan:
181 o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd))
186 // ExecIO executes a single IO operation oi. It submits and cancels
187 // IO in the current thread for systems where Windows CancelIoEx API
188 // is available. Alternatively, it passes the request onto
189 // a special goroutine and waits for completion or cancels request.
190 // deadline is unix nanos.
191 func (s *ioSrv) ExecIO(oi anOpIface, deadline int64) (int, error) {
194 // Calculate timeout delta.
197 delta = deadline - time.Now().UnixNano()
199 return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, errTimeout}
206 // Send request to a special dedicated thread,
207 // so it can stop the IO with CancelIO later.
213 // IO completed immediately, but we need to get our completion message anyway.
214 case syscall.ERROR_IO_PENDING:
215 // IO started, and we have to wait for its completion.
218 return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
220 // Setup timer, if deadline is given.
221 var timer <-chan time.Time
223 t := time.NewTimer(time.Duration(delta) * time.Nanosecond)
227 // Wait for our request to complete.
229 var cancelled, timeout bool
231 case r = <-o.resultc:
241 err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o)
242 // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
243 if err != nil && err != syscall.ERROR_NOT_FOUND {
244 // TODO(brainman): maybe do something else, but panic.
251 // Wait for IO to be canceled or complete successfully.
253 if r.err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
262 err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, r.err}
264 return int(r.qty), err
267 // Start helper goroutines.
268 var resultsrv *resultSrv
270 var onceStartServer sync.Once
273 resultsrv = new(resultSrv)
275 resultsrv.iocp, err = syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 1)
277 panic("CreateIoCompletionPort: " + err.Error())
283 // Only CancelIo API is available. Lets start special goroutine
284 // locked to an OS thread, that both starts and cancels IO.
285 iosrv.submchan = make(chan anOpIface)
286 iosrv.canchan = make(chan anOpIface)
287 go iosrv.ProcessRemoteIO()
291 // Network file descriptor.
293 // locking/lifetime of sysfd
298 // immutable until Close
306 resultc [2]chan ioResult // read/write completion results
307 errnoc [2]chan error // read/write submit or cancel operation errors
308 closec chan bool // used by Close to cancel pending IO
310 // serialize access to Read and Write methods
313 // read and write deadlines
314 rdeadline, wdeadline deadline
317 func allocFD(fd syscall.Handle, family, sotype int, net string) *netFD {
323 closec: make(chan bool),
328 func newFD(fd syscall.Handle, family, proto int, net string) (*netFD, error) {
332 onceStartServer.Do(startServer)
333 // Associate our socket with resultsrv.iocp.
334 if _, err := syscall.CreateIoCompletionPort(syscall.Handle(fd), resultsrv.iocp, 0, 0); err != nil {
337 return allocFD(fd, family, proto, net), nil
340 func (fd *netFD) setAddr(laddr, raddr Addr) {
343 runtime.SetFinalizer(fd, (*netFD).closesocket)
346 // Make new connection.
348 type connectOp struct {
353 func (o *connectOp) Submit() error {
354 return syscall.ConnectEx(o.fd.sysfd, o.ra, nil, 0, nil, &o.o)
357 func (o *connectOp) Name() string {
361 func (fd *netFD) connect(ra syscall.Sockaddr) error {
362 if !canUseConnectEx(fd.net) {
363 return syscall.Connect(fd.sysfd, ra)
365 // ConnectEx windows API requires an unconnected, previously bound socket.
366 var la syscall.Sockaddr
368 case *syscall.SockaddrInet4:
369 la = &syscall.SockaddrInet4{}
370 case *syscall.SockaddrInet6:
371 la = &syscall.SockaddrInet6{}
373 panic("unexpected type in connect")
375 if err := syscall.Bind(fd.sysfd, la); err != nil {
378 // Call ConnectEx API.
382 _, err := iosrv.ExecIO(&o, fd.wdeadline.value())
386 // Refresh socket properties.
387 return syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
390 // Add a reference to this fd.
391 // If closing==true, mark the fd as closing.
392 // Returns an error if the fd cannot be used.
393 func (fd *netFD) incref(closing bool) error {
411 // Remove a reference to this FD and close if we've been asked to do so (and
412 // there are no references left.
413 func (fd *netFD) decref() {
419 if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle {
420 closesocket(fd.sysfd)
421 fd.sysfd = syscall.InvalidHandle
422 // no need for a finalizer anymore
423 runtime.SetFinalizer(fd, nil)
428 func (fd *netFD) Close() error {
429 if err := fd.incref(true); err != nil {
433 // unblock pending reader and writer
435 // wait for both reader and writer to exit
437 defer fd.rio.Unlock()
439 defer fd.wio.Unlock()
443 func (fd *netFD) shutdown(how int) error {
444 if err := fd.incref(false); err != nil {
448 err := syscall.Shutdown(fd.sysfd, how)
450 return &OpError{"shutdown", fd.net, fd.laddr, err}
455 func (fd *netFD) CloseRead() error {
456 return fd.shutdown(syscall.SHUT_RD)
459 func (fd *netFD) CloseWrite() error {
460 return fd.shutdown(syscall.SHUT_WR)
463 func (fd *netFD) closesocket() error {
464 return closesocket(fd.sysfd)
467 // Read from network.
473 func (o *readOp) Submit() error {
475 return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil)
478 func (o *readOp) Name() string {
482 func (fd *netFD) Read(buf []byte) (int, error) {
483 if err := fd.incref(false); err != nil {
488 defer fd.rio.Unlock()
491 n, err := iosrv.ExecIO(&o, fd.rdeadline.value())
492 if err == nil && n == 0 {
498 // ReadFrom from network.
500 type readFromOp struct {
502 rsa syscall.RawSockaddrAny
506 func (o *readFromOp) Submit() error {
508 return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
511 func (o *readFromOp) Name() string {
515 func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
519 if err := fd.incref(false); err != nil {
524 defer fd.rio.Unlock()
527 o.rsan = int32(unsafe.Sizeof(o.rsa))
528 n, err = iosrv.ExecIO(&o, fd.rdeadline.value())
532 sa, _ = o.rsa.Sockaddr()
538 type writeOp struct {
542 func (o *writeOp) Submit() error {
544 return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil)
547 func (o *writeOp) Name() string {
551 func (fd *netFD) Write(buf []byte) (int, error) {
552 if err := fd.incref(false); err != nil {
557 defer fd.wio.Unlock()
560 return iosrv.ExecIO(&o, fd.wdeadline.value())
563 // WriteTo to network.
565 type writeToOp struct {
570 func (o *writeToOp) Submit() error {
572 return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil)
575 func (o *writeToOp) Name() string {
579 func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
583 if err := fd.incref(false); err != nil {
588 defer fd.wio.Unlock()
592 return iosrv.ExecIO(&o, fd.wdeadline.value())
595 // Accept new network connections.
597 type acceptOp struct {
599 newsock syscall.Handle
600 attrs [2]syscall.RawSockaddrAny // space for local and remote address only
603 func (o *acceptOp) Submit() error {
605 l := uint32(unsafe.Sizeof(o.attrs[0]))
606 return syscall.AcceptEx(o.fd.sysfd, o.newsock,
607 (*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
610 func (o *acceptOp) Name() string {
614 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
615 if err := fd.incref(false); err != nil {
621 // See ../syscall/exec_unix.go for description of ForkLock.
622 syscall.ForkLock.RLock()
623 s, err := syscall.Socket(fd.family, fd.sotype, 0)
625 syscall.ForkLock.RUnlock()
626 return nil, &OpError{"socket", fd.net, fd.laddr, err}
628 syscall.CloseOnExec(s)
629 syscall.ForkLock.RUnlock()
631 // Associate our new socket with IOCP.
632 onceStartServer.Do(startServer)
633 if _, err := syscall.CreateIoCompletionPort(s, resultsrv.iocp, 0, 0); err != nil {
635 return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, err}
638 // Submit accept request.
642 _, err = iosrv.ExecIO(&o, fd.rdeadline.value())
648 // Inherit properties of the listening socket.
649 err = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
652 return nil, &OpError{"Setsockopt", fd.net, fd.laddr, err}
655 // Get local and peer addr out of AcceptEx buffer.
656 var lrsa, rrsa *syscall.RawSockaddrAny
658 l := uint32(unsafe.Sizeof(*lrsa))
659 syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])),
660 0, l, l, &lrsa, &llen, &rrsa, &rlen)
661 lsa, _ := lrsa.Sockaddr()
662 rsa, _ := rrsa.Sockaddr()
664 netfd := allocFD(s, fd.family, fd.sotype, fd.net)
665 netfd.setAddr(toAddr(lsa), toAddr(rsa))
669 // Unimplemented functions.
671 func (fd *netFD) dup() (*os.File, error) {
672 // TODO: Implement this
673 return nil, os.NewSyscallError("dup", syscall.EWINDOWS)
676 var errNoSupport = errors.New("address family not supported")
678 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
679 return 0, 0, 0, nil, errNoSupport
682 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
683 return 0, 0, errNoSupport