1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/socket/udp_node.h"
12 #include "nacl_io/log.h"
13 #include "nacl_io/pepper_interface.h"
14 #include "nacl_io/socket/packet.h"
15 #include "nacl_io/socket/udp_event_emitter.h"
16 #include "nacl_io/stream/stream_fs.h"
// Size of the receive buffer held by UdpRecvWork, and the upper bound applied
// to the length of an outgoing packet in UdpNode::Send_Locked.
19 const size_t kMaxPacketSize = 65536;
// Value passed for both size arguments of the UdpEventEmitter created by
// UdpNode; equal to eight maximum-size packets.
20 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
// Common base class for the UDP work items (UdpSendWork / UdpRecvWork) that
// are enqueued on the stream. It keeps a reference to the socket's event
// emitter and deletes packet_ when the work item is destroyed.
25 class UdpWork : public StreamFs::Work {
// The base StreamFs::Work is constructed with the object returned by
// emitter->stream()->stream().
27 explicit UdpWork(const ScopedUdpEventEmitter& emitter)
28 : StreamFs::Work(emitter->stream()->stream()),
// Deletes packet_ along with the work item.
32 ~UdpWork() { delete packet_; }
// Returns the UDP socket interface obtained from the filesystem's PPAPI
// object.
34 UDPSocketInterface* UDPInterface() {
35 return filesystem()->ppapi()->GetUDPSocketInterface();
// Emitter whose lock is taken by Start() and Run() in the derived classes.
39 ScopedUdpEventEmitter emitter_;
// Work item that transmits one queued packet: Start() pulls a packet from the
// emitter's TX queue and issues SendTo; Run() is the completion handler that
// receives the result of that call.
43 class UdpSendWork : public UdpWork {
// Keeps a reference to the socket node (node_) in addition to the emitter.
45 explicit UdpSendWork(const ScopedUdpEventEmitter& emitter,
46 const ScopedSocketNode& node)
47 : UdpWork(emitter), node_(node) {}
// Begins an asynchronous send while holding the emitter lock.
// NOTE(review): |val| is not used by the visible statements, and the return
// values of the individual branches are not shown here -- confirm.
49 virtual bool Start(int32_t val) {
50 AUTO_LOCK(emitter_->GetLock());
52 // Does the stream exist, and can it send?
53 if (!node_->TestStreamFlags(SSF_CAN_SEND))
// Take the next outgoing packet from the emitter; it is stored in packet_,
// which ~UdpWork() deletes.
56 packet_ = emitter_->ReadTXPacket_Locked();
// Issue the send on the node's socket resource; the completion callback is
// built from this work item via GetRunCompletion(this).
60 int err = UDPInterface()->SendTo(node_->socket_resource(),
64 filesystem()->GetRunCompletion(this));
65 if (err != PP_OK_COMPLETIONPENDING) {
66 // Anything else, we should assume the socket has gone bad.
67 node_->SetError_Locked(err);
// Mark the node as having a send in flight (cleared again in Run()).
71 node_->SetStreamFlags(SSF_SENDING);
// Completion handler for the send started in Start(). A negative
// |length_error| is recorded on the node as an error.
75 virtual void Run(int32_t length_error) {
76 AUTO_LOCK(emitter_->GetLock());
78 if (length_error < 0) {
79 node_->SetError_Locked(length_error);
83 // If we did send, then queue more work.
84 node_->ClearStreamFlags(SSF_SENDING);
89 // We assume that transmits will always complete. If the upstream
90 // actually back pressures, enough to prevent the Send callback
91 // from triggering, this resource may never go away.
92 ScopedSocketNode node_;
// Work item that receives one packet: Start() issues RecvFrom into the
// internal data_ buffer; Run() is the completion handler that turns the
// received bytes into a Packet on the emitter's RX queue.
95 class UdpRecvWork : public UdpWork {
97 explicit UdpRecvWork(const ScopedUdpEventEmitter& emitter)
// Begins an asynchronous receive while holding the emitter lock.
// NOTE(review): |val| is not used by the visible statements, and the return
// values of the individual branches are not shown here -- confirm.
101 virtual bool Start(int32_t val) {
102 AUTO_LOCK(emitter_->GetLock());
// The emitter's attached stream is treated as a UdpNode.
103 UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
105 // Does the stream exist, and can it recv?
106 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
109 // Check if we are already receiving.
110 if (stream->TestStreamFlags(SSF_RECVING))
// Mark the receive as in flight before issuing it; the flag is cleared in
// Run() after a successful receive.
113 stream->SetStreamFlags(SSF_RECVING);
114 int err = UDPInterface()->RecvFrom(stream->socket_resource(),
118 filesystem()->GetRunCompletion(this));
// Any result other than "completion pending" is recorded on the stream as an
// error.
119 if (err != PP_OK_COMPLETIONPENDING) {
120 stream->SetError_Locked(err);
// Completion handler for the receive started in Start(). A positive
// |length_error| is used as the number of bytes received.
127 virtual void Run(int32_t length_error) {
128 AUTO_LOCK(emitter_->GetLock());
129 UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
133 // On successful receive we queue more input
134 if (length_error > 0) {
// Copy the received bytes and source address into a new Packet, release
// this work item's reference to addr_, and hand the packet to the emitter.
135 Packet* packet = new Packet(filesystem()->ppapi());
136 packet->Copy(data_, length_error, addr_);
137 filesystem()->ppapi()->ReleaseResource(addr_);
138 emitter_->WriteRXPacket_Locked(packet);
139 stream->ClearStreamFlags(SSF_RECVING);
140 stream->QueueInput();
// NOTE(review): presumably the non-positive branch -- the result is recorded
// on the stream as an error.
142 stream->SetError_Locked(length_error);
// Receive buffer copied from in Run(); sized for one maximum-size packet.
147 char data_[kMaxPacketSize];
// Constructs the node on top of SocketNode, creates its UdpEventEmitter with
// kDefaultFifoSize for both size arguments, and attaches this node to the
// emitter as its stream.
151 UdpNode::UdpNode(Filesystem* filesystem)
152 : SocketNode(filesystem),
153 emitter_(new UdpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)) {
154 emitter_->AttachStream(this);
// Detaches this node from its emitter (undoing the AttachStream done in the
// constructor) and then runs the SocketNode teardown.
157 void UdpNode::Destroy() {
158 emitter_->DetachStream();
159 SocketNode::Destroy();
// Returns the raw pointer to this node's UDP event emitter, as given by
// emitter_.get().
162 UdpEventEmitter* UdpNode::GetEventEmitter() {
163 return emitter_.get();
// Initializes the node: runs SocketNode::Init(open_flags), verifies that a
// UDP interface is available, and creates the UDP resource for the PPAPI
// instance. Failures of the last two steps are logged.
// NOTE(review): the error codes returned on each failure path are not
// visible here -- confirm.
166 Error UdpNode::Init(int open_flags) {
167 Error err = SocketNode::Init(open_flags);
171 if (UDPInterface() == NULL) {
172 LOG_ERROR("Got NULL interface: UDP");
// NOTE(review): the result of Create() is presumably stored in
// socket_resource_, which is checked immediately below -- confirm.
177 UDPInterface()->Create(filesystem_->ppapi()->GetInstance());
178 if (0 == socket_resource_) {
179 LOG_ERROR("Unable to create UDP resource.");
// Allocates a UdpRecvWork bound to this node's emitter and enqueues it on the
// stream.
186 void UdpNode::QueueInput() {
187 UdpRecvWork* work = new UdpRecvWork(emitter_);
188 stream()->EnqueueWork(work);
// Allocates a UdpSendWork for this node and enqueues it on the stream.
// The SSF_CAN_SEND and SSF_SENDING flags are tested first.
// NOTE(review): presumably the function returns early when the socket cannot
// send or a send is already in flight -- confirm.
191 void UdpNode::QueueOutput() {
192 if (!TestStreamFlags(SSF_CAN_SEND))
195 if (TestStreamFlags(SSF_SENDING))
// The work item holds its own ScopedSocketNode reference to this node.
198 UdpSendWork* work = new UdpSendWork(emitter_, ScopedSocketNode(this));
199 stream()->EnqueueWork(work);
// Binds the UDP socket to |addr| (of length |len|). The bind call is made
// with PP_BlockUntilComplete() as its completion callback. Afterwards the
// address actually bound is fetched, the node is flagged as able to send and
// receive, and the bound address is stored in local_addr_.
// NOTE(review): the checks' return values and the condition guarding the
// PPErrorToErrno(err) return are not visible here -- confirm.
202 Error UdpNode::Bind(const struct sockaddr* addr, socklen_t len) {
203 if (0 == socket_resource_)
206 /* Only bind once. */
// Convert the sockaddr into a resource for the bind call; the resource is
// released again right after the call.
210 PP_Resource out_addr = SockAddrToResource(addr, len);
215 UDPInterface()->Bind(socket_resource_, out_addr, PP_BlockUntilComplete());
216 filesystem_->ppapi()->ReleaseResource(out_addr);
218 return PPErrorToErrno(err);
220 // Get the address that was actually bound (in case addr was 0.0.0.0:0).
221 out_addr = UDPInterface()->GetBoundAddress(socket_resource_);
225 // Now that we are bound, we can start sending and receiving.
226 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
229 local_addr_ = out_addr;
// Sets the default destination address of the socket. Any previously stored
// remote address resource is released before the new one, converted from
// |addr|, is stored in remote_addr_.
// NOTE(review): |attr| is not used by the visible statements, and the error
// codes returned by the checks are not shown here -- confirm.
233 Error UdpNode::Connect(const HandleAttr& attr,
234 const struct sockaddr* addr,
236 if (0 == socket_resource_)
239 /* Connect for UDP is the default dest, it's legal to change it. */
240 if (remote_addr_ != 0) {
241 filesystem_->ppapi()->ReleaseResource(remote_addr_);
245 remote_addr_ = SockAddrToResource(addr, len);
// A zero resource means the address could not be converted.
246 if (0 == remote_addr_)
// Reads the next packet from the emitter's RX queue and copies its payload
// into |buf|, truncated to the smaller of |len| and the packet length. The
// number of bytes copied is written to |*out_len|; if the packet carries a
// source address, a new reference to it is written to |*out_addr|.
// NOTE(review): the "_Locked" suffix suggests the caller already holds the
// emitter lock; the handling of a missing packet and the ownership of
// |packet| after the copy are not visible here -- confirm.
252 Error UdpNode::Recv_Locked(void* buf,
254 PP_Resource* out_addr,
256 Packet* packet = emitter_->ReadRXPacket_Locked();
261 int capped_len = static_cast<int32_t>(std::min<int>(len, packet->len()));
262 memcpy(buf, packet->buffer(), capped_len);
// AddRef the address before handing it out, so the reference written to
// |*out_addr| is separate from the one the packet holds.
264 if (packet->addr() != 0) {
265 filesystem_->ppapi()->AddRefResource(packet->addr());
266 *out_addr = packet->addr();
269 *out_len = capped_len;
274 // Should never happen, Recv_Locked should not be called
275 // unless already in a POLLIN state.
// Queues |buf| for transmission: the payload, truncated to at most
// kMaxPacketSize bytes, is copied into a new Packet that is handed to the
// emitter's TX queue, and the number of bytes queued is written to |*out_len|.
// A Bind() to an AF_INET address with a zeroed sin_addr is issued first.
// NOTE(review): the condition under which that Bind() runs (presumably only
// when the socket is not yet bound), the port used, and which |addr| is
// passed to Packet::Copy are not visible here -- confirm.
279 Error UdpNode::Send_Locked(const void* buf,
284 // Pepper requires a socket to be bound before it can send.
286 addr.sin_family = AF_INET;
288 memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
290 Bind(reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
296 int capped_len = static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
297 Packet* packet = new Packet(filesystem_->ppapi());
298 packet->Copy(buf, capped_len, addr);
300 emitter_->WriteTXPacket_Locked(packet);
301 *out_len = capped_len;
305 } // namespace nacl_io