1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/socket/udp_node.h"
12 #include "nacl_io/log.h"
13 #include "nacl_io/pepper_interface.h"
14 #include "nacl_io/socket/packet.h"
15 #include "nacl_io/socket/udp_event_emitter.h"
16 #include "nacl_io/stream/stream_fs.h"
19 const size_t kMaxPacketSize = 65536;
20 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
25 class UdpWork : public StreamFs::Work {
27 explicit UdpWork(const ScopedUdpEventEmitter& emitter)
28 : StreamFs::Work(emitter->stream()->stream()),
32 ~UdpWork() { delete packet_; }
34 UDPSocketInterface* UDPInterface() {
35 return filesystem()->ppapi()->GetUDPSocketInterface();
39 ScopedUdpEventEmitter emitter_;
43 class UdpSendWork : public UdpWork {
45 explicit UdpSendWork(const ScopedUdpEventEmitter& emitter,
46 const ScopedSocketNode& node)
47 : UdpWork(emitter), node_(node) {}
49 virtual bool Start(int32_t val) {
50 AUTO_LOCK(emitter_->GetLock());
52 // Does the stream exist, and can it send?
53 if (!node_->TestStreamFlags(SSF_CAN_SEND))
56 // Check if we are already sending.
57 if (node_->TestStreamFlags(SSF_SENDING))
60 packet_ = emitter_->ReadTXPacket_Locked();
64 int err = UDPInterface()->SendTo(node_->socket_resource(),
68 filesystem()->GetRunCompletion(this));
69 if (err != PP_OK_COMPLETIONPENDING) {
70 // Anything else, we should assume the socket has gone bad.
71 node_->SetError_Locked(err);
75 node_->SetStreamFlags(SSF_SENDING);
79 virtual void Run(int32_t length_error) {
80 AUTO_LOCK(emitter_->GetLock());
82 if (length_error < 0) {
83 node_->SetError_Locked(length_error);
87 // If we did send, then Q more work.
88 node_->ClearStreamFlags(SSF_SENDING);
93 // We assume that transmits will always complete. If the upstream
94 // actually back pressures, enough to prevent the Send callback
95 // from triggering, this resource may never go away.
96 ScopedSocketNode node_;
99 class UdpRecvWork : public UdpWork {
101 explicit UdpRecvWork(const ScopedUdpEventEmitter& emitter)
105 virtual bool Start(int32_t val) {
106 AUTO_LOCK(emitter_->GetLock());
107 UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
109 // Does the stream exist, and can it recv?
110 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
113 // Check if we are already receiving.
114 if (stream->TestStreamFlags(SSF_RECVING))
117 stream->SetStreamFlags(SSF_RECVING);
118 int err = UDPInterface()->RecvFrom(stream->socket_resource(),
122 filesystem()->GetRunCompletion(this));
123 if (err != PP_OK_COMPLETIONPENDING) {
124 stream->SetError_Locked(err);
131 virtual void Run(int32_t length_error) {
132 AUTO_LOCK(emitter_->GetLock());
133 UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
137 // On successful receive we queue more input
138 if (length_error > 0) {
139 Packet* packet = new Packet(filesystem()->ppapi());
140 packet->Copy(data_, length_error, addr_);
141 filesystem()->ppapi()->ReleaseResource(addr_);
142 emitter_->WriteRXPacket_Locked(packet);
143 stream->ClearStreamFlags(SSF_RECVING);
144 stream->QueueInput();
146 stream->SetError_Locked(length_error);
151 char data_[kMaxPacketSize];
155 UdpNode::UdpNode(Filesystem* filesystem)
156 : SocketNode(filesystem),
157 emitter_(new UdpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)) {
158 emitter_->AttachStream(this);
161 void UdpNode::Destroy() {
162 emitter_->DetachStream();
163 SocketNode::Destroy();
166 UdpEventEmitter* UdpNode::GetEventEmitter() {
167 return emitter_.get();
170 Error UdpNode::Init(int open_flags) {
171 Error err = SocketNode::Init(open_flags);
175 if (UDPInterface() == NULL) {
176 LOG_ERROR("Got NULL interface: UDP");
181 UDPInterface()->Create(filesystem_->ppapi()->GetInstance());
182 if (0 == socket_resource_) {
183 LOG_ERROR("Unable to create UDP resource.");
190 void UdpNode::QueueInput() {
191 UdpRecvWork* work = new UdpRecvWork(emitter_);
192 stream()->EnqueueWork(work);
195 void UdpNode::QueueOutput() {
196 if (!TestStreamFlags(SSF_CAN_SEND))
199 if (TestStreamFlags(SSF_SENDING))
202 UdpSendWork* work = new UdpSendWork(emitter_, ScopedSocketNode(this));
203 stream()->EnqueueWork(work);
206 Error UdpNode::SetSockOpt(int lvl,
210 if (lvl == SOL_SOCKET && optname == SO_RCVBUF) {
211 if (static_cast<size_t>(len) < sizeof(int))
213 AUTO_LOCK(node_lock_);
214 int bufsize = *static_cast<const int*>(optval);
216 UDPInterface()->SetOption(socket_resource_,
217 PP_UDPSOCKET_OPTION_RECV_BUFFER_SIZE,
218 PP_MakeInt32(bufsize),
219 PP_BlockUntilComplete());
220 return PPErrorToErrno(error);
221 } else if (lvl == SOL_SOCKET && optname == SO_SNDBUF) {
222 if (static_cast<size_t>(len) < sizeof(int))
224 AUTO_LOCK(node_lock_);
225 int bufsize = *static_cast<const int*>(optval);
227 UDPInterface()->SetOption(socket_resource_,
228 PP_UDPSOCKET_OPTION_SEND_BUFFER_SIZE,
229 PP_MakeInt32(bufsize),
230 PP_BlockUntilComplete());
231 return PPErrorToErrno(error);
234 return SocketNode::SetSockOpt(lvl, optname, optval, len);
237 Error UdpNode::Bind(const struct sockaddr* addr, socklen_t len) {
238 if (0 == socket_resource_)
241 /* Only bind once. */
245 PP_Resource out_addr = SockAddrToResource(addr, len);
250 UDPInterface()->Bind(socket_resource_, out_addr, PP_BlockUntilComplete());
251 filesystem_->ppapi()->ReleaseResource(out_addr);
253 return PPErrorToErrno(err);
255 // Get the address that was actually bound (in case addr was 0.0.0.0:0).
256 out_addr = UDPInterface()->GetBoundAddress(socket_resource_);
260 // Now that we are bound, we can start sending and receiving.
261 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
264 local_addr_ = out_addr;
268 Error UdpNode::Connect(const HandleAttr& attr,
269 const struct sockaddr* addr,
271 if (0 == socket_resource_)
274 /* Connect for UDP is the default dest, it's legal to change it. */
275 if (remote_addr_ != 0) {
276 filesystem_->ppapi()->ReleaseResource(remote_addr_);
280 remote_addr_ = SockAddrToResource(addr, len);
281 if (0 == remote_addr_)
287 Error UdpNode::Recv_Locked(void* buf,
289 PP_Resource* out_addr,
291 Packet* packet = emitter_->ReadRXPacket_Locked();
296 int capped_len = static_cast<int32_t>(std::min<int>(len, packet->len()));
297 memcpy(buf, packet->buffer(), capped_len);
299 if (packet->addr() != 0) {
300 filesystem_->ppapi()->AddRefResource(packet->addr());
301 *out_addr = packet->addr();
304 *out_len = capped_len;
309 // Should never happen, Recv_Locked should not be called
310 // unless already in a POLLIN state.
314 Error UdpNode::Send_Locked(const void* buf,
319 // Pepper requires a socket to be bound before it can send.
321 addr.sin_family = AF_INET;
323 memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
325 Bind(reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
331 int capped_len = static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
332 Packet* packet = new Packet(filesystem_->ppapi());
333 packet->Copy(buf, capped_len, addr);
335 emitter_->WriteTXPacket_Locked(packet);
336 *out_len = capped_len;
340 } // namespace nacl_io