1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 // +build darwin freebsd linux netbsd openbsd
18 // Network file descriptor.
20 // locking/lifetime of sysfd
24 // must lock both sysmu and pollserver to write
25 // can lock either to read
28 // immutable until Close
46 // owned by fd wait server
50 // A pollServer helps FDs determine when to retry a non-blocking
51 // read or write after they get EAGAIN. When an FD needs to wait,
52 // send the fd on s.cr (for a read) or s.cw (for a write) to pass the
53 // request to the poll server. Then receive on fd.cr/fd.cw.
54 // When the pollServer finds that i/o on FD should be possible
55 // again, it will send fd on fd.cr/fd.cw to wake any waiting processes.
56 // This protocol is implemented as s.WaitRead() and s.WaitWrite().
58 // There is one subtlety: when sending on s.cr/s.cw, the
59 // poll server is probably in a system call, waiting for an fd
60 // to become ready. It's not looking at the request channels.
61 // To resolve this, the poll server waits not just on the FDs it has
62 // been given but also its own pipe. After sending on the
63 // buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a
64 // byte to the pipe, causing the pollServer's poll system call to
65 // return. In response to the pipe being readable, the pollServer
66 // re-polls its request channels.
68 // Note that the ordering is "send request" and then "wake up server".
69 // If the operations were reversed, there would be a race: the poll
70 // server might wake up and look at the request channel, see that it
71 // was empty, and go back to sleep, all before the requester managed
72 // to send the request. Because the send must complete before the wakeup,
73 // the request channel must be buffered. A buffer of size 1 is sufficient
74 // for any request load. If many processes are trying to submit requests,
75 // one will succeed, the pollServer will read the request, and then the
76 // channel will be empty for the next process's request. A larger buffer
77 // might help batch requests.
79 // To avoid races in closing, all fd operations are locked and
80 // refcounted. When netFD.Close() is called, it calls syscall.Shutdown
81 // and sets a closing flag. Only when the last reference is removed
82 // will the fd be closed.
84 type pollServer struct {
85 cr, cw chan *netFD // buffered >= 1
87 poll *pollster // low-level OS hooks
88 sync.Mutex // controls pending and deadline
89 pending map[int]*netFD
90 deadline int64 // next deadline (nsec since 1970)
93 func (s *pollServer) AddFD(fd *netFD, mode int) error {
96 if intfd < 0 || fd.closing {
97 // fd closed underfoot
114 if t > 0 && (s.deadline == 0 || t < s.deadline) {
119 wake, err := s.poll.AddFD(intfd, mode, false)
121 panic("pollServer AddFD " + err.Error())
134 // Evict evicts fd from the pending list, unblocking
135 // any I/O running on fd. The caller must have locked
137 func (s *pollServer) Evict(fd *netFD) {
138 if s.pending[fd.sysfd<<1] == fd {
139 s.WakeFD(fd, 'r', errClosing)
140 s.poll.DelFD(fd.sysfd, 'r')
141 delete(s.pending, fd.sysfd<<1)
143 if s.pending[fd.sysfd<<1|1] == fd {
144 s.WakeFD(fd, 'w', errClosing)
145 s.poll.DelFD(fd.sysfd, 'w')
146 delete(s.pending, fd.sysfd<<1|1)
150 var wakeupbuf [1]byte
152 func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
154 func (s *pollServer) LookupFD(fd int, mode int) *netFD {
159 netfd, ok := s.pending[key]
163 delete(s.pending, key)
167 func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
181 func (s *pollServer) Now() int64 {
182 return time.Now().UnixNano()
185 func (s *pollServer) CheckDeadlines() {
187 // TODO(rsc): This will need to be handled more efficiently,
188 // probably with a heap indexed by wakeup time.
190 var next_deadline int64
191 for key, fd := range s.pending {
206 delete(s.pending, key)
208 s.poll.DelFD(fd.sysfd, mode)
211 s.poll.DelFD(fd.sysfd, mode)
214 s.WakeFD(fd, mode, nil)
215 } else if next_deadline == 0 || t < next_deadline {
220 s.deadline = next_deadline
223 func (s *pollServer) Run() {
224 var scratch [100]byte
236 fd, mode, err := s.poll.WaitFD(s, t)
238 print("pollServer WaitFD: ", err.Error(), "\n")
246 if fd == int(s.pr.Fd()) {
247 // Drain our wakeup pipe (we could loop here,
248 // but it's unlikely that there are more than
249 // len(scratch) wakeup calls).
250 s.pr.Read(scratch[0:])
253 netfd := s.LookupFD(fd, mode)
255 // This can happen because the WaitFD runs without
256 // holding s's lock, so there might be a pending wakeup
257 // for an fd that has been evicted. No harm done.
260 s.WakeFD(netfd, mode, nil)
265 func (s *pollServer) WaitRead(fd *netFD) error {
266 err := s.AddFD(fd, 'r')
273 func (s *pollServer) WaitWrite(fd *netFD) error {
274 err := s.AddFD(fd, 'w')
281 // Network FD methods.
282 // All the network FDs use a single pollServer.
284 var pollserver *pollServer
285 var onceStartServer sync.Once
288 p, err := newPollServer()
290 print("Start pollServer: ", err.Error(), "\n")
295 func newFD(fd, family, sotype int, net string) (*netFD, error) {
296 onceStartServer.Do(startServer)
297 if err := syscall.SetNonblock(fd, true); err != nil {
306 netfd.cr = make(chan error, 1)
307 netfd.cw = make(chan error, 1)
311 func (fd *netFD) setAddr(laddr, raddr Addr) {
321 fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net+":"+ls+"->"+rs)
324 func (fd *netFD) connect(ra syscall.Sockaddr) error {
325 err := syscall.Connect(fd.sysfd, ra)
326 if err == syscall.EINPROGRESS {
327 if err = pollserver.WaitWrite(fd); err != nil {
331 e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
333 return os.NewSyscallError("getsockopt", err)
336 err = syscall.Errno(e)
342 var errClosing = errors.New("use of closed network connection")
344 // Add a reference to this fd.
345 // If closing==true, pollserver must be locked; mark the fd as closing.
346 // Returns an error if the fd cannot be used.
347 func (fd *netFD) incref(closing bool) error {
364 // Remove a reference to this FD and close if we've been asked to do so (and
365 // there are no references left).
366 func (fd *netFD) decref() {
372 if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
380 func (fd *netFD) Close() error {
381 pollserver.Lock() // needed for both fd.incref(true) and pollserver.Evict
382 defer pollserver.Unlock()
383 if err := fd.incref(true); err != nil {
386 // Unblock any I/O. Once it all unblocks and returns,
387 // so that it cannot be referring to fd.sysfd anymore,
388 // the final decref will close fd.sysfd. This should happen
389 // fairly quickly, since all the I/O is non-blocking, and any
390 // attempts to block in the pollserver will return errClosing.
396 func (fd *netFD) shutdown(how int) error {
397 if err := fd.incref(false); err != nil {
401 err := syscall.Shutdown(fd.sysfd, how)
403 return &OpError{"shutdown", fd.net, fd.laddr, err}
408 func (fd *netFD) CloseRead() error {
409 return fd.shutdown(syscall.SHUT_RD)
412 func (fd *netFD) CloseWrite() error {
413 return fd.shutdown(syscall.SHUT_WR)
416 func (fd *netFD) Read(p []byte) (n int, err error) {
418 defer fd.rio.Unlock()
419 if err := fd.incref(false); err != nil {
424 n, err = syscall.Read(int(fd.sysfd), p)
425 if err == syscall.EAGAIN {
427 if fd.rdeadline >= 0 {
428 if err = pollserver.WaitRead(fd); err == nil {
435 } else if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM {
440 if err != nil && err != io.EOF {
441 err = &OpError{"read", fd.net, fd.raddr, err}
446 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
448 defer fd.rio.Unlock()
449 if err := fd.incref(false); err != nil {
454 n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
455 if err == syscall.EAGAIN {
457 if fd.rdeadline >= 0 {
458 if err = pollserver.WaitRead(fd); err == nil {
468 if err != nil && err != io.EOF {
469 err = &OpError{"read", fd.net, fd.laddr, err}
474 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
476 defer fd.rio.Unlock()
477 if err := fd.incref(false); err != nil {
478 return 0, 0, 0, nil, err
482 n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
483 if err == syscall.EAGAIN {
485 if fd.rdeadline >= 0 {
486 if err = pollserver.WaitRead(fd); err == nil {
491 if err == nil && n == 0 {
496 if err != nil && err != io.EOF {
497 err = &OpError{"read", fd.net, fd.laddr, err}
503 func (fd *netFD) Write(p []byte) (int, error) {
505 defer fd.wio.Unlock()
506 if err := fd.incref(false); err != nil {
510 if fd.sysfile == nil {
511 return 0, syscall.EINVAL
518 n, err = syscall.Write(int(fd.sysfd), p[nn:])
525 if err == syscall.EAGAIN {
527 if fd.wdeadline >= 0 {
528 if err = pollserver.WaitWrite(fd); err == nil {
538 err = io.ErrUnexpectedEOF
543 err = &OpError{"write", fd.net, fd.raddr, err}
548 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
550 defer fd.wio.Unlock()
551 if err := fd.incref(false); err != nil {
556 err = syscall.Sendto(fd.sysfd, p, 0, sa)
557 if err == syscall.EAGAIN {
559 if fd.wdeadline >= 0 {
560 if err = pollserver.WaitWrite(fd); err == nil {
570 err = &OpError{"write", fd.net, fd.raddr, err}
575 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
577 defer fd.wio.Unlock()
578 if err := fd.incref(false); err != nil {
583 err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
584 if err == syscall.EAGAIN {
586 if fd.wdeadline >= 0 {
587 if err = pollserver.WaitWrite(fd); err == nil {
598 err = &OpError{"write", fd.net, fd.raddr, err}
603 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) {
604 if err := fd.incref(false); err != nil {
609 // See ../syscall/exec.go for description of ForkLock.
610 // It is okay to hold the lock across syscall.Accept
611 // because we have put fd.sysfd into non-blocking mode.
613 var rsa syscall.Sockaddr
615 syscall.ForkLock.RLock()
616 s, rsa, err = syscall.Accept(fd.sysfd)
618 syscall.ForkLock.RUnlock()
619 if err == syscall.EAGAIN {
621 if fd.rdeadline >= 0 {
622 if err = pollserver.WaitRead(fd); err == nil {
626 } else if err == syscall.ECONNABORTED {
627 // This means that a socket on the listen queue was closed
628 // before we Accept()ed it; it's a silly error, so try again.
631 return nil, &OpError{"accept", fd.net, fd.laddr, err}
635 syscall.CloseOnExec(s)
636 syscall.ForkLock.RUnlock()
638 if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
642 lsa, _ := syscall.Getsockname(netfd.sysfd)
643 netfd.setAddr(toAddr(lsa), toAddr(rsa))
647 func (fd *netFD) dup() (f *os.File, err error) {
648 ns, err := syscall.Dup(fd.sysfd)
650 return nil, &OpError{"dup", fd.net, fd.laddr, err}
653 // We want blocking mode for the new fd, hence the double negative.
654 if err = syscall.SetNonblock(ns, false); err != nil {
655 return nil, &OpError{"setnonblock", fd.net, fd.laddr, err}
658 return os.NewFile(uintptr(ns), fd.sysfile.Name()), nil
661 func closesocket(s int) error {
662 return syscall.Close(s)