1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/udp/udp_socket_libevent.h"
10 #include <sys/socket.h>
11 #include <netinet/in.h>
13 #include "base/callback.h"
14 #include "base/logging.h"
15 #include "base/message_loop/message_loop.h"
16 #include "base/metrics/stats_counters.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/rand_util.h"
19 #include "net/base/io_buffer.h"
20 #include "net/base/ip_endpoint.h"
21 #include "net/base/net_errors.h"
22 #include "net/base/net_log.h"
23 #include "net/base/net_util.h"
24 #include "net/socket/socket_descriptor.h"
25 #include "net/udp/udp_net_log_parameters.h"
// Parameters for random local-port selection, consumed by RandomBind().
// kBindRetries bounds the number of random-port bind attempts; kPortStart and
// kPortEnd are the two bounds handed to the random-int callback when a port
// is picked.
29 const int kBindRetries = 10;
30 const int kPortStart = 1024;
31 const int kPortEnd = 65535;
// Constructs a socket object with no descriptor yet (socket_ starts as
// kInvalidSocket). socket_options_ starts with SOCKET_OPTION_MULTICAST_LOOP
// set and the multicast TTL starts at 1. The bind type and random-int
// callback are stored for later use, and the start of the
// TYPE_SOCKET_ALIVE net-log event is recorded with |source| as parameters.
// When |bind_type| is RANDOM_BIND, |rand_int_cb| is DCHECKed to be non-null.
37 UDPSocketLibevent::UDPSocketLibevent(
38 DatagramSocket::BindType bind_type,
39 const RandIntCallback& rand_int_cb,
41 const net::NetLog::Source& source)
42 : socket_(kInvalidSocket),
44 socket_options_(SOCKET_OPTION_MULTICAST_LOOP),
45 multicast_time_to_live_(1),
46 bind_type_(bind_type),
47 rand_int_cb_(rand_int_cb),
51 recv_from_address_(NULL),
53 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)) {
54 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
55 source.ToEventParametersCallback());
56 if (bind_type == DatagramSocket::RANDOM_BIND)
57 DCHECK(!rand_int_cb.is_null());
// Destructor. Records the end of the TYPE_SOCKET_ALIVE net-log event that the
// constructor began.
60 UDPSocketLibevent::~UDPSocketLibevent() {
62 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE);
// Tears the socket down: resets the pending read and write callback state,
// stops both file-descriptor watchers, close()s the descriptor (a failure is
// reported through PLOG(ERROR)), and sets socket_ back to kInvalidSocket.
// DCHECKs CalledOnValidThread().
65 void UDPSocketLibevent::Close() {
66 DCHECK(CalledOnValidThread());
71 // Zero out any pending read/write callback state.
74 read_callback_.Reset();
75 recv_from_address_ = NULL;
78 write_callback_.Reset();
79 send_to_address_.reset();
// Stop watching in both directions before the descriptor is closed below.
81 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
83 ok = write_socket_watcher_.StopWatchingFileDescriptor();
86 if (HANDLE_EINTR(close(socket_)) < 0)
87 PLOG(ERROR) << "close";
89 socket_ = kInvalidSocket;
// Copies the remote endpoint into |*address|.
// If remote_address_ is not yet populated, the endpoint is queried with
// getpeername(), converted into an IPEndPoint and stored in remote_address_;
// the final assignment then copies from that cached value.
// Returns MapSystemError(errno) if getpeername() fails; one visible path
// returns ERR_SOCKET_NOT_CONNECTED.
93 int UDPSocketLibevent::GetPeerAddress(IPEndPoint* address) const {
94 DCHECK(CalledOnValidThread());
97 return ERR_SOCKET_NOT_CONNECTED;
99 if (!remote_address_.get()) {
100 SockaddrStorage storage;
101 if (getpeername(socket_, storage.addr, &storage.addr_len))
102 return MapSystemError(errno);
// NOTE: this local scoped_ptr is also named |address| and shadows the
// out-parameter for the rest of this inner scope.
103 scoped_ptr<IPEndPoint> address(new IPEndPoint());
104 if (!address->FromSockAddr(storage.addr, storage.addr_len))
106 remote_address_.reset(address.release());
109 *address = *remote_address_;
// Copies the locally bound endpoint into |*address|.
// If local_address_ is not yet populated, the endpoint is queried with
// getsockname(), converted into an IPEndPoint, stored in local_address_ and
// reported to the net log as a TYPE_UDP_LOCAL_ADDRESS event; the final
// assignment then copies from that cached value.
// Returns MapSystemError(errno) if getsockname() fails; one visible path
// returns ERR_SOCKET_NOT_CONNECTED.
113 int UDPSocketLibevent::GetLocalAddress(IPEndPoint* address) const {
114 DCHECK(CalledOnValidThread());
117 return ERR_SOCKET_NOT_CONNECTED;
119 if (!local_address_.get()) {
120 SockaddrStorage storage;
121 if (getsockname(socket_, storage.addr, &storage.addr_len))
122 return MapSystemError(errno);
// NOTE: this local scoped_ptr is also named |address| and shadows the
// out-parameter for the rest of this inner scope.
123 scoped_ptr<IPEndPoint> address(new IPEndPoint());
124 if (!address->FromSockAddr(storage.addr, storage.addr_len))
126 local_address_.reset(address.release());
127 net_log_.AddEvent(NetLog::TYPE_UDP_LOCAL_ADDRESS,
128 CreateNetLogUDPConnectCallback(local_address_.get()));
131 *address = *local_address_;
// Reads a datagram without reporting the sender: forwards to RecvFrom() with
// a NULL address and returns its result.
135 int UDPSocketLibevent::Read(IOBuffer* buf,
137 const CompletionCallback& callback) {
138 return RecvFrom(buf, buf_len, NULL, callback);
// Receives a datagram into |buf|.
// A receive is attempted immediately through InternalRecvFrom(). When that
// reports ERR_IO_PENDING, a WATCH_READ watch is registered on socket_, the
// buffer length, |address| and |callback| are stored for later completion,
// and ERR_IO_PENDING is returned.
// If registering the watch fails, the failure is logged with PLOG, errno is
// mapped to a net error and that error is passed to LogRead().
// DCHECKed preconditions: valid thread, an open socket, no read already
// pending (read_callback_ null and recv_from_address_ unset), a non-null
// |callback|, and |buf_len| > 0.
141 int UDPSocketLibevent::RecvFrom(IOBuffer* buf,
144 const CompletionCallback& callback) {
145 DCHECK(CalledOnValidThread());
146 DCHECK_NE(kInvalidSocket, socket_);
147 DCHECK(read_callback_.is_null());
148 DCHECK(!recv_from_address_);
149 DCHECK(!callback.is_null()); // Synchronous operation not supported
150 DCHECK_GT(buf_len, 0);
152 int nread = InternalRecvFrom(buf, buf_len, address);
153 if (nread != ERR_IO_PENDING)
156 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
157 socket_, true, base::MessageLoopForIO::WATCH_READ,
158 &read_socket_watcher_, &read_watcher_)) {
159 PLOG(ERROR) << "WatchFileDescriptor failed on read";
160 int result = MapSystemError(errno);
161 LogRead(result, NULL, 0, NULL);
// Remember the pending read so it can be completed once the descriptor
// becomes readable.
166 read_buf_len_ = buf_len;
167 recv_from_address_ = address;
168 read_callback_ = callback;
169 return ERR_IO_PENDING;
// Sends |buf| without an explicit destination: forwards to SendToOrWrite()
// with a NULL address and returns its result.
172 int UDPSocketLibevent::Write(IOBuffer* buf,
174 const CompletionCallback& callback) {
175 return SendToOrWrite(buf, buf_len, NULL, callback);
// Sends |buf| to the explicit destination |address|: forwards to
// SendToOrWrite() with a pointer to |address| and returns its result.
178 int UDPSocketLibevent::SendTo(IOBuffer* buf,
180 const IPEndPoint& address,
181 const CompletionCallback& callback) {
182 return SendToOrWrite(buf, buf_len, &address, callback);
// Shared implementation behind Write() (which passes a NULL |address|) and
// SendTo() (which passes a destination).
// A send is attempted immediately through InternalSendTo(). When that reports
// ERR_IO_PENDING, a WATCH_WRITE watch is registered on socket_, the buffer
// length, a copy of the destination endpoint and |callback| are stored for
// later completion, and ERR_IO_PENDING is returned.
// If registering the watch fails, errno is logged with DVLOG, mapped to a net
// error, and that error is passed to LogWrite().
// DCHECKed preconditions: valid thread, an open socket, no write already
// pending, a non-null |callback|, and |buf_len| > 0.
185 int UDPSocketLibevent::SendToOrWrite(IOBuffer* buf,
187 const IPEndPoint* address,
188 const CompletionCallback& callback) {
189 DCHECK(CalledOnValidThread());
190 DCHECK_NE(kInvalidSocket, socket_);
191 DCHECK(write_callback_.is_null());
192 DCHECK(!callback.is_null()); // Synchronous operation not supported
193 DCHECK_GT(buf_len, 0);
195 int result = InternalSendTo(buf, buf_len, address);
196 if (result != ERR_IO_PENDING)
199 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
200 socket_, true, base::MessageLoopForIO::WATCH_WRITE,
201 &write_socket_watcher_, &write_watcher_)) {
202 DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno;
// NOTE: this |result| is a new local that shadows the outer |result|
// declared above.
203 int result = MapSystemError(errno);
204 LogWrite(result, NULL, NULL);
209 write_buf_len_ = buf_len;
210 DCHECK(!send_to_address_.get());
// The socket keeps its own copy of the destination endpoint.
212 send_to_address_.reset(new IPEndPoint(*address));
214 write_callback_ = callback;
215 return ERR_IO_PENDING;
// Connects the socket to |address|. The real work is done by
// InternalConnect(); this wrapper brackets it with the begin and end of a
// TYPE_UDP_CONNECT net-log event, the end event carrying the result code.
218 int UDPSocketLibevent::Connect(const IPEndPoint& address) {
219 net_log_.BeginEvent(NetLog::TYPE_UDP_CONNECT,
220 CreateNetLogUDPConnectCallback(&address));
221 int rv = InternalConnect(address);
224 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_UDP_CONNECT, rv);
// Performs the connect sequence for Connect(): creates the socket for
// |address|, binds to a random local port first when bind_type_ is
// RANDOM_BIND, converts |address| to a sockaddr (ERR_ADDRESS_INVALID if that
// fails), calls connect() with EINTR retry, and records the peer in
// remote_address_.
// DCHECKs: valid thread, not already connected, remote_address_ unset.
228 int UDPSocketLibevent::InternalConnect(const IPEndPoint& address) {
229 DCHECK(CalledOnValidThread());
230 DCHECK(!is_connected());
231 DCHECK(!remote_address_.get());
232 int rv = CreateSocket(address);
236 if (bind_type_ == DatagramSocket::RANDOM_BIND)
237 rv = RandomBind(address);
238 // else connect() does the DatagramSocket::DEFAULT_BIND
245 SockaddrStorage storage;
246 if (!address.ToSockAddr(storage.addr, &storage.addr_len)) {
248 return ERR_ADDRESS_INVALID;
251 rv = HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len));
253 // Close() may change the current errno. Map errno beforehand.
254 int result = MapSystemError(errno);
259 remote_address_.reset(new IPEndPoint(address));
// Binds the socket to the local endpoint |address|. Runs, in order:
// CreateSocket(), SetSocketOptions() (applying the options configured
// earlier) and DoBind(); then clears the cached local_address_.
// DCHECKs: valid thread, not already connected.
263 int UDPSocketLibevent::Bind(const IPEndPoint& address) {
264 DCHECK(CalledOnValidThread());
265 DCHECK(!is_connected());
266 int rv = CreateSocket(address);
270 rv = SetSocketOptions();
275 rv = DoBind(address);
280 local_address_.reset();
// Requests a receive buffer of |size| via setsockopt(SOL_SOCKET, SO_RCVBUF).
// A non-zero setsockopt() result trips a DCHECK that reports errno.
284 bool UDPSocketLibevent::SetReceiveBufferSize(int32 size) {
285 DCHECK(CalledOnValidThread());
286 int rv = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF,
287 reinterpret_cast<const char*>(&size), sizeof(size));
288 DCHECK(!rv) << "Could not set socket receive buffer size: " << errno;
// Requests a send buffer of |size| via setsockopt(SOL_SOCKET, SO_SNDBUF).
// A non-zero setsockopt() result trips a DCHECK that reports errno.
292 bool UDPSocketLibevent::SetSendBufferSize(int32 size) {
293 DCHECK(CalledOnValidThread());
294 int rv = setsockopt(socket_, SOL_SOCKET, SO_SNDBUF,
295 reinterpret_cast<const char*>(&size), sizeof(size));
296 DCHECK(!rv) << "Could not set socket send buffer size: " << errno;
// Records that address reuse is wanted by setting
// SOCKET_OPTION_REUSE_ADDRESS in socket_options_. Only the flag is set here;
// SetSocketOptions() is what turns it into SO_REUSEADDR on the descriptor.
// DCHECKs: valid thread, not yet connected.
300 void UDPSocketLibevent::AllowAddressReuse() {
301 DCHECK(CalledOnValidThread());
302 DCHECK(!is_connected());
304 socket_options_ |= SOCKET_OPTION_REUSE_ADDRESS;
// Records that broadcast is wanted by setting SOCKET_OPTION_BROADCAST in
// socket_options_. Only the flag is set here; SetSocketOptions() is what
// applies it to the descriptor.
// DCHECKs: valid thread, not yet connected.
307 void UDPSocketLibevent::AllowBroadcast() {
308 DCHECK(CalledOnValidThread());
309 DCHECK(!is_connected());
311 socket_options_ |= SOCKET_OPTION_BROADCAST;
// Readable notification for the socket's descriptor. Forwards to
// DidCompleteRead() only while a read callback is pending; otherwise the
// notification is ignored. The int argument is unused.
314 void UDPSocketLibevent::ReadWatcher::OnFileCanReadWithoutBlocking(int) {
315 if (!socket_->read_callback_.is_null())
316 socket_->DidCompleteRead();
// Writable notification for the socket's descriptor. Forwards to
// DidCompleteWrite() only while a write callback is pending; otherwise the
// notification is ignored. The int argument is unused.
319 void UDPSocketLibevent::WriteWatcher::OnFileCanWriteWithoutBlocking(int) {
320 if (!socket_->write_callback_.is_null())
321 socket_->DidCompleteWrite();
// Delivers the final read result |rv| (never ERR_IO_PENDING, per the DCHECK)
// to the pending read callback. The callback is first copied to a local and
// the member is reset, so the member is already clear if the callback starts
// another read.
324 void UDPSocketLibevent::DoReadCallback(int rv) {
325 DCHECK_NE(rv, ERR_IO_PENDING);
326 DCHECK(!read_callback_.is_null());
328 // since Run may result in Read being called, clear read_callback_ up front.
329 CompletionCallback c = read_callback_;
330 read_callback_.Reset();
// Delivers the final write result |rv| (never ERR_IO_PENDING, per the DCHECK)
// to the pending write callback. The callback is first copied to a local and
// the member is reset, so the member is already clear if the callback starts
// another write.
334 void UDPSocketLibevent::DoWriteCallback(int rv) {
335 DCHECK_NE(rv, ERR_IO_PENDING);
336 DCHECK(!write_callback_.is_null());
338 // since Run may result in Write being called, clear write_callback_ up front.
339 CompletionCallback c = write_callback_;
340 write_callback_.Reset();
// Called from the read watcher while a read is pending. Retries the receive
// using the stored buffer, length and out-address. If the result is anything
// other than ERR_IO_PENDING, the stored out-address is cleared, the read
// watcher is stopped and the result is handed to DoReadCallback().
344 void UDPSocketLibevent::DidCompleteRead() {
346 InternalRecvFrom(read_buf_.get(), read_buf_len_, recv_from_address_);
347 if (result != ERR_IO_PENDING) {
350 recv_from_address_ = NULL;
351 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
353 DoReadCallback(result);
// Records the outcome of a receive.
// One visible path emits a TYPE_UDP_RECEIVE_ERROR net-log event carrying
// |result|. Otherwise, when the net log is capturing all events, a
// TYPE_UDP_BYTES_RECEIVED event is emitted; the sender endpoint is attached
// only if |addr| converts to a valid IPEndPoint (NULL is passed if not).
// |result| is also added to the "udp.read_bytes" stats counter.
357 void UDPSocketLibevent::LogRead(int result,
360 const sockaddr* addr) const {
362 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
366 if (net_log_.IsLoggingAllEvents()) {
367 DCHECK(addr_len > 0);
371 bool is_address_valid = address.FromSockAddr(addr, addr_len);
373 NetLog::TYPE_UDP_BYTES_RECEIVED,
374 CreateNetLogUDPDataTranferCallback(
376 is_address_valid ? &address : NULL));
379 base::StatsCounter read_bytes("udp.read_bytes");
380 read_bytes.Add(result);
// Creates the underlying datagram socket for the address family of
// |address|. The family is stored in addr_family_ and the new descriptor in
// socket_. Returns MapSystemError(errno) if creation fails. The socket is
// then put into non-blocking mode; if SetNonBlocking() returns non-zero,
// errno is mapped to a net error.
383 int UDPSocketLibevent::CreateSocket(const IPEndPoint& address) {
384 addr_family_ = address.GetSockAddrFamily();
385 socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, 0);
386 if (socket_ == kInvalidSocket)
387 return MapSystemError(errno);
388 if (SetNonBlocking(socket_)) {
389 const int err = MapSystemError(errno);
// Called from the write watcher while a write is pending. Retries the send
// using the stored buffer, length and destination. If the result is anything
// other than ERR_IO_PENDING, the stored destination is released, the write
// watcher is stopped and the result is handed to DoWriteCallback().
396 void UDPSocketLibevent::DidCompleteWrite() {
398 InternalSendTo(write_buf_.get(), write_buf_len_, send_to_address_.get());
400 if (result != ERR_IO_PENDING) {
403 send_to_address_.reset();
404 write_socket_watcher_.StopWatchingFileDescriptor();
405 DoWriteCallback(result);
// Records the outcome of a send.
// One visible path emits a TYPE_UDP_SEND_ERROR net-log event carrying
// |result|. Otherwise, when the net log is capturing all events, a
// TYPE_UDP_BYTES_SENT event is emitted with |result|, |bytes| and |address|.
// |result| is also added to the "udp.write_bytes" stats counter.
409 void UDPSocketLibevent::LogWrite(int result,
411 const IPEndPoint* address) const {
413 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR, result);
417 if (net_log_.IsLoggingAllEvents()) {
419 NetLog::TYPE_UDP_BYTES_SENT,
420 CreateNetLogUDPDataTranferCallback(result, bytes, address));
423 base::StatsCounter write_bytes("udp.write_bytes");
424 write_bytes.Add(result);
// Performs a single recvfrom() on socket_ with EINTR retry.
// A non-negative byte count becomes the result and, when |address| is
// non-NULL, the sender's sockaddr is converted into |*address|. A failed
// call yields MapSystemError(errno). Every outcome except ERR_IO_PENDING is
// reported through LogRead().
427 int UDPSocketLibevent::InternalRecvFrom(IOBuffer* buf, int buf_len,
428 IPEndPoint* address) {
429 int bytes_transferred;
432 SockaddrStorage storage;
435 HANDLE_EINTR(recvfrom(socket_,
442 if (bytes_transferred >= 0) {
443 result = bytes_transferred;
444 if (address && !address->FromSockAddr(storage.addr, storage.addr_len))
447 result = MapSystemError(errno);
449 if (result != ERR_IO_PENDING)
450 LogRead(result, buf->data(), storage.addr_len, storage.addr);
// Performs a single sendto() on socket_ with EINTR retry.
// One visible path sets storage.addr_len to 0; another converts |*address|
// into a sockaddr and, if that conversion fails, logs ERR_FAILED through
// LogWrite(). A failed sendto() is mapped with MapSystemError(errno). Every
// outcome except ERR_IO_PENDING is reported through LogWrite().
454 int UDPSocketLibevent::InternalSendTo(IOBuffer* buf, int buf_len,
455 const IPEndPoint* address) {
456 SockaddrStorage storage;
457 struct sockaddr* addr = storage.addr;
460 storage.addr_len = 0;
462 if (!address->ToSockAddr(storage.addr, &storage.addr_len)) {
463 int result = ERR_FAILED;
464 LogWrite(result, NULL, NULL);
469 int result = HANDLE_EINTR(sendto(socket_,
476 result = MapSystemError(errno);
477 if (result != ERR_IO_PENDING)
478 LogWrite(result, buf->data(), address);
// Applies the options accumulated in socket_options_ and
// multicast_time_to_live_ to the descriptor with setsockopt():
//   - SOCKET_OPTION_REUSE_ADDRESS set   -> SO_REUSEADDR.
//   - SOCKET_OPTION_BROADCAST set       -> SO_BROADCAST; the section guarded
//     by OS_MACOSX sets SO_REUSEPORT.
//   - SOCKET_OPTION_MULTICAST_LOOP cleared -> IP_MULTICAST_LOOP for AF_INET,
//     with IPV6_MULTICAST_LOOP as the other call.
//   - TTL differing from IP_DEFAULT_MULTICAST_TTL -> IP_MULTICAST_TTL for
//     AF_INET, with IPV6_MULTICAST_HOPS as the other call.
// Each group is followed by a return of MapSystemError(errno) for failures.
482 int UDPSocketLibevent::SetSocketOptions() {
484 if (socket_options_ & SOCKET_OPTION_REUSE_ADDRESS) {
485 int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &true_value,
488 return MapSystemError(errno);
490 if (socket_options_ & SOCKET_OPTION_BROADCAST) {
492 #if defined(OS_MACOSX)
493 // SO_REUSEPORT on OSX permits multiple processes to each receive
494 // UDP multicast or broadcast datagrams destined for the bound
496 rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &true_value,
499 rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &true_value,
501 #endif // defined(OS_MACOSX)
503 return MapSystemError(errno);
// Loopback is only touched when the flag has been cleared.
506 if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
508 if (addr_family_ == AF_INET) {
510 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_LOOP,
511 &loop, sizeof(loop));
514 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
515 &loop, sizeof(loop));
518 return MapSystemError(errno);
// The TTL is only set when it differs from the default.
520 if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
522 if (addr_family_ == AF_INET) {
523 u_char ttl = multicast_time_to_live_;
524 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_TTL,
527 // Signed integer. -1 to use route default.
528 int ttl = multicast_time_to_live_;
529 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
533 return MapSystemError(errno);
// Binds socket_ to |address| with bind().
// Returns ERR_ADDRESS_INVALID if |address| cannot be converted to a
// sockaddr, MapSystemError(errno) if bind() returns a negative value, and
// otherwise bind()'s own return value.
538 int UDPSocketLibevent::DoBind(const IPEndPoint& address) {
539 SockaddrStorage storage;
540 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
541 return ERR_ADDRESS_INVALID;
542 int rv = bind(socket_, storage.addr, storage.addr_len);
543 return rv < 0 ? MapSystemError(errno) : rv;
// Binds to an all-zero IP address of the same length as |address|'s IP, on
// a randomly chosen port. Up to kBindRetries attempts are made, each with a
// port obtained from rand_int_cb_.Run(kPortStart, kPortEnd). After the loop,
// port 0 is bound instead and that DoBind() result is returned.
// DCHECKs that bind_type_ is RANDOM_BIND and rand_int_cb_ is non-null.
546 int UDPSocketLibevent::RandomBind(const IPEndPoint& address) {
547 DCHECK(bind_type_ == DatagramSocket::RANDOM_BIND && !rand_int_cb_.is_null());
549 // Construct IPAddressNumber of appropriate size (IPv4 or IPv6) of 0s.
550 IPAddressNumber ip(address.address().size());
552 for (int i = 0; i < kBindRetries; ++i) {
553 int rv = DoBind(IPEndPoint(ip, rand_int_cb_.Run(kPortStart, kPortEnd)));
// True for success and for every error except ERR_ADDRESS_IN_USE.
554 if (rv == OK || rv != ERR_ADDRESS_IN_USE)
557 return DoBind(IPEndPoint(ip, 0));
// Joins the multicast group |group_address| via setsockopt(). The length of
// |group_address| selects the path:
//   - kIPv4AddressSize: the socket family must be AF_INET; uses
//     IP_ADD_MEMBERSHIP with the interface set to INADDR_ANY.
//   - kIPv6AddressSize: the socket family must be AF_INET6; uses
//     IPV6_JOIN_GROUP with interface index 0.
// Returns ERR_ADDRESS_INVALID when the family does not match or the length
// is neither of the above (which also hits NOTREACHED). A return of
// MapSystemError(errno) follows each setsockopt() call, and one visible path
// returns ERR_SOCKET_NOT_CONNECTED.
560 int UDPSocketLibevent::JoinGroup(const IPAddressNumber& group_address) const {
561 DCHECK(CalledOnValidThread());
563 return ERR_SOCKET_NOT_CONNECTED;
565 switch (group_address.size()) {
566 case kIPv4AddressSize: {
567 if (addr_family_ != AF_INET)
568 return ERR_ADDRESS_INVALID;
570 mreq.imr_interface.s_addr = INADDR_ANY;
571 memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize);
572 int rv = setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
573 &mreq, sizeof(mreq));
575 return MapSystemError(errno);
578 case kIPv6AddressSize: {
579 if (addr_family_ != AF_INET6)
580 return ERR_ADDRESS_INVALID;
582 mreq.ipv6mr_interface = 0; // 0 indicates default multicast interface.
583 memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize);
584 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP,
585 &mreq, sizeof(mreq));
587 return MapSystemError(errno);
591 NOTREACHED() << "Invalid address family";
592 return ERR_ADDRESS_INVALID;
// Leaves the multicast group |group_address| via setsockopt(). The length of
// |group_address| selects the path:
//   - kIPv4AddressSize: the socket family must be AF_INET; uses
//     IP_DROP_MEMBERSHIP with the interface set to INADDR_ANY.
//   - kIPv6AddressSize: the socket family must be AF_INET6; uses
//     IPV6_LEAVE_GROUP with interface index 0.
// Returns ERR_ADDRESS_INVALID when the family does not match or the length
// is neither of the above (which also hits NOTREACHED). A return of
// MapSystemError(errno) follows each setsockopt() call, and one visible path
// returns ERR_SOCKET_NOT_CONNECTED.
596 int UDPSocketLibevent::LeaveGroup(const IPAddressNumber& group_address) const {
597 DCHECK(CalledOnValidThread());
600 return ERR_SOCKET_NOT_CONNECTED;
602 switch (group_address.size()) {
603 case kIPv4AddressSize: {
604 if (addr_family_ != AF_INET)
605 return ERR_ADDRESS_INVALID;
607 mreq.imr_interface.s_addr = INADDR_ANY;
608 memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize);
609 int rv = setsockopt(socket_, IPPROTO_IP, IP_DROP_MEMBERSHIP,
610 &mreq, sizeof(mreq));
612 return MapSystemError(errno);
615 case kIPv6AddressSize: {
616 if (addr_family_ != AF_INET6)
617 return ERR_ADDRESS_INVALID;
619 mreq.ipv6mr_interface = 0; // 0 indicates default multicast interface.
620 memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize);
621 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
622 &mreq, sizeof(mreq));
624 return MapSystemError(errno);
628 NOTREACHED() << "Invalid address family";
629 return ERR_ADDRESS_INVALID;
// Stores the multicast TTL to be applied later by SetSocketOptions().
// |time_to_live| must lie in [0, 255]; anything else yields
// ERR_INVALID_ARGUMENT and leaves the stored value untouched. One visible
// path returns ERR_SOCKET_IS_CONNECTED.
633 int UDPSocketLibevent::SetMulticastTimeToLive(int time_to_live) {
634 DCHECK(CalledOnValidThread());
636 return ERR_SOCKET_IS_CONNECTED;
638 if (time_to_live < 0 || time_to_live > 255)
639 return ERR_INVALID_ARGUMENT;
640 multicast_time_to_live_ = time_to_live;
// Updates the SOCKET_OPTION_MULTICAST_LOOP bit in socket_options_: one
// branch sets it and the other clears it. The bit is consumed later by
// SetSocketOptions(). One visible path returns ERR_SOCKET_IS_CONNECTED.
644 int UDPSocketLibevent::SetMulticastLoopbackMode(bool loopback) {
645 DCHECK(CalledOnValidThread());
647 return ERR_SOCKET_IS_CONNECTED;
650 socket_options_ |= SOCKET_OPTION_MULTICAST_LOOP;
652 socket_options_ &= ~SOCKET_OPTION_MULTICAST_LOOP;
// Applies the DiffServ code point |dscp| to the socket. DSCP_NO_CHANGE is
// handled as a separate early case. Otherwise the value is shifted left by
// two bits and written with setsockopt(): IP_TOS for AF_INET sockets, with
// IPV6_TCLASS as the other call. A return of MapSystemError(errno) follows.
656 int UDPSocketLibevent::SetDiffServCodePoint(DiffServCodePoint dscp) {
657 if (dscp == DSCP_NO_CHANGE) {
// NOTE(review): the shift presumably leaves the two low (ECN) bits of the
// field as zero, as the variable name suggests - confirm.
661 int dscp_and_ecn = dscp << 2;
662 if (addr_family_ == AF_INET) {
663 rv = setsockopt(socket_, IPPROTO_IP, IP_TOS,
664 &dscp_and_ecn, sizeof(dscp_and_ecn));
666 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_TCLASS,
667 &dscp_and_ecn, sizeof(dscp_and_ecn));
670 return MapSystemError(errno);