1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/udp/udp_socket_libevent.h"
11 #include <netinet/in.h>
12 #include <sys/ioctl.h>
13 #include <sys/socket.h>
15 #include "base/callback.h"
16 #include "base/logging.h"
17 #include "base/message_loop/message_loop.h"
18 #include "base/metrics/sparse_histogram.h"
19 #include "base/metrics/stats_counters.h"
20 #include "base/posix/eintr_wrapper.h"
21 #include "base/rand_util.h"
22 #include "net/base/io_buffer.h"
23 #include "net/base/ip_endpoint.h"
24 #include "net/base/net_errors.h"
25 #include "net/base/net_log.h"
26 #include "net/base/net_util.h"
27 #include "net/socket/socket_descriptor.h"
28 #include "net/udp/udp_net_log_parameters.h"
35 const int kBindRetries = 10;
36 const int kPortStart = 1024;
37 const int kPortEnd = 65535;
39 #if defined(OS_MACOSX)
41 // Returns IPv4 address in network order.
42 int GetIPv4AddressFromIndex(int socket, uint32 index, uint32* address){
44 *address = htonl(INADDR_ANY);
48 ifr.ifr_addr.sa_family = AF_INET;
49 if (!if_indextoname(index, ifr.ifr_name))
50 return MapSystemError(errno);
51 int rv = ioctl(socket, SIOCGIFADDR, &ifr);
53 return MapSystemError(errno);
54 *address = reinterpret_cast<sockaddr_in*>(&ifr.ifr_addr)->sin_addr.s_addr;
62 UDPSocketLibevent::UDPSocketLibevent(
63 DatagramSocket::BindType bind_type,
64 const RandIntCallback& rand_int_cb,
66 const net::NetLog::Source& source)
67 : socket_(kInvalidSocket),
69 socket_options_(SOCKET_OPTION_MULTICAST_LOOP),
70 multicast_interface_(0),
71 multicast_time_to_live_(1),
72 bind_type_(bind_type),
73 rand_int_cb_(rand_int_cb),
77 recv_from_address_(NULL),
79 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)) {
80 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
81 source.ToEventParametersCallback());
82 if (bind_type == DatagramSocket::RANDOM_BIND)
83 DCHECK(!rand_int_cb.is_null());
86 UDPSocketLibevent::~UDPSocketLibevent() {
88 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE);
91 void UDPSocketLibevent::Close() {
92 DCHECK(CalledOnValidThread());
97 // Zero out any pending read/write callback state.
100 read_callback_.Reset();
101 recv_from_address_ = NULL;
104 write_callback_.Reset();
105 send_to_address_.reset();
107 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
109 ok = write_socket_watcher_.StopWatchingFileDescriptor();
112 if (IGNORE_EINTR(close(socket_)) < 0)
113 PLOG(ERROR) << "close";
115 socket_ = kInvalidSocket;
119 int UDPSocketLibevent::GetPeerAddress(IPEndPoint* address) const {
120 DCHECK(CalledOnValidThread());
123 return ERR_SOCKET_NOT_CONNECTED;
125 if (!remote_address_.get()) {
126 SockaddrStorage storage;
127 if (getpeername(socket_, storage.addr, &storage.addr_len))
128 return MapSystemError(errno);
129 scoped_ptr<IPEndPoint> address(new IPEndPoint());
130 if (!address->FromSockAddr(storage.addr, storage.addr_len))
131 return ERR_ADDRESS_INVALID;
132 remote_address_.reset(address.release());
135 *address = *remote_address_;
139 int UDPSocketLibevent::GetLocalAddress(IPEndPoint* address) const {
140 DCHECK(CalledOnValidThread());
143 return ERR_SOCKET_NOT_CONNECTED;
145 if (!local_address_.get()) {
146 SockaddrStorage storage;
147 if (getsockname(socket_, storage.addr, &storage.addr_len))
148 return MapSystemError(errno);
149 scoped_ptr<IPEndPoint> address(new IPEndPoint());
150 if (!address->FromSockAddr(storage.addr, storage.addr_len))
151 return ERR_ADDRESS_INVALID;
152 local_address_.reset(address.release());
153 net_log_.AddEvent(NetLog::TYPE_UDP_LOCAL_ADDRESS,
154 CreateNetLogUDPConnectCallback(local_address_.get()));
157 *address = *local_address_;
161 int UDPSocketLibevent::Read(IOBuffer* buf,
163 const CompletionCallback& callback) {
164 return RecvFrom(buf, buf_len, NULL, callback);
167 int UDPSocketLibevent::RecvFrom(IOBuffer* buf,
170 const CompletionCallback& callback) {
171 DCHECK(CalledOnValidThread());
172 DCHECK_NE(kInvalidSocket, socket_);
173 CHECK(read_callback_.is_null());
174 DCHECK(!recv_from_address_);
175 DCHECK(!callback.is_null()); // Synchronous operation not supported
176 DCHECK_GT(buf_len, 0);
178 int nread = InternalRecvFrom(buf, buf_len, address);
179 if (nread != ERR_IO_PENDING)
182 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
183 socket_, true, base::MessageLoopForIO::WATCH_READ,
184 &read_socket_watcher_, &read_watcher_)) {
185 PLOG(ERROR) << "WatchFileDescriptor failed on read";
186 int result = MapSystemError(errno);
187 LogRead(result, NULL, 0, NULL);
192 read_buf_len_ = buf_len;
193 recv_from_address_ = address;
194 read_callback_ = callback;
195 return ERR_IO_PENDING;
198 int UDPSocketLibevent::Write(IOBuffer* buf,
200 const CompletionCallback& callback) {
201 return SendToOrWrite(buf, buf_len, NULL, callback);
204 int UDPSocketLibevent::SendTo(IOBuffer* buf,
206 const IPEndPoint& address,
207 const CompletionCallback& callback) {
208 return SendToOrWrite(buf, buf_len, &address, callback);
211 int UDPSocketLibevent::SendToOrWrite(IOBuffer* buf,
213 const IPEndPoint* address,
214 const CompletionCallback& callback) {
215 DCHECK(CalledOnValidThread());
216 DCHECK_NE(kInvalidSocket, socket_);
217 CHECK(write_callback_.is_null());
218 DCHECK(!callback.is_null()); // Synchronous operation not supported
219 DCHECK_GT(buf_len, 0);
221 int result = InternalSendTo(buf, buf_len, address);
222 if (result != ERR_IO_PENDING)
225 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
226 socket_, true, base::MessageLoopForIO::WATCH_WRITE,
227 &write_socket_watcher_, &write_watcher_)) {
228 DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno;
229 int result = MapSystemError(errno);
230 LogWrite(result, NULL, NULL);
235 write_buf_len_ = buf_len;
236 DCHECK(!send_to_address_.get());
238 send_to_address_.reset(new IPEndPoint(*address));
240 write_callback_ = callback;
241 return ERR_IO_PENDING;
244 int UDPSocketLibevent::Connect(const IPEndPoint& address) {
245 net_log_.BeginEvent(NetLog::TYPE_UDP_CONNECT,
246 CreateNetLogUDPConnectCallback(&address));
247 int rv = InternalConnect(address);
250 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_UDP_CONNECT, rv);
254 int UDPSocketLibevent::InternalConnect(const IPEndPoint& address) {
255 DCHECK(CalledOnValidThread());
256 DCHECK(!is_connected());
257 DCHECK(!remote_address_.get());
258 int addr_family = address.GetSockAddrFamily();
259 int rv = CreateSocket(addr_family);
263 if (bind_type_ == DatagramSocket::RANDOM_BIND) {
264 // Construct IPAddressNumber of appropriate size (IPv4 or IPv6) of 0s,
265 // representing INADDR_ANY or in6addr_any.
267 addr_family == AF_INET ? kIPv4AddressSize : kIPv6AddressSize;
268 IPAddressNumber addr_any(addr_size);
269 rv = RandomBind(addr_any);
271 // else connect() does the DatagramSocket::DEFAULT_BIND
274 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.UdpSocketRandomBindErrorCode", -rv);
279 SockaddrStorage storage;
280 if (!address.ToSockAddr(storage.addr, &storage.addr_len)) {
282 return ERR_ADDRESS_INVALID;
285 rv = HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len));
287 // Close() may change the current errno. Map errno beforehand.
288 int result = MapSystemError(errno);
293 remote_address_.reset(new IPEndPoint(address));
297 int UDPSocketLibevent::Bind(const IPEndPoint& address) {
298 DCHECK(CalledOnValidThread());
299 DCHECK(!is_connected());
300 int rv = CreateSocket(address.GetSockAddrFamily());
304 rv = SetSocketOptions();
309 rv = DoBind(address);
314 local_address_.reset();
318 int UDPSocketLibevent::SetReceiveBufferSize(int32 size) {
319 DCHECK(CalledOnValidThread());
320 int rv = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF,
321 reinterpret_cast<const char*>(&size), sizeof(size));
322 int last_error = errno;
323 DCHECK(!rv) << "Could not set socket receive buffer size: " << last_error;
324 return rv == 0 ? OK : MapSystemError(last_error);
327 int UDPSocketLibevent::SetSendBufferSize(int32 size) {
328 DCHECK(CalledOnValidThread());
329 int rv = setsockopt(socket_, SOL_SOCKET, SO_SNDBUF,
330 reinterpret_cast<const char*>(&size), sizeof(size));
331 int last_error = errno;
332 DCHECK(!rv) << "Could not set socket send buffer size: " << last_error;
333 return rv == 0 ? OK : MapSystemError(last_error);
336 void UDPSocketLibevent::AllowAddressReuse() {
337 DCHECK(CalledOnValidThread());
338 DCHECK(!is_connected());
340 socket_options_ |= SOCKET_OPTION_REUSE_ADDRESS;
343 void UDPSocketLibevent::AllowBroadcast() {
344 DCHECK(CalledOnValidThread());
345 DCHECK(!is_connected());
347 socket_options_ |= SOCKET_OPTION_BROADCAST;
350 void UDPSocketLibevent::ReadWatcher::OnFileCanReadWithoutBlocking(int) {
351 if (!socket_->read_callback_.is_null())
352 socket_->DidCompleteRead();
355 void UDPSocketLibevent::WriteWatcher::OnFileCanWriteWithoutBlocking(int) {
356 if (!socket_->write_callback_.is_null())
357 socket_->DidCompleteWrite();
360 void UDPSocketLibevent::DoReadCallback(int rv) {
361 DCHECK_NE(rv, ERR_IO_PENDING);
362 DCHECK(!read_callback_.is_null());
364 // since Run may result in Read being called, clear read_callback_ up front.
365 CompletionCallback c = read_callback_;
366 read_callback_.Reset();
370 void UDPSocketLibevent::DoWriteCallback(int rv) {
371 DCHECK_NE(rv, ERR_IO_PENDING);
372 DCHECK(!write_callback_.is_null());
374 // since Run may result in Write being called, clear write_callback_ up front.
375 CompletionCallback c = write_callback_;
376 write_callback_.Reset();
380 void UDPSocketLibevent::DidCompleteRead() {
382 InternalRecvFrom(read_buf_.get(), read_buf_len_, recv_from_address_);
383 if (result != ERR_IO_PENDING) {
386 recv_from_address_ = NULL;
387 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
389 DoReadCallback(result);
393 void UDPSocketLibevent::LogRead(int result,
396 const sockaddr* addr) const {
398 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
402 if (net_log_.IsLogging()) {
403 DCHECK(addr_len > 0);
407 bool is_address_valid = address.FromSockAddr(addr, addr_len);
409 NetLog::TYPE_UDP_BYTES_RECEIVED,
410 CreateNetLogUDPDataTranferCallback(
412 is_address_valid ? &address : NULL));
415 base::StatsCounter read_bytes("udp.read_bytes");
416 read_bytes.Add(result);
419 int UDPSocketLibevent::CreateSocket(int addr_family) {
420 addr_family_ = addr_family;
421 socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, 0);
422 if (socket_ == kInvalidSocket)
423 return MapSystemError(errno);
424 if (SetNonBlocking(socket_)) {
425 const int err = MapSystemError(errno);
432 void UDPSocketLibevent::DidCompleteWrite() {
434 InternalSendTo(write_buf_.get(), write_buf_len_, send_to_address_.get());
436 if (result != ERR_IO_PENDING) {
439 send_to_address_.reset();
440 write_socket_watcher_.StopWatchingFileDescriptor();
441 DoWriteCallback(result);
445 void UDPSocketLibevent::LogWrite(int result,
447 const IPEndPoint* address) const {
449 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR, result);
453 if (net_log_.IsLogging()) {
455 NetLog::TYPE_UDP_BYTES_SENT,
456 CreateNetLogUDPDataTranferCallback(result, bytes, address));
459 base::StatsCounter write_bytes("udp.write_bytes");
460 write_bytes.Add(result);
463 int UDPSocketLibevent::InternalRecvFrom(IOBuffer* buf, int buf_len,
464 IPEndPoint* address) {
465 int bytes_transferred;
468 SockaddrStorage storage;
471 HANDLE_EINTR(recvfrom(socket_,
478 if (bytes_transferred >= 0) {
479 result = bytes_transferred;
480 if (address && !address->FromSockAddr(storage.addr, storage.addr_len))
481 result = ERR_ADDRESS_INVALID;
483 result = MapSystemError(errno);
485 if (result != ERR_IO_PENDING)
486 LogRead(result, buf->data(), storage.addr_len, storage.addr);
490 int UDPSocketLibevent::InternalSendTo(IOBuffer* buf, int buf_len,
491 const IPEndPoint* address) {
492 SockaddrStorage storage;
493 struct sockaddr* addr = storage.addr;
496 storage.addr_len = 0;
498 if (!address->ToSockAddr(storage.addr, &storage.addr_len)) {
499 int result = ERR_ADDRESS_INVALID;
500 LogWrite(result, NULL, NULL);
505 int result = HANDLE_EINTR(sendto(socket_,
512 result = MapSystemError(errno);
513 if (result != ERR_IO_PENDING)
514 LogWrite(result, buf->data(), address);
518 int UDPSocketLibevent::SetSocketOptions() {
520 if (socket_options_ & SOCKET_OPTION_REUSE_ADDRESS) {
521 int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &true_value,
524 return MapSystemError(errno);
526 if (socket_options_ & SOCKET_OPTION_BROADCAST) {
528 #if defined(OS_MACOSX)
529 // SO_REUSEPORT on OSX permits multiple processes to each receive
530 // UDP multicast or broadcast datagrams destined for the bound
532 rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &true_value,
535 rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &true_value,
537 #endif // defined(OS_MACOSX)
539 return MapSystemError(errno);
542 if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
544 if (addr_family_ == AF_INET) {
546 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_LOOP,
547 &loop, sizeof(loop));
550 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
551 &loop, sizeof(loop));
554 return MapSystemError(errno);
556 if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
558 if (addr_family_ == AF_INET) {
559 u_char ttl = multicast_time_to_live_;
560 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_TTL,
563 // Signed integer. -1 to use route default.
564 int ttl = multicast_time_to_live_;
565 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
569 return MapSystemError(errno);
571 if (multicast_interface_ != 0) {
572 switch (addr_family_) {
574 #if !defined(OS_MACOSX)
576 mreq.imr_ifindex = multicast_interface_;
577 mreq.imr_address.s_addr = htonl(INADDR_ANY);
580 int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
581 &mreq.imr_interface.s_addr);
585 int rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_IF,
586 reinterpret_cast<const char*>(&mreq), sizeof(mreq));
588 return MapSystemError(errno);
592 uint32 interface_index = multicast_interface_;
593 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
594 reinterpret_cast<const char*>(&interface_index),
595 sizeof(interface_index));
597 return MapSystemError(errno);
601 NOTREACHED() << "Invalid address family";
602 return ERR_ADDRESS_INVALID;
608 int UDPSocketLibevent::DoBind(const IPEndPoint& address) {
609 SockaddrStorage storage;
610 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
611 return ERR_ADDRESS_INVALID;
612 int rv = bind(socket_, storage.addr, storage.addr_len);
615 int last_error = errno;
616 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.UdpSocketBindErrorFromPosix", last_error);
617 #if defined(OS_CHROMEOS)
618 if (last_error == EINVAL)
619 return ERR_ADDRESS_IN_USE;
620 #elif defined(OS_MACOSX)
621 if (last_error == EADDRNOTAVAIL)
622 return ERR_ADDRESS_IN_USE;
624 return MapSystemError(last_error);
627 int UDPSocketLibevent::RandomBind(const IPAddressNumber& address) {
628 DCHECK(bind_type_ == DatagramSocket::RANDOM_BIND && !rand_int_cb_.is_null());
630 for (int i = 0; i < kBindRetries; ++i) {
631 int rv = DoBind(IPEndPoint(address,
632 rand_int_cb_.Run(kPortStart, kPortEnd)));
633 if (rv == OK || rv != ERR_ADDRESS_IN_USE)
636 return DoBind(IPEndPoint(address, 0));
639 int UDPSocketLibevent::JoinGroup(const IPAddressNumber& group_address) const {
640 DCHECK(CalledOnValidThread());
642 return ERR_SOCKET_NOT_CONNECTED;
644 switch (group_address.size()) {
645 case kIPv4AddressSize: {
646 if (addr_family_ != AF_INET)
647 return ERR_ADDRESS_INVALID;
649 #if !defined(OS_MACOSX)
651 mreq.imr_ifindex = multicast_interface_;
652 mreq.imr_address.s_addr = htonl(INADDR_ANY);
655 int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
656 &mreq.imr_interface.s_addr);
660 memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize);
661 int rv = setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
662 &mreq, sizeof(mreq));
664 return MapSystemError(errno);
667 case kIPv6AddressSize: {
668 if (addr_family_ != AF_INET6)
669 return ERR_ADDRESS_INVALID;
671 mreq.ipv6mr_interface = multicast_interface_;
672 memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize);
673 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP,
674 &mreq, sizeof(mreq));
676 return MapSystemError(errno);
680 NOTREACHED() << "Invalid address family";
681 return ERR_ADDRESS_INVALID;
685 int UDPSocketLibevent::LeaveGroup(const IPAddressNumber& group_address) const {
686 DCHECK(CalledOnValidThread());
689 return ERR_SOCKET_NOT_CONNECTED;
691 switch (group_address.size()) {
692 case kIPv4AddressSize: {
693 if (addr_family_ != AF_INET)
694 return ERR_ADDRESS_INVALID;
696 mreq.imr_interface.s_addr = INADDR_ANY;
697 memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize);
698 int rv = setsockopt(socket_, IPPROTO_IP, IP_DROP_MEMBERSHIP,
699 &mreq, sizeof(mreq));
701 return MapSystemError(errno);
704 case kIPv6AddressSize: {
705 if (addr_family_ != AF_INET6)
706 return ERR_ADDRESS_INVALID;
708 mreq.ipv6mr_interface = 0; // 0 indicates default multicast interface.
709 memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize);
710 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
711 &mreq, sizeof(mreq));
713 return MapSystemError(errno);
717 NOTREACHED() << "Invalid address family";
718 return ERR_ADDRESS_INVALID;
722 int UDPSocketLibevent::SetMulticastInterface(uint32 interface_index) {
723 DCHECK(CalledOnValidThread());
725 return ERR_SOCKET_IS_CONNECTED;
726 multicast_interface_ = interface_index;
730 int UDPSocketLibevent::SetMulticastTimeToLive(int time_to_live) {
731 DCHECK(CalledOnValidThread());
733 return ERR_SOCKET_IS_CONNECTED;
735 if (time_to_live < 0 || time_to_live > 255)
736 return ERR_INVALID_ARGUMENT;
737 multicast_time_to_live_ = time_to_live;
741 int UDPSocketLibevent::SetMulticastLoopbackMode(bool loopback) {
742 DCHECK(CalledOnValidThread());
744 return ERR_SOCKET_IS_CONNECTED;
747 socket_options_ |= SOCKET_OPTION_MULTICAST_LOOP;
749 socket_options_ &= ~SOCKET_OPTION_MULTICAST_LOOP;
753 int UDPSocketLibevent::SetDiffServCodePoint(DiffServCodePoint dscp) {
754 if (dscp == DSCP_NO_CHANGE) {
758 int dscp_and_ecn = dscp << 2;
759 if (addr_family_ == AF_INET) {
760 rv = setsockopt(socket_, IPPROTO_IP, IP_TOS,
761 &dscp_and_ecn, sizeof(dscp_and_ecn));
763 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_TCLASS,
764 &dscp_and_ecn, sizeof(dscp_and_ecn));
767 return MapSystemError(errno);
772 void UDPSocketLibevent::DetachFromThread() {
773 base::NonThreadSafe::DetachFromThread();